Skip to content
This repository has been archived by the owner on May 15, 2024. It is now read-only.

Commit

Permalink
Fix runPreparationJobs exiting immediately (#142)
Browse files Browse the repository at this point in the history
runPreparationJobs exits immediately and cancels its goroutine. This
prevents the goroutine from servicing the `SingularityStore.toPack`
channel, which then causes that to block after 16 writes.

Added a unit test that fails before this PR and succeeds after.

Fixes #140
  • Loading branch information
gammazero authored Oct 4, 2023
2 parents decdc1d + 1f0ba3f commit d3eb162
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 31 deletions.
3 changes: 3 additions & 0 deletions integration/singularity/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func newOptions(o ...Option) (*options, error) {
maxPendingDealSize: "0",
maxPendingDealNumber: 0,
cleanupInterval: time.Hour,
pricePerGiBEpoch: abi.NewTokenAmount(0),
pricePerGiB: abi.NewTokenAmount(0),
pricePerDeal: abi.NewTokenAmount(0),
}
for _, apply := range o {
if err := apply(opts); err != nil {
Expand Down
62 changes: 31 additions & 31 deletions integration/singularity/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (

var logger = log.Logger("motion/integration/singularity")

const PutQueueSize = 16

type SingularityStore struct {
*options
local *blob.LocalStore
Expand All @@ -49,7 +51,7 @@ func NewStore(o ...Option) (*SingularityStore, error) {
options: opts,
local: blob.NewLocalStore(opts.storeDir),
sourceName: "source",
toPack: make(chan uint64, 16),
toPack: make(chan uint64, PutQueueSize),
closing: make(chan struct{}),
}, nil
}
Expand Down Expand Up @@ -189,6 +191,7 @@ func (l *SingularityStore) Start(ctx context.Context) error {
logger.Errorw("Failed to list schedules for preparation", "err", err)
return err
}

pricePerGBEpoch, _ := (new(big.Rat).SetFrac(l.pricePerGiBEpoch.Int, big.NewInt(int64(1e18)))).Float64()
pricePerGB, _ := (new(big.Rat).SetFrac(l.pricePerGiB.Int, big.NewInt(int64(1e18)))).Float64()
pricePerDeal, _ := (new(big.Rat).SetFrac(l.pricePerDeal.Int, big.NewInt(int64(1e18)))).Float64()
Expand Down Expand Up @@ -270,47 +273,43 @@ func (l *SingularityStore) Start(ctx context.Context) error {
}
}
go l.runPreparationJobs()
go l.runCleanupWorker(context.Background())
go l.runCleanupWorker()
return nil
}

func (l *SingularityStore) runPreparationJobs() {
l.closed.Add(1)
defer l.closed.Done()

ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
l.closed.Done()
}()
// Create a context that gets canceled when this function exits.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
for {
select {
case <-ctx.Done():
return
case fileID := <-l.toPack:
prepareToPackSourceRes, err := l.singularityClient.File.PrepareToPackFile(&file.PrepareToPackFileParams{
for {
select {
case <-l.closing:
return
case fileID := <-l.toPack:
prepareToPackSourceRes, err := l.singularityClient.File.PrepareToPackFile(&file.PrepareToPackFileParams{
Context: ctx,
ID: int64(fileID),
})
if err != nil {
logger.Errorw("preparing to pack file", "fileID", fileID, "error", err)
}
if prepareToPackSourceRes.Payload > l.packThreshold {
// mark outstanding pack jobs as ready to go so we can make CAR files
_, err := l.singularityClient.Job.PrepareToPackSource(&job.PrepareToPackSourceParams{
Context: ctx,
ID: int64(fileID),
ID: l.preparationName,
Name: l.sourceName,
})
if err != nil {
logger.Errorw("preparing to pack file", "fileID", fileID, "error", err)
}
if prepareToPackSourceRes.Payload > l.packThreshold {
// mark outstanding pack jobs as ready to go so we can make CAR files
_, err := l.singularityClient.Job.PrepareToPackSource(&job.PrepareToPackSourceParams{
Context: ctx,
ID: l.preparationName,
Name: l.sourceName,
})
if err != nil {
logger.Errorw("preparing to pack source", "error", err)
}
logger.Errorw("preparing to pack source", "error", err)
}
}
}
}()
}
}

func (l *SingularityStore) Shutdown(ctx context.Context) error {
Expand Down Expand Up @@ -471,14 +470,16 @@ func (s *SingularityStore) Describe(ctx context.Context, id blob.ID) (*blob.Desc
return descriptor, nil
}

func (s *SingularityStore) runCleanupWorker(ctx context.Context) {
func (s *SingularityStore) runCleanupWorker() {
s.closed.Add(1)
defer s.closed.Done()

// Run immediately once before starting ticker
s.runCleanupJob()

ticker := time.NewTicker(s.cleanupInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
Expand All @@ -496,7 +497,6 @@ func (s *SingularityStore) runCleanupJob() {
}

func (s *SingularityStore) cleanup(ctx context.Context) error {

logger.Infof("Starting local store cleanup...")

dir, err := os.ReadDir(s.local.Dir())
Expand Down
83 changes: 83 additions & 0 deletions integration/singularity/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package singularity_test

import (
	"context"
	"errors"
	"io"
	"net/http"
	"net/http/httptest"
	"net/url"
	"os"
	"path/filepath"
	"testing"
	"time"

	singularityclient "github.com/data-preservation-programs/singularity/client/swagger/http"
	"github.com/filecoin-project/motion/integration/singularity"
	"github.com/stretchr/testify/require"
)

// TestStorePut verifies that SingularityStore.Put does not block once more
// than PutQueueSize blobs have been written — i.e. that runPreparationJobs
// is actually draining the store's toPack channel. Regression test for #140.
func TestStorePut(t *testing.T) {
	testServer := httptest.NewServer(http.HandlerFunc(testHandler))
	t.Cleanup(func() {
		testServer.Close()
	})

	cfg := singularityclient.DefaultTransportConfig()
	u, err := url.Parse(testServer.URL)
	require.NoError(t, err)
	cfg.Host = u.Host
	singularityAPI := singularityclient.NewHTTPClientWithConfig(nil, cfg)

	tmpDir := t.TempDir()
	s, err := singularity.NewStore(
		singularity.WithStoreDir(tmpDir),
		singularity.WithWalletKey("dummy"),
		singularity.WithSingularityClient(singularityAPI),
	)
	require.NoError(t, err)

	ctx := context.Background()
	// Previously the Start error was silently discarded; a failed start would
	// make the rest of the test fail for a confusing reason.
	require.NoError(t, s.Start(ctx))

	testFile := filepath.Join(tmpDir, "testdata.txt")
	require.NoError(t, os.WriteFile(testFile, []byte("Halló heimur!"), 0o644))
	f, err := os.Open(testFile)
	require.NoError(t, err)
	t.Cleanup(func() {
		f.Close()
	})

	// Put one more blob than the queue can buffer. If the toPack channel is
	// not being read, the extra Put blocks and the timeout below fires.
	// Failures are reported over a channel instead of calling require in the
	// goroutine: require invokes t.FailNow, which must only be called from
	// the test goroutine.
	errCh := make(chan error, 1)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := 0; i < singularity.PutQueueSize+1; i++ {
			// Rewind so every iteration writes the file's content; without
			// this, every Put after the first reads an already-EOF handle.
			if _, err := f.Seek(0, io.SeekStart); err != nil {
				errCh <- err
				return
			}
			desc, err := s.Put(ctx, f)
			if err != nil {
				errCh <- err
				return
			}
			if desc == nil {
				errCh <- errors.New("Put returned nil descriptor")
				return
			}
		}
	}()

	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	select {
	case <-done:
		// The writer finished; surface any error it reported.
		select {
		case err := <-errCh:
			require.NoError(t, err)
		default:
		}
	case <-timer.C:
		require.FailNow(t, "Put queue is not being read, check that store.runPreparationJobs is running")
	}

	require.NoError(t, s.Shutdown(context.Background()))
}

func testHandler(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()

if req.URL.Path == "/api/preparation/MOTION_PREPARATION/schedules" && req.Method == http.MethodGet {
http.Error(w, "", http.StatusNotFound)
return
}
}

0 comments on commit d3eb162

Please sign in to comment.