diff --git a/cmd/motion/main.go b/cmd/motion/main.go index 792328e..14a3f4d 100644 --- a/cmd/motion/main.go +++ b/cmd/motion/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "os" "os/signal" @@ -156,6 +157,12 @@ func main() { Value: 1, EnvVars: []string{"MOTION_SINGULARITY_SCHEDULE_DEAL_NUMBER"}, }, + &cli.DurationFlag{ + Name: "experimentalSingularityCleanupInterval", + Usage: "How often to check for and delete files from the local store that have already had deals made", + Value: time.Hour, + EnvVars: []string{"MOTION_SINGULARITY_LOCAL_CLEANUP_INTERVAL"}, + }, }, Action: func(cctx *cli.Context) error { storeDir := cctx.String("storeDir") @@ -200,6 +207,7 @@ func main() { singularity.WithScheduleCron(cctx.String("experimentalSingularityScheduleCron")), singularity.WithScheduleDealNumber(cctx.Int("experimentalSingularityScheduleDealNumber")), singularity.WithVerifiedDeal(cctx.Bool("verifiedDeal")), + singularity.WithCleanupInterval(cctx.Duration("experimentalSingularityCleanupInterval")), ) if err != nil { logger.Errorw("Failed to instantiate singularity store", "err", err) @@ -210,6 +218,11 @@ func main() { logger.Errorw("Failed to start Singularity blob store", "err", err) return err } + defer func() { + if err := singularityStore.Shutdown(context.Background()); err != nil { + logger.Errorw("Failed to shut down Singularity blob store", "err", err) + } + }() store = singularityStore } else { store = blob.NewLocalStore(storeDir) diff --git a/docker-compose-local-dev.yml b/docker-compose-local-dev.yml index c6c3527..3bbbc78 100644 --- a/docker-compose-local-dev.yml +++ b/docker-compose-local-dev.yml @@ -74,6 +74,7 @@ services: - MOTION_SINGULARITY_PACK_THRESHOLD - MOTION_SINGULARITY_SCHEDULE_CRON - MOTION_SINGULARITY_SCHEDULE_DEAL_NUMBER + - MOTION_SINGULARITY_LOCAL_CLEANUP_INTERVAL - MOTION_WALLET_KEY - MOTION_VERIFIED_DEAL volumes: diff --git a/go.work.sum b/go.work.sum index e78f043..4bae78f 100644 --- a/go.work.sum +++ b/go.work.sum @@ -745,6 +745,7 @@ 
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/enterprise-certificate-proxy v0.2.1/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k= +github.com/googleapis/gax-go v2.0.0+incompatible h1:j0GKcs05QVmm7yesiZq2+9cxHkNK9YM6zKx4D2qucQU= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= diff --git a/integration/singularity/options.go b/integration/singularity/options.go index 33d7be8..4f2bc2e 100644 --- a/integration/singularity/options.go +++ b/integration/singularity/options.go @@ -3,6 +3,7 @@ package singularity import ( "errors" "os" + "time" singularityclient "github.com/data-preservation-programs/singularity/client/swagger/http" "github.com/filecoin-project/go-address" @@ -39,6 +40,7 @@ type ( totalDealSize string maxPendingDealSize string maxPendingDealNumber int + cleanupInterval time.Duration } ) @@ -57,6 +59,7 @@ func newOptions(o ...Option) (*options, error) { totalDealSize: "0", maxPendingDealSize: "0", maxPendingDealNumber: 0, + cleanupInterval: time.Hour, } for _, apply := range o { if err := apply(opts); err != nil { @@ -304,3 +307,12 @@ func WithMaxPendingDealNumber(v int) Option { return nil } } + +// WithCleanupInterval sets how often to check for and remove data that has been successfully stored on Filecoin. 
+// Defaults to time.Hour +func WithCleanupInterval(v time.Duration) Option { + return func(o *options) error { + o.cleanupInterval = v + return nil + } +} diff --git a/integration/singularity/reader.go b/integration/singularity/reader.go index e242e78..d9aaf99 100644 --- a/integration/singularity/reader.go +++ b/integration/singularity/reader.go @@ -20,15 +20,13 @@ type SingularityReader struct { } func (r *SingularityReader) Read(p []byte) (int, error) { - logger.Infof("buffer size: %v", len(p)) - - buf := bytes.NewBuffer(p) - buf.Reset() - if r.offset >= r.size { return 0, io.EOF } + buf := bytes.NewBuffer(p) + buf.Reset() + // Figure out how many bytes to read readLen := int64(len(p)) remainingBytes := r.size - r.offset diff --git a/integration/singularity/store.go b/integration/singularity/store.go index 8cf04b9..e5f5824 100644 --- a/integration/singularity/store.go +++ b/integration/singularity/store.go @@ -9,6 +9,8 @@ import ( "path" "strconv" "strings" + "sync" + "time" "github.com/data-preservation-programs/singularity/client/swagger/http/deal_schedule" "github.com/data-preservation-programs/singularity/client/swagger/http/file" @@ -34,7 +36,7 @@ type SingularityStore struct { sourceName string toPack chan uint64 closing chan struct{} - closed chan struct{} + closed sync.WaitGroup } func NewStore(o ...Option) (*SingularityStore, error) { @@ -49,7 +51,6 @@ func NewStore(o ...Option) (*SingularityStore, error) { sourceName: "source", toPack: make(chan uint64, 16), closing: make(chan struct{}), - closed: make(chan struct{}), }, nil } @@ -194,6 +195,7 @@ func (l *SingularityStore) Start(ctx context.Context) error { logger.Infof("Checking %v storage providers", len(l.storageProviders)) for _, sp := range l.storageProviders { + logger.Infof("Checking storage provider %s", sp) var foundSchedule *models.ModelSchedule logger := logger.With("provider", sp) for _, schd := range listPreparationSchedulesRes.Payload { @@ -268,18 +270,21 @@ func (l *SingularityStore) 
Start(ctx context.Context) error { } } go l.runPreparationJobs() + go l.runCleanupWorker(context.Background()) return nil } func (l *SingularityStore) runPreparationJobs() { + l.closed.Add(1) + ctx := context.Background() ctx, cancel := context.WithCancel(ctx) - defer cancel() + defer func() { + cancel() + l.closed.Done() + }() go func() { - defer func() { - close(l.closed) - }() for { select { case <-ctx.Done(): @@ -306,15 +311,24 @@ func (l *SingularityStore) runPreparationJobs() { } } }() - <-l.closing } func (l *SingularityStore) Shutdown(ctx context.Context) error { close(l.closing) + + done := make(chan struct{}) + go func() { + l.closed.Wait() + close(done) + }() + select { case <-ctx.Done(): - case <-l.closed: + case <-done: } + + logger.Infof("Singularity store shut down") + return nil } @@ -396,7 +410,6 @@ func (s *SingularityStore) Get(ctx context.Context, id blob.ID) (io.ReadSeekClos return &SingularityReader{ client: s.singularityClient, fileID: fileID, - offset: 0, size: getFileRes.Payload.Size, }, nil } @@ -457,3 +470,105 @@ func (s *SingularityStore) Describe(ctx context.Context, id blob.ID) (*blob.Desc } return descriptor, nil } + +func (s *SingularityStore) runCleanupWorker(ctx context.Context) { + s.closed.Add(1) + defer s.closed.Done() + + // Run immediately once before starting ticker + s.runCleanupJob() + + ticker := time.NewTicker(s.cleanupInterval) + for { + select { + case <-ticker.C: + s.runCleanupJob() + case <-s.closing: + return + } + } } + +func (s *SingularityStore) runCleanupJob() { + if err := s.cleanup(context.Background()); err != nil { + logger.Errorf("Local store cleanup failed: %v", err) + } } + +func (s *SingularityStore) cleanup(ctx context.Context) error { + + logger.Infof("Starting local store cleanup...") + + dir, err := os.ReadDir(s.local.Dir()) + if err != nil { + return fmt.Errorf("failed to open local store directory: %w", err) + } + + var binsToDelete []string + +binIteration: + for _, entry := range dir { + 
binFileName := entry.Name() + + id, entryIsBin := strings.CutSuffix(binFileName, ".bin") + if !entryIsBin { + continue + } + + idFileName := id + ".id" + idStream, err := os.Open(path.Join(s.local.Dir(), idFileName)) + if err != nil { + logger.Warnf("Failed to open ID map file for %s: %v", id, err) + continue + } + fileIDString, err := io.ReadAll(idStream) + if err != nil { + logger.Warnf("Failed to read ID map file for %s: %v", id, err) + continue + } + fileID, err := strconv.ParseUint(string(fileIDString), 10, 64) + if err != nil { + logger.Warnf("Failed to parse file ID %s as integer: %v", fileIDString, err) + continue + } + + getFileDealsRes, err := s.singularityClient.File.GetFileDeals(&file.GetFileDealsParams{ + Context: ctx, + ID: int64(fileID), + }) + if err != nil { + logger.Warnf("Failed to get file deals for %v: %v", fileID, err) + continue + } + + // Make sure the file has at least 1 deal for every SP + for _, sp := range s.storageProviders { + var foundDealForSP bool + for _, deal := range getFileDealsRes.Payload { + if deal.Provider == sp.String() { + foundDealForSP = true + break + } + } + + if !foundDealForSP { + // If no deal was found for this file and SP, the local bin file + // cannot be deleted yet - continue to the next one + continue binIteration + } + } + + // If deals have been made for all SPs, the local bin file can be + // deleted + binsToDelete = append(binsToDelete, binFileName) + } + + for _, binFileName := range binsToDelete { + if err := os.Remove(path.Join(s.local.Dir(), binFileName)); err != nil { + logger.Warnf("Failed to delete local bin file %s that was staged for removal: %v", binFileName, err) + } + } + logger.Infof("Cleaned up %v unneeded local files", len(binsToDelete)) + + return nil +} diff --git a/integration/test/integration_test.go b/integration/test/integration_test.go index a1c5901..81cdec7 100644 --- a/integration/test/integration_test.go +++ b/integration/test/integration_test.go @@ -2,6 +2,7 @@ package test import 
( "bytes" + "crypto/rand" "encoding/json" "io" "net/http" @@ -19,12 +20,9 @@ import ( func TestRoundTripPutAndGet(t *testing.T) { env := NewEnvironment(t) - buf := new(bytes.Buffer) - for i := 0; i < 10000000; i++ { - _, err := buf.Write([]byte("1234567890")) - require.NoError(t, err) - } - wantBlob := buf.Bytes() + wantBlob, err := io.ReadAll(io.LimitReader(rand.Reader, 10<<20)) + require.NoError(t, err) + buf := bytes.NewBuffer(wantBlob) var postBlobResp api.PostBlobResponse {