diff --git a/integration/singularity/store.go b/integration/singularity/store.go index 6b58f52..77b17cf 100644 --- a/integration/singularity/store.go +++ b/integration/singularity/store.go @@ -145,7 +145,7 @@ func (s *SingularityStore) Start(ctx context.Context) error { } } if wlt == nil { - logger.Info("Wallet is not found on singularity. Importing...") + logger.Info("Wallet is not found on singularity. Importing wallet") importWalletRes, err := s.singularityClient.Wallet.ImportWallet(&wallet.ImportWalletParams{ Context: ctx, Request: &models.WalletImportRequest{ @@ -176,7 +176,7 @@ func (s *SingularityStore) Start(ctx context.Context) error { } } if !walletFound { - logger.Info("Wallet was not found. Creating...") + logger.Info("Wallet was not found. Creating wallet") if attachWalletRes, err := s.singularityClient.WalletAssociation.AttachWallet(&wallet_association.AttachWalletParams{ Context: ctx, ID: s.preparationName, @@ -221,7 +221,7 @@ func (s *SingularityStore) Start(ctx context.Context) error { } if foundSchedule != nil { // If schedule was found, update it - logger.Infow("Schedule found for provider. Updating with latest settings...", "id", foundSchedule.ID) + logger.Infow("Schedule found for provider. Updating with latest settings", "id", foundSchedule.ID) _, err := s.singularityClient.DealSchedule.UpdateSchedule(&deal_schedule.UpdateScheduleParams{ Context: ctx, ID: foundSchedule.ID, @@ -250,7 +250,7 @@ func (s *SingularityStore) Start(ctx context.Context) error { } } else { // Otherwise, create it - logger.Info("Schedule not found for provider. Creating...") + logger.Info("Schedule not found for provider. Creating schedule") if createScheduleRes, err := s.singularityClient.DealSchedule.CreateSchedule(&deal_schedule.CreateScheduleParams{ Context: ctx, Schedule: &models.ScheduleCreateRequest{ @@ -369,7 +369,7 @@ func (s *SingularityStore) Shutdown(ctx context.Context) error { s.forcePack.Stop() - logger.Infof("Singularity store shut down") + logger.Info("Singularity store shut down") return nil } @@ -457,7 +457,7 @@ func (s *SingularityStore) PassGet(w http.ResponseWriter, r *http.Request, id bl http.Error(w, "", http.StatusInternalServerError) return } - logger.Info("Retrieved file", "id", id.String()) + logger.Infow("Retrieved file", "id", id.String()) } func (s *SingularityStore) Get(ctx context.Context, id blob.ID) (io.ReadSeekCloser, error) { @@ -576,7 +576,7 @@ func (s *SingularityStore) runCleanupJob() { } func (s *SingularityStore) cleanup(ctx context.Context) error { - logger.Infof("Starting local store cleanup...") + logger.Info("Starting local store cleanup") dir, err := os.ReadDir(s.local.Dir()) if err != nil { @@ -585,7 +585,6 @@ func (s *SingularityStore) cleanup(ctx context.Context) error { var binsToDelete []string -binIteration: for _, entry := range dir { binFileName := entry.Name() @@ -597,53 +596,62 @@ binIteration: idFileName := id + ".id" fileIDString, err := os.ReadFile(filepath.Join(s.local.Dir(), idFileName)) if err != nil { - logger.Warnf("Failed to open ID map file for %s: %v", id, err) + logger.Warnw("Failed to open ID map file", "err", err, "id", id) continue } fileID, err := strconv.ParseUint(string(fileIDString), 10, 64) if err != nil { - logger.Warnf("Failed to parse file ID %s as integer: %v", fileIDString, err) + logger.Warnw("Failed to parse file ID as integer", "err", err, "fileID", fileIDString) continue } - getFileDealsRes, err := s.singularityClient.File.GetFileDeals(&file.GetFileDealsParams{ - Context: ctx, - ID: int64(fileID), - }) - if err != nil { - logger.Warnf("Failed to get file deals for %v: %v", fileID, err) + if !s.hasDealForAllProviders(ctx, int64(fileID)) { + // If no deal was found for this file and SP, the local bin file + // cannot be deleted yet - continue to the next one continue } - // Make sure the file has at least 1 deal for every SP - for _, sp := range s.storageProviders { - var foundDealForSP bool - for _, deal := range getFileDealsRes.Payload { - if deal.Provider == sp.String() && (deal.State == models.ModelDealStatePublished || deal.State == models.ModelDealStateActive) { - foundDealForSP = true - break - } - } - - if !foundDealForSP { - // If no deal was found for this file and SP, the local bin file - // cannot be deleted yet - continue to the next one - continue binIteration - } - } - // If deals have been made for all SPs, the local bin file can be // deleted - logger.Infof("deleting local copy for deal %s, file %s", id, binFileName) + logger.Infow("Deleting local copy for deal", "id", id, "file", binFileName) binsToDelete = append(binsToDelete, binFileName) } for _, binFileName := range binsToDelete { - if err := os.Remove(filepath.Join(s.local.Dir(), binFileName)); err != nil { - logger.Warnf("Failed to delete local bin file %s that was staged for removal: %v", binFileName, err) + if err = os.Remove(filepath.Join(s.local.Dir(), binFileName)); err != nil { + logger.Warnw("Failed to delete local bin file that was staged for removal", "err", err, "file", binFileName) } } - logger.Infof("Cleaned up %v unneeded local files", len(binsToDelete)) + logger.Infow("Cleaned up unneeded local files", "count", len(binsToDelete)) return nil } + +// hasDealForAllProviders returns true if the file has at least 1 deal for +// every SP. +func (s *SingularityStore) hasDealForAllProviders(ctx context.Context, fileID int64) bool { + getFileDealsRes, err := s.singularityClient.File.GetFileDeals(&file.GetFileDealsParams{ + Context: ctx, + ID: fileID, + }) + if err != nil { + logger.Warnw("Failed to get file deals", "err", err, "file", fileID) + return false + } + + // Make sure the file has at least 1 deal for every SP + for _, sp := range s.storageProviders { + var foundDealForSP bool + for _, deal := range getFileDealsRes.Payload { + if deal.Provider == sp.String() && (deal.State == models.ModelDealStatePublished || deal.State == models.ModelDealStateActive) { + foundDealForSP = true + break + } + } + if !foundDealForSP { + return false + } + } + + return true +}