Skip to content
This repository has been archived by the owner on May 15, 2024. It is now read-only.

Add replica status when using singularity store #67

Merged
merged 6 commits into from
Aug 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions blob/singularity_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/data-preservation-programs/singularity/client"
"github.com/data-preservation-programs/singularity/handler/dataset"
"github.com/data-preservation-programs/singularity/handler/datasource"
"github.com/data-preservation-programs/singularity/service/epochutil"
)

const motionDatasetName = "MOTION_DATASET"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (s *SingularityStore) Put(ctx context.Context, reader io.ReadCloser) (*Desc
if err != nil {
return nil, err
}
model, err := s.singularityClient.PushItem(ctx, s.sourceID, datasource.ItemInfo{Path: desc.ID.String() + ".bin"})
model, err := s.singularityClient.PushFile(ctx, s.sourceID, datasource.FileInfo{Path: desc.ID.String() + ".bin"})
if err != nil {
return nil, fmt.Errorf("error creating singularity entry: %w", err)
}
Expand Down Expand Up @@ -115,7 +116,7 @@ func (s *SingularityStore) Get(ctx context.Context, id ID) (io.ReadSeekCloser, e
if err != nil {
return nil, err
}
item, err := s.singularityClient.GetItem(ctx, itemID)
item, err := s.singularityClient.GetFile(ctx, itemID)
var asNotFoundError client.NotFoundError
if errors.As(err, &asNotFoundError) {
return nil, ErrBlobNotFound
Expand Down Expand Up @@ -146,7 +147,7 @@ func (s *SingularityStore) Describe(ctx context.Context, id ID) (*Descriptor, er
if err != nil {
return nil, err
}
item, err := s.singularityClient.GetItem(ctx, itemID)
item, err := s.singularityClient.GetFile(ctx, itemID)
var asNotFoundError client.NotFoundError
if errors.As(err, &asNotFoundError) {
return nil, ErrBlobNotFound
Expand All @@ -159,5 +160,25 @@ func (s *SingularityStore) Describe(ctx context.Context, id ID) (*Descriptor, er
if err != nil {
return nil, err
}
return s.local.Describe(ctx, decoded)
descriptor, err := s.local.Describe(ctx, decoded)
if err != nil {
return nil, err
}
deals, err := s.singularityClient.GetFileDeals(ctx, itemID)
if err != nil {
return nil, err
}
replicas := make([]Replica, 0, len(deals))
for _, deal := range deals {
replicas = append(replicas, Replica{
// TODO: figure out how to get LastVerified
Provider: deal.Provider,
Status: string(deal.State),
Expiration: epochutil.EpochToTime(deal.EndEpoch),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumes mainnet right? We probably want to parameterise this for devnet testing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

})
}
descriptor.Status = &Status{
Replicas: replicas,
}
return descriptor, nil
}
28 changes: 26 additions & 2 deletions cmd/motion/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
httpclient "github.com/data-preservation-programs/singularity/client/http"
libclient "github.com/data-preservation-programs/singularity/client/lib"
"github.com/data-preservation-programs/singularity/database"
"github.com/data-preservation-programs/singularity/service/epochutil"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/motion"
"github.com/filecoin-project/motion/blob"
"github.com/filecoin-project/motion/wallet"
"github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
"github.com/ybbus/jsonrpc/v3"
)

var logger = log.Logger("motion/cmd")
Expand Down Expand Up @@ -68,6 +70,20 @@ func main() {
Usage: "Storage providers to which to make deals with. Multiple providers may be specified.",
DefaultText: "No deals are made to replicate data onto storage providers.",
},
&cli.StringFlag{
Name: "lotusApi",
Category: "Lotus",
Usage: "Lotus RPC API endpoint",
Value: "https://api.node.glif.io/rpc/v1",
EnvVars: []string{"LOTUS_API"},
},
&cli.StringFlag{
Name: "lotusToken",
Category: "Lotus",
Usage: "Lotus RPC API token",
Value: "",
EnvVars: []string{"LOTUS_TOKEN"},
},
},
Action: func(cctx *cli.Context) error {

Expand All @@ -88,6 +104,8 @@ func main() {

storeDir := cctx.String("storeDir")
var store blob.Store
lotusAPI := cctx.String("lotusApi")
lotusToken := cctx.String("lotusToken")
if cctx.Bool("experimentalRibsStore") {
rbstore, err := blob.NewRibsStore(storeDir, ks)
if err != nil {
Expand All @@ -105,12 +123,18 @@ func main() {
if singularityAPIUrl != "" {
client = httpclient.NewHTTPClient(http.DefaultClient, singularityAPIUrl)
} else {
db, err := database.OpenWithDefaults("sqlite:" + storeDir + "/singularity.db")
db, closer, err := database.OpenWithLogger("sqlite:" + storeDir + "/singularity.db")
defer closer.Close()
if err != nil {
logger.Errorw("Failed to open singularity database", "err", err)
return err
}
client, err = libclient.NewClient(db)
err = epochutil.Initialize(cctx.Context, lotusAPI, lotusToken)
if err != nil {
logger.Errorw("Failed to initialized epoch timing", "err", err)
return err
}
client, err = libclient.NewClient(db, jsonrpc.NewClient(lotusAPI))
if err != nil {
logger.Errorw("Failed to get singularity client", "err", err)
return err
Expand Down
Loading