Skip to content
This repository has been archived by the owner on May 15, 2024. It is now read-only.

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
anjor authored Aug 18, 2023
2 parents a7845a2 + 9cbb910 commit c031ffc
Show file tree
Hide file tree
Showing 4 changed files with 788 additions and 59 deletions.
163 changes: 163 additions & 0 deletions blob/singularity_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package blob

import (
"context"
"errors"
"fmt"
"io"
"os"
"path"
"strconv"
"strings"

"github.com/data-preservation-programs/singularity/client"
"github.com/data-preservation-programs/singularity/handler/dataset"
"github.com/data-preservation-programs/singularity/handler/datasource"
)

const motionDatasetName = "MOTION_DATASET"
const maxCarSize = "31.5GiB"

type SingularityStore struct {
local *LocalStore
sourceID uint32
singularityClient client.Client
}

func NewSingularityStore(dir string, singularityClient client.Client) *SingularityStore {
local := NewLocalStore(dir)
return &SingularityStore{
local: local,
singularityClient: singularityClient,
}
}

func (l *SingularityStore) Start(ctx context.Context) error {
_, err := l.singularityClient.CreateDataset(ctx, dataset.CreateRequest{
Name: motionDatasetName,
MaxSizeStr: maxCarSize,
})
var asDuplicatedRecord client.DuplicateRecordError

// return errors, but ignore duplicated record, that means we just already created it
if err != nil && !errors.As(err, &asDuplicatedRecord) {
return err
}
source, err := l.singularityClient.CreateLocalSource(ctx, motionDatasetName, datasource.LocalRequest{
SourcePath: l.local.dir,
RescanInterval: "0",
DeleteAfterExport: false,
})
// handle source already created
if errors.As(err, &asDuplicatedRecord) {
sources, err := l.singularityClient.ListSourcesByDataset(ctx, motionDatasetName)
if err != nil {
return err
}
for _, source := range sources {
if source.Path == strings.TrimSuffix(l.local.dir, "/") {
l.sourceID = source.ID
return nil
}
}
// this shouldn't happen - if we have a duplicate, the record should exist
return errors.New("unable to locate dataset")
}
// return errors, but ignore duplicated record, that means we just already created it
if err != nil {
return err
}
l.sourceID = source.ID
return nil
}

func (l *SingularityStore) Shutdown(_ context.Context) error {
return nil
}

func (s *SingularityStore) Put(ctx context.Context, reader io.ReadCloser) (*Descriptor, error) {
desc, err := s.local.Put(ctx, reader)
if err != nil {
return nil, err
}
model, err := s.singularityClient.PushItem(ctx, s.sourceID, datasource.ItemInfo{Path: desc.ID.String() + ".bin"})
if err != nil {
return nil, fmt.Errorf("error creating singularity entry: %w", err)
}
idFile, err := os.CreateTemp(s.local.dir, "motion_local_store_*.bin.temp")
if err != nil {
return nil, err
}
defer idFile.Close()
_, err = idFile.Write([]byte(strconv.FormatUint(model.ID, 10)))
if err != nil {
os.Remove(idFile.Name())
return nil, err
}
if err = os.Rename(idFile.Name(), path.Join(s.local.dir, desc.ID.String()+".id")); err != nil {
return nil, err
}
return desc, nil
}

func (s *SingularityStore) Get(ctx context.Context, id ID) (io.ReadSeekCloser, error) {
// this is largely artificial -- we're verifying the singularity item, but just reading from
// the local store
idStream, err := os.Open(path.Join(s.local.dir, id.String()+".id"))
if err != nil {
return nil, err
}
itemIDString, err := io.ReadAll(idStream)
if err != nil {
return nil, err
}
itemID, err := strconv.ParseUint(string(itemIDString), 10, 64)
if err != nil {
return nil, err
}
item, err := s.singularityClient.GetItem(ctx, itemID)
var asNotFoundError client.NotFoundError
if errors.As(err, &asNotFoundError) {
return nil, ErrBlobNotFound
}
if err != nil {
return nil, fmt.Errorf("error loading singularity entry: %w", err)
}
var decoded ID
err = decoded.Decode(strings.TrimSuffix(path.Base(item.Path), path.Ext(item.Path)))
if err != nil {
return nil, err
}
return s.local.Get(ctx, decoded)
}

func (s *SingularityStore) Describe(ctx context.Context, id ID) (*Descriptor, error) {
// this is largely artificial -- we're verifying the singularity item, but just reading from
// the local store
idStream, err := os.Open(path.Join(s.local.dir, id.String()+".id"))
if err != nil {
return nil, err
}
itemIDString, err := io.ReadAll(idStream)
if err != nil {
return nil, err
}
itemID, err := strconv.ParseUint(string(itemIDString), 10, 64)
if err != nil {
return nil, err
}
item, err := s.singularityClient.GetItem(ctx, itemID)
var asNotFoundError client.NotFoundError
if errors.As(err, &asNotFoundError) {
return nil, ErrBlobNotFound
}
if err != nil {
return nil, fmt.Errorf("error loading singularity entry: %w", err)
}
var decoded ID
err = decoded.Decode(strings.TrimSuffix(path.Base(item.Path), path.Ext(item.Path)))
if err != nil {
return nil, err
}
return s.local.Describe(ctx, decoded)
}
39 changes: 39 additions & 0 deletions cmd/motion/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package main

import (
"net/http"
"os"
"os/signal"

"github.com/data-preservation-programs/singularity/client"
httpclient "github.com/data-preservation-programs/singularity/client/http"
libclient "github.com/data-preservation-programs/singularity/client/lib"
"github.com/data-preservation-programs/singularity/database"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/motion"
"github.com/filecoin-project/motion/blob"
Expand Down Expand Up @@ -45,6 +50,16 @@ func main() {
Usage: "Whether to generate the local wallet key if none is found",
Value: true,
},
&cli.BoolFlag{
Name: "experimentalSingularityStore",
Usage: "whether to use experimental Singularity store as the storage and deal making engine",
DefaultText: "Local storage is used",
},
&cli.StringFlag{
Name: "experimentalRemoteSingularityAPIUrl",
Usage: "when using a singularity as the storage engine, if set, uses a remote HTTP API to interface with Singularity",
DefaultText: "use singularity as a code library",
},
},
Action: func(cctx *cli.Context) error {

Expand Down Expand Up @@ -76,6 +91,30 @@ func main() {
return err
}
store = rbstore
} else if cctx.Bool("experimentalSingularityStore") {
singularityAPIUrl := cctx.String("experimentalRemoteSingularityAPIUrl")
var client client.Client
if singularityAPIUrl != "" {
client = httpclient.NewHTTPClient(http.DefaultClient, singularityAPIUrl)
} else {
db, err := database.OpenWithDefaults("sqlite:" + storeDir + "/singularity.db")
if err != nil {
logger.Errorw("Failed to open singularity database", "err", err)
return err
}
client, err = libclient.NewClient(db)
if err != nil {
logger.Errorw("Failed to get singularity client", "err", err)
return err
}
}
singularityStore := blob.NewSingularityStore(storeDir, client)
logger.Infow("Using Singularity blob store", "storeDir", storeDir)
if err := singularityStore.Start(cctx.Context); err != nil {
logger.Errorw("Failed to start Singularity blob store", "err", err)
return err
}
store = singularityStore
} else {
store = blob.NewLocalStore(storeDir)
logger.Infow("Using local blob store", "storeDir", storeDir)
Expand Down
Loading

0 comments on commit c031ffc

Please sign in to comment.