Skip to content
This repository has been archived by the owner on May 15, 2024. It is now read-only.

Commit

Permalink
Add replica status when using singularity store (#67)
Browse files Browse the repository at this point in the history
# Goals

This adds replication status when using singularity store, using an API
we wrote to connect files to the deals they are. There are likely some
longer term caveats here, in terms of structure, but this should suffice
for M2.

Also updates singularity to latest.

Note: awaiting a cut of Lassie tag to handle the libp2p update.
  • Loading branch information
hannahhoward authored Aug 26, 2023
2 parents deb4698 + 7345404 commit d55d5ea
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 210 deletions.
29 changes: 25 additions & 4 deletions blob/singularity_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/data-preservation-programs/singularity/client"
"github.com/data-preservation-programs/singularity/handler/dataset"
"github.com/data-preservation-programs/singularity/handler/datasource"
"github.com/data-preservation-programs/singularity/service/epochutil"
)

const motionDatasetName = "MOTION_DATASET"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (s *SingularityStore) Put(ctx context.Context, reader io.ReadCloser) (*Desc
if err != nil {
return nil, err
}
model, err := s.singularityClient.PushItem(ctx, s.sourceID, datasource.ItemInfo{Path: desc.ID.String() + ".bin"})
model, err := s.singularityClient.PushFile(ctx, s.sourceID, datasource.FileInfo{Path: desc.ID.String() + ".bin"})
if err != nil {
return nil, fmt.Errorf("error creating singularity entry: %w", err)
}
Expand Down Expand Up @@ -115,7 +116,7 @@ func (s *SingularityStore) Get(ctx context.Context, id ID) (io.ReadSeekCloser, e
if err != nil {
return nil, err
}
item, err := s.singularityClient.GetItem(ctx, itemID)
item, err := s.singularityClient.GetFile(ctx, itemID)
var asNotFoundError client.NotFoundError
if errors.As(err, &asNotFoundError) {
return nil, ErrBlobNotFound
Expand Down Expand Up @@ -146,7 +147,7 @@ func (s *SingularityStore) Describe(ctx context.Context, id ID) (*Descriptor, er
if err != nil {
return nil, err
}
item, err := s.singularityClient.GetItem(ctx, itemID)
item, err := s.singularityClient.GetFile(ctx, itemID)
var asNotFoundError client.NotFoundError
if errors.As(err, &asNotFoundError) {
return nil, ErrBlobNotFound
Expand All @@ -159,5 +160,25 @@ func (s *SingularityStore) Describe(ctx context.Context, id ID) (*Descriptor, er
if err != nil {
return nil, err
}
return s.local.Describe(ctx, decoded)
descriptor, err := s.local.Describe(ctx, decoded)
if err != nil {
return nil, err
}
deals, err := s.singularityClient.GetFileDeals(ctx, itemID)
if err != nil {
return nil, err
}
replicas := make([]Replica, 0, len(deals))
for _, deal := range deals {
replicas = append(replicas, Replica{
// TODO: figure out how to get LastVerified
Provider: deal.Provider,
Status: string(deal.State),
Expiration: epochutil.EpochToTime(deal.EndEpoch),
})
}
descriptor.Status = &Status{
Replicas: replicas,
}
return descriptor, nil
}
28 changes: 26 additions & 2 deletions cmd/motion/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
httpclient "github.com/data-preservation-programs/singularity/client/http"
libclient "github.com/data-preservation-programs/singularity/client/lib"
"github.com/data-preservation-programs/singularity/database"
"github.com/data-preservation-programs/singularity/service/epochutil"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/motion"
"github.com/filecoin-project/motion/blob"
"github.com/filecoin-project/motion/wallet"
"github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
"github.com/ybbus/jsonrpc/v3"
)

var logger = log.Logger("motion/cmd")
Expand Down Expand Up @@ -68,6 +70,20 @@ func main() {
Usage: "Storage providers to which to make deals with. Multiple providers may be specified.",
DefaultText: "No deals are made to replicate data onto storage providers.",
},
&cli.StringFlag{
Name: "lotusApi",
Category: "Lotus",
Usage: "Lotus RPC API endpoint",
Value: "https://api.node.glif.io/rpc/v1",
EnvVars: []string{"LOTUS_API"},
},
&cli.StringFlag{
Name: "lotusToken",
Category: "Lotus",
Usage: "Lotus RPC API token",
Value: "",
EnvVars: []string{"LOTUS_TOKEN"},
},
},
Action: func(cctx *cli.Context) error {

Expand All @@ -88,6 +104,8 @@ func main() {

storeDir := cctx.String("storeDir")
var store blob.Store
lotusAPI := cctx.String("lotusApi")
lotusToken := cctx.String("lotusToken")
if cctx.Bool("experimentalRibsStore") {
rbstore, err := blob.NewRibsStore(storeDir, ks)
if err != nil {
Expand All @@ -105,12 +123,18 @@ func main() {
if singularityAPIUrl != "" {
client = httpclient.NewHTTPClient(http.DefaultClient, singularityAPIUrl)
} else {
db, err := database.OpenWithDefaults("sqlite:" + storeDir + "/singularity.db")
db, closer, err := database.OpenWithLogger("sqlite:" + storeDir + "/singularity.db")
defer closer.Close()
if err != nil {
logger.Errorw("Failed to open singularity database", "err", err)
return err
}
client, err = libclient.NewClient(db)
err = epochutil.Initialize(cctx.Context, lotusAPI, lotusToken)
if err != nil {
logger.Errorw("Failed to initialized epoch timing", "err", err)
return err
}
client, err = libclient.NewClient(db, jsonrpc.NewClient(lotusAPI))
if err != nil {
logger.Errorw("Failed to get singularity client", "err", err)
return err
Expand Down
Loading

0 comments on commit d55d5ea

Please sign in to comment.