Skip to content
This repository has been archived by the owner on May 15, 2024. It is now read-only.

Commit

Permalink
Experimental Singularity Store (#29)
Browse files Browse the repository at this point in the history
Implement an experimental option to use Singularity as Motion
blob.Store.

A singularity blob store for now is a local store + a singularity
connector

For a put operation, the file is saved to the local blob store, then the
local file is added to Singularity for scanning (as an item). The
returned ID is the ID generated by singularity, rather than the UUID of
the file (note: this changes the ID type to a raw string -- currently
singularity uses integer indexes for each item -- @xinaxu maybe has an
opinion about whether Singularity might either convert IDs to UUIDs or
add a UUID at least for Item -- alternatively, we can make a single
motion ID -> singularity primary key mapping in a file held in memory).

For a get operation, the id is sent to Singularity to retrieve metadata
about the file, and then the file is retrieved from the local store.
(for real retrieval with Filecoin, we could have the GetItem call to
Singularity also hydrate the local store, though this may not be the
most performant option)

Note that all this really does so far is start creating entities within
Singularities database and scan files into Items & ItemParts.

I haven't attempted to start data prep workers or make filecoin deals.

Also, currently the singularity connector is implemented with a direct
connection to an SQLite DB setup for Singularity, but could also be
implemented over the singularity API.

Per the thinking in
#24, what may make
most sense is to get #26 fully working to allow partners to try storing
deals on Filecoin, while we continue to build this solution out and
harden/productionize Singularity's deal making.
  • Loading branch information
hannahhoward authored Aug 17, 2023
2 parents 6ce5186 + 131cb7f commit 9cbb910
Show file tree
Hide file tree
Showing 4 changed files with 788 additions and 59 deletions.
163 changes: 163 additions & 0 deletions blob/singularity_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package blob

import (
"context"
"errors"
"fmt"
"io"
"os"
"path"
"strconv"
"strings"

"github.com/data-preservation-programs/singularity/client"
"github.com/data-preservation-programs/singularity/handler/dataset"
"github.com/data-preservation-programs/singularity/handler/datasource"
)

const motionDatasetName = "MOTION_DATASET"
const maxCarSize = "31.5GiB"

type SingularityStore struct {
local *LocalStore
sourceID uint32
singularityClient client.Client
}

func NewSingularityStore(dir string, singularityClient client.Client) *SingularityStore {
local := NewLocalStore(dir)
return &SingularityStore{
local: local,
singularityClient: singularityClient,
}
}

func (l *SingularityStore) Start(ctx context.Context) error {
_, err := l.singularityClient.CreateDataset(ctx, dataset.CreateRequest{
Name: motionDatasetName,
MaxSizeStr: maxCarSize,
})
var asDuplicatedRecord client.DuplicateRecordError

// return errors, but ignore duplicated record, that means we just already created it
if err != nil && !errors.As(err, &asDuplicatedRecord) {
return err
}
source, err := l.singularityClient.CreateLocalSource(ctx, motionDatasetName, datasource.LocalRequest{
SourcePath: l.local.dir,
RescanInterval: "0",
DeleteAfterExport: false,
})
// handle source already created
if errors.As(err, &asDuplicatedRecord) {
sources, err := l.singularityClient.ListSourcesByDataset(ctx, motionDatasetName)
if err != nil {
return err
}
for _, source := range sources {
if source.Path == strings.TrimSuffix(l.local.dir, "/") {
l.sourceID = source.ID
return nil
}
}
// this shouldn't happen - if we have a duplicate, the record should exist
return errors.New("unable to locate dataset")
}
// return errors, but ignore duplicated record, that means we just already created it
if err != nil {
return err
}
l.sourceID = source.ID
return nil
}

func (l *SingularityStore) Shutdown(_ context.Context) error {
return nil
}

func (s *SingularityStore) Put(ctx context.Context, reader io.ReadCloser) (*Descriptor, error) {
desc, err := s.local.Put(ctx, reader)
if err != nil {
return nil, err
}
model, err := s.singularityClient.PushItem(ctx, s.sourceID, datasource.ItemInfo{Path: desc.ID.String() + ".bin"})
if err != nil {
return nil, fmt.Errorf("error creating singularity entry: %w", err)
}
idFile, err := os.CreateTemp(s.local.dir, "motion_local_store_*.bin.temp")
if err != nil {
return nil, err
}
defer idFile.Close()
_, err = idFile.Write([]byte(strconv.FormatUint(model.ID, 10)))
if err != nil {
os.Remove(idFile.Name())
return nil, err
}
if err = os.Rename(idFile.Name(), path.Join(s.local.dir, desc.ID.String()+".id")); err != nil {
return nil, err
}
return desc, nil
}

func (s *SingularityStore) Get(ctx context.Context, id ID) (io.ReadSeekCloser, error) {
// this is largely artificial -- we're verifying the singularity item, but just reading from
// the local store
idStream, err := os.Open(path.Join(s.local.dir, id.String()+".id"))
if err != nil {
return nil, err
}
itemIDString, err := io.ReadAll(idStream)
if err != nil {
return nil, err
}
itemID, err := strconv.ParseUint(string(itemIDString), 10, 64)
if err != nil {
return nil, err
}
item, err := s.singularityClient.GetItem(ctx, itemID)
var asNotFoundError client.NotFoundError
if errors.As(err, &asNotFoundError) {
return nil, ErrBlobNotFound
}
if err != nil {
return nil, fmt.Errorf("error loading singularity entry: %w", err)
}
var decoded ID
err = decoded.Decode(strings.TrimSuffix(path.Base(item.Path), path.Ext(item.Path)))
if err != nil {
return nil, err
}
return s.local.Get(ctx, decoded)
}

func (s *SingularityStore) Describe(ctx context.Context, id ID) (*Descriptor, error) {
// this is largely artificial -- we're verifying the singularity item, but just reading from
// the local store
idStream, err := os.Open(path.Join(s.local.dir, id.String()+".id"))
if err != nil {
return nil, err
}
itemIDString, err := io.ReadAll(idStream)
if err != nil {
return nil, err
}
itemID, err := strconv.ParseUint(string(itemIDString), 10, 64)
if err != nil {
return nil, err
}
item, err := s.singularityClient.GetItem(ctx, itemID)
var asNotFoundError client.NotFoundError
if errors.As(err, &asNotFoundError) {
return nil, ErrBlobNotFound
}
if err != nil {
return nil, fmt.Errorf("error loading singularity entry: %w", err)
}
var decoded ID
err = decoded.Decode(strings.TrimSuffix(path.Base(item.Path), path.Ext(item.Path)))
if err != nil {
return nil, err
}
return s.local.Describe(ctx, decoded)
}
39 changes: 39 additions & 0 deletions cmd/motion/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package main

import (
"net/http"
"os"
"os/signal"

"github.com/data-preservation-programs/singularity/client"
httpclient "github.com/data-preservation-programs/singularity/client/http"
libclient "github.com/data-preservation-programs/singularity/client/lib"
"github.com/data-preservation-programs/singularity/database"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/motion"
"github.com/filecoin-project/motion/blob"
Expand Down Expand Up @@ -45,6 +50,16 @@ func main() {
Usage: "Whether to generate the local wallet key if none is found",
Value: true,
},
&cli.BoolFlag{
Name: "experimentalSingularityStore",
Usage: "whether to use experimental Singularity store as the storage and deal making engine",
DefaultText: "Local storage is used",
},
&cli.StringFlag{
Name: "experimentalRemoteSingularityAPIUrl",
Usage: "when using a singularity as the storage engine, if set, uses a remote HTTP API to interface with Singularity",
DefaultText: "use singularity as a code library",
},
},
Action: func(cctx *cli.Context) error {

Expand Down Expand Up @@ -76,6 +91,30 @@ func main() {
return err
}
store = rbstore
} else if cctx.Bool("experimentalSingularityStore") {
singularityAPIUrl := cctx.String("experimentalRemoteSingularityAPIUrl")
var client client.Client
if singularityAPIUrl != "" {
client = httpclient.NewHTTPClient(http.DefaultClient, singularityAPIUrl)
} else {
db, err := database.OpenWithDefaults("sqlite:" + storeDir + "/singularity.db")
if err != nil {
logger.Errorw("Failed to open singularity database", "err", err)
return err
}
client, err = libclient.NewClient(db)
if err != nil {
logger.Errorw("Failed to get singularity client", "err", err)
return err
}
}
singularityStore := blob.NewSingularityStore(storeDir, client)
logger.Infow("Using Singularity blob store", "storeDir", storeDir)
if err := singularityStore.Start(cctx.Context); err != nil {
logger.Errorw("Failed to start Singularity blob store", "err", err)
return err
}
store = singularityStore
} else {
store = blob.NewLocalStore(storeDir)
logger.Infow("Using local blob store", "storeDir", storeDir)
Expand Down
Loading

0 comments on commit 9cbb910

Please sign in to comment.