diff --git a/integration/singularity/options.go b/integration/singularity/options.go index 4f2bc2e..5536713 100644 --- a/integration/singularity/options.go +++ b/integration/singularity/options.go @@ -60,6 +60,9 @@ func newOptions(o ...Option) (*options, error) { maxPendingDealSize: "0", maxPendingDealNumber: 0, cleanupInterval: time.Hour, + pricePerGiBEpoch: abi.NewTokenAmount(0), + pricePerGiB: abi.NewTokenAmount(0), + pricePerDeal: abi.NewTokenAmount(0), } for _, apply := range o { if err := apply(opts); err != nil { diff --git a/integration/singularity/store.go b/integration/singularity/store.go index e5f5824..cd4add7 100644 --- a/integration/singularity/store.go +++ b/integration/singularity/store.go @@ -30,6 +30,8 @@ import ( var logger = log.Logger("motion/integration/singularity") +const PutQueueSize = 16 + type SingularityStore struct { *options local *blob.LocalStore @@ -49,7 +51,7 @@ func NewStore(o ...Option) (*SingularityStore, error) { options: opts, local: blob.NewLocalStore(opts.storeDir), sourceName: "source", - toPack: make(chan uint64, 16), + toPack: make(chan uint64, PutQueueSize), closing: make(chan struct{}), }, nil } @@ -189,6 +191,7 @@ func (l *SingularityStore) Start(ctx context.Context) error { logger.Errorw("Failed to list schedules for preparation", "err", err) return err } + pricePerGBEpoch, _ := (new(big.Rat).SetFrac(l.pricePerGiBEpoch.Int, big.NewInt(int64(1e18)))).Float64() pricePerGB, _ := (new(big.Rat).SetFrac(l.pricePerGiB.Int, big.NewInt(int64(1e18)))).Float64() pricePerDeal, _ := (new(big.Rat).SetFrac(l.pricePerDeal.Int, big.NewInt(int64(1e18)))).Float64() @@ -270,47 +273,43 @@ func (l *SingularityStore) Start(ctx context.Context) error { } } go l.runPreparationJobs() - go l.runCleanupWorker(context.Background()) + go l.runCleanupWorker() return nil } func (l *SingularityStore) runPreparationJobs() { l.closed.Add(1) + defer l.closed.Done() - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - defer func() { - 
cancel() - l.closed.Done() - }() + // Create a context that gets canceled when this function exits. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - go func() { - for { - select { - case <-ctx.Done(): - return - case fileID := <-l.toPack: - prepareToPackSourceRes, err := l.singularityClient.File.PrepareToPackFile(&file.PrepareToPackFileParams{ + for { + select { + case <-l.closing: + return + case fileID := <-l.toPack: + prepareToPackSourceRes, err := l.singularityClient.File.PrepareToPackFile(&file.PrepareToPackFileParams{ + Context: ctx, + ID: int64(fileID), + }) + if err != nil { + logger.Errorw("preparing to pack file", "fileID", fileID, "error", err) + } + if err == nil && prepareToPackSourceRes.Payload > l.packThreshold { + // mark outstanding pack jobs as ready to go so we can make CAR files + _, err := l.singularityClient.Job.PrepareToPackSource(&job.PrepareToPackSourceParams{ Context: ctx, - ID: int64(fileID), + ID: l.preparationName, + Name: l.sourceName, }) if err != nil { - logger.Errorw("preparing to pack file", "fileID", fileID, "error", err) - } - if prepareToPackSourceRes.Payload > l.packThreshold { - // mark outstanding pack jobs as ready to go so we can make CAR files - _, err := l.singularityClient.Job.PrepareToPackSource(&job.PrepareToPackSourceParams{ - Context: ctx, - ID: l.preparationName, - Name: l.sourceName, - }) - if err != nil { - logger.Errorw("preparing to pack source", "error", err) - } + logger.Errorw("preparing to pack source", "error", err) } } } - }() + } } func (l *SingularityStore) Shutdown(ctx context.Context) error { @@ -471,7 +470,7 @@ func (s *SingularityStore) Describe(ctx context.Context, id blob.ID) (*blob.Desc return descriptor, nil } -func (s *SingularityStore) runCleanupWorker(ctx context.Context) { +func (s *SingularityStore) runCleanupWorker() { s.closed.Add(1) defer s.closed.Done() @@ -479,6 +478,8 @@ func (s *SingularityStore) runCleanupWorker(ctx context.Context) { s.runCleanupJob() ticker := 
time.NewTicker(s.cleanupInterval) + defer ticker.Stop() + for { select { case <-ticker.C: @@ -496,7 +497,6 @@ func (s *SingularityStore) runCleanupJob() { } func (s *SingularityStore) cleanup(ctx context.Context) error { - logger.Infof("Starting local store cleanup...") dir, err := os.ReadDir(s.local.Dir()) diff --git a/integration/singularity/store_test.go b/integration/singularity/store_test.go new file mode 100644 index 0000000..c69ab63 --- /dev/null +++ b/integration/singularity/store_test.go @@ -0,0 +1,83 @@ +package singularity_test + +import ( + "context" + "net/http" + "net/http/httptest" + "net/url" + "os" + "path/filepath" + "testing" + "time" + + singularityclient "github.com/data-preservation-programs/singularity/client/swagger/http" + "github.com/filecoin-project/motion/integration/singularity" + "github.com/stretchr/testify/require" +) + +func TestStorePut(t *testing.T) { + testServer := httptest.NewServer(http.HandlerFunc(testHandler)) + t.Cleanup(func() { + testServer.Close() + }) + + cfg := singularityclient.DefaultTransportConfig() + u, _ := url.Parse(testServer.URL) + cfg.Host = u.Host + singularityAPI := singularityclient.NewHTTPClientWithConfig(nil, cfg) + + tmpDir := t.TempDir() + s, err := singularity.NewStore( + singularity.WithStoreDir(tmpDir), + singularity.WithWalletKey("dummy"), + singularity.WithSingularityClient(singularityAPI), + ) + require.NoError(t, err) + + ctx := context.Background() + s.Start(ctx) + + testFile := filepath.Join(tmpDir, "testdata.txt") + f, err := os.Create(testFile) + require.NoError(t, err) + _, err = f.WriteString("Halló heimur!") + require.NoError(t, err) + require.NoError(t, f.Close()) + f, err = os.Open(testFile) + require.NoError(t, err) + t.Cleanup(func() { + f.Close() + }) + + done := make(chan struct{}) + go func() { + defer close(done) + // Putting test file more than size put queue. If this blocks, then + // store.toPack channel is not being read. 
+ for i := 0; i < singularity.PutQueueSize+1; i++ { + desc, err := s.Put(ctx, f) + require.NoError(t, err) + require.NotNil(t, desc) + } + }() + + timer := time.NewTimer(time.Second) + select { + case <-done: + case <-timer.C: + require.FailNow(t, "Put queue is not being read, check that store.runPreparationJobs is running") + } + timer.Stop() + + err = s.Shutdown(context.Background()) + require.NoError(t, err) +} + +func testHandler(w http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + + if req.URL.Path == "/api/preparation/MOTION_PREPARATION/schedules" && req.Method == http.MethodGet { + http.Error(w, "", http.StatusNotFound) + return + } +}