Skip to content
This repository has been archived by the owner on May 15, 2024. It is now read-only.

Commit

Permalink
Fail upload if it uses more then available space
Browse files Browse the repository at this point in the history
Before upload, check that more then the disk has more than the minimum free space. Upload up to the size limit that would exceed the available space.

Fixes #179
  • Loading branch information
gammazero committed Nov 8, 2023
1 parent c504ff1 commit ad2be05
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 18 deletions.
5 changes: 3 additions & 2 deletions blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import (
)

var (
ErrBlobTooLarge = errors.New("blob size exceeds the maximum allowed")
ErrBlobNotFound = errors.New("no blob is found with given ID")
ErrBlobNotFound = errors.New("no blob is found with given ID")
ErrBlobTooLarge = errors.New("blob size exceeds the maximum allowed")
ErrNotEnoughSpace = errors.New("insufficient local storage space remaining")
)

var (
Expand Down
60 changes: 50 additions & 10 deletions blob/local_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,42 @@ package blob
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"

"github.com/gammazero/fsutil/disk"
)

var _ Store = (*LocalStore)(nil)

const (
Kib = 1 << (10 * (iota + 1))
Mib
Gib
)

const defaultMinFreeSpace = 64 * Mib

// LocalStore is a Store that stores blobs as flat files in a configured directory.
// Blobs are stored as flat files, named by their ID with .bin extension.
// This store is used primarily for testing purposes.
type LocalStore struct {
dir string
dir string
minFreeSpace uint64
}

// NewLocalStore instantiates a new LocalStore and uses the given dir as the place to store blobs.
// Blobs are stored as flat files, named by their ID with .bin extension.
func NewLocalStore(dir string) *LocalStore {
func NewLocalStore(dir string, minFreeSpace uint64) *LocalStore {
if minFreeSpace == 0 {
minFreeSpace = defaultMinFreeSpace
}
logger.Debugw("Instantiated local store", "dir", dir)
return &LocalStore{
dir: dir,
dir: dir,
minFreeSpace: minFreeSpace,
}
}

Expand All @@ -32,25 +48,49 @@ func (l *LocalStore) Dir() string {
}

// Put reads the given reader fully and stores its content in the store directory as flat files.
// The reader content is first stored in a temporary directory and upon successful storage is moved to the store directory.
// The Descriptor.ModificationTime is set to the modification date of the file that corresponds to the content.
// The Descriptor.ID is randomly generated; see NewID.
//
// The reader content is first stored in a temporary directory and upon
// successful storage is moved to the store directory. The
// Descriptor.ModificationTime is set to the modification date of the file that
// corresponds to the content. The Descriptor.ID is randomly generated; see
// NewID.
//
// Before a blob is written, the minimum amount of free space must be available
// on the local disk. If writing the blob consumes more then the available
// space (free space - minimum free), then this results in an error.
func (l *LocalStore) Put(_ context.Context, reader io.ReadCloser) (*Descriptor, error) {
// TODO: add size limiter here and return ErrBlobTooLarge.
id, err := NewID()
usage, err := disk.Usage(l.dir)
if err != nil {
return nil, err
return nil, fmt.Errorf("cannot get disk usage: %w", err)
}
if usage.Free <= l.minFreeSpace {
return nil, ErrNotEnoughSpace
}

dest, err := os.CreateTemp(l.dir, "motion_local_store_*.bin.temp")
if err != nil {
return nil, err
}
defer dest.Close()
written, err := io.Copy(dest, reader)

// Do not write more than the remaining storage - minimum free space.
limit := int64(usage.Free - l.minFreeSpace)
limitReader := io.LimitReader(reader, limit)

written, err := io.Copy(dest, limitReader)
if err != nil {
os.Remove(dest.Name())
return nil, err
}
if written == limit {
os.Remove(dest.Name())
return nil, ErrBlobTooLarge
}

id, err := NewID()
if err != nil {
return nil, err
}
if err = os.Rename(dest.Name(), filepath.Join(l.dir, id.String()+".bin")); err != nil {
return nil, err
}
Expand Down
51 changes: 51 additions & 0 deletions blob/local_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package blob_test

import (
"bytes"
"context"
"io"
"testing"

"github.com/filecoin-project/motion/blob"
"github.com/gammazero/fsutil/disk"
"github.com/stretchr/testify/require"
)

func TestWriteOK(t *testing.T) {
tmpDir := t.TempDir()

store := blob.NewLocalStore(tmpDir, 0)
buf := []byte("This is a test")
readCloser := io.NopCloser(bytes.NewReader(buf))

desc, err := store.Put(context.Background(), readCloser)
require.NoError(t, err)
require.NotNil(t, desc)
require.Equal(t, uint64(len(buf)), desc.Size)
}

func TestInsufficientSpace(t *testing.T) {
tmpDir := t.TempDir()
usage, err := disk.Usage(tmpDir)
require.NoError(t, err)

store := blob.NewLocalStore(tmpDir, usage.Free+blob.Gib)
readCloser := io.NopCloser(bytes.NewReader([]byte("This is a test")))

_, err = store.Put(context.Background(), readCloser)
require.ErrorIs(t, err, blob.ErrNotEnoughSpace)
}

func TestWriteTooLarge(t *testing.T) {
tmpDir := t.TempDir()
usage, err := disk.Usage(tmpDir)
require.NoError(t, err)

store := blob.NewLocalStore(tmpDir, usage.Free-5*blob.Kib)

buf := make([]byte, 32*blob.Kib)
readCloser := io.NopCloser(bytes.NewReader(buf))

_, err = store.Put(context.Background(), readCloser)
require.ErrorIs(t, err, blob.ErrBlobTooLarge)
}
9 changes: 8 additions & 1 deletion cmd/motion/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ func main() {
Value: os.TempDir(),
EnvVars: []string{"MOTION_STORE_DIR"},
},
&cli.Uint64Flag{
Name: "minFreeDiskSpace",
Usage: "Minumum amount of free space that must remaio on disk after writing blob",
DefaultText: "64 Mib",
EnvVars: []string{"MIN_FREE_DISK_SPACE"},
},
&cli.StringFlag{
Name: "walletKey",
Usage: "Hex encoded private key for the wallet to use with motion",
Expand Down Expand Up @@ -205,6 +211,7 @@ func main() {
singularity.WithScheduleDealNumber(cctx.Int("experimentalSingularityScheduleDealNumber")),
singularity.WithVerifiedDeal(cctx.Bool("verifiedDeal")),
singularity.WithCleanupInterval(cctx.Duration("experimentalSingularityCleanupInterval")),
singularity.WithMinFreeSpace(cctx.Uint64("minFreeDiskSpace")),
)
if err != nil {
logger.Errorw("Failed to instantiate singularity store", "err", err)
Expand All @@ -222,7 +229,7 @@ func main() {
}()
store = singularityStore
} else {
store = blob.NewLocalStore(storeDir)
store = blob.NewLocalStore(storeDir, cctx.Uint64("minFreeDiskSpace"))
logger.Infow("Using local blob store", "storeDir", storeDir)
}

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/data-preservation-programs/singularity v0.5.8
github.com/filecoin-project/go-address v1.1.0
github.com/filecoin-project/go-state-types v0.12.0
github.com/gammazero/fsutil v0.0.1
github.com/google/uuid v1.3.1
github.com/gotidy/ptr v1.4.0
github.com/ipfs/go-log/v2 v2.5.1
Expand Down Expand Up @@ -160,7 +161,7 @@ require (
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiD
github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gammazero/fsutil v0.0.1 h1:ELBzTchuwz+f6Xh5a3PS0J1ssuvMRP+bU4/h+eTBGaQ=
github.com/gammazero/fsutil v0.0.1/go.mod h1:RUdM7Ubfblvprq9CJmUtDFa1HA2Qp1kxsWIKwTVvFvw=
github.com/getsentry/sentry-go v0.18.0 h1:MtBW5H9QgdcJabtZcuJG80BMOwaBpkRDZkxRkNC1sN0=
github.com/getsentry/sentry-go v0.18.0/go.mod h1:Kgon4Mby+FJ7ZWHFUAZgVaIa8sxHtnRJRLTXZr51aKQ=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
Expand Down Expand Up @@ -791,8 +793,8 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
10 changes: 10 additions & 0 deletions integration/singularity/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type (
maxPendingDealSize string
maxPendingDealNumber int
cleanupInterval time.Duration
minFreeSpace uint64
}
)

Expand Down Expand Up @@ -331,3 +332,12 @@ func WithCleanupInterval(v time.Duration) Option {
return nil
}
}

// WithMinFreeSpce configures the minimul free disk space that must remain
// after storing a blob. A value of zero uses the default value.
func WithMinFreeSpace(space uint64) Option {
return func(o *options) error {
o.minFreeSpace = space
return nil
}
}
2 changes: 1 addition & 1 deletion integration/singularity/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewStore(o ...Option) (*Store, error) {

return &Store{
options: opts,
local: blob.NewLocalStore(opts.storeDir),
local: blob.NewLocalStore(opts.storeDir, opts.minFreeSpace),
sourceName: "source",
toPack: make(chan uint64, 1),
closing: make(chan struct{}),
Expand Down
2 changes: 1 addition & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func newOptions(o ...Option) (*options, error) {
if opts.blobStore == nil {
dir := os.TempDir()
logger.Warnw("No blob store is specified. Falling back on local blob store in temporary directory.", "dir", dir)
opts.blobStore = blob.NewLocalStore(dir)
opts.blobStore = blob.NewLocalStore(dir, 0)
}
return opts, nil
}
Expand Down

0 comments on commit ad2be05

Please sign in to comment.