Skip to content

Commit

Permalink
PBM-1397: implement checkPhysicalBackupDataFiles()
Browse files Browse the repository at this point in the history
  • Loading branch information
defbin committed Sep 24, 2024
1 parent 3ecb81a commit 2710ea2
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 27 deletions.
63 changes: 60 additions & 3 deletions pbm/backup/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func CheckBackupDataFiles(ctx context.Context, stg storage.Storage, bcp *BackupM
case defs.LogicalBackup:
return checkLogicalBackupDataFiles(ctx, stg, bcp)
case defs.PhysicalBackup, defs.IncrementalBackup:
return checkPhysicalBackupFiles(ctx, stg, bcp)
return checkPhysicalBackupDataFiles(ctx, stg, bcp)
case defs.ExternalBackup:
return nil // no files available
}
Expand Down Expand Up @@ -111,8 +111,65 @@ func checkLogicalBackupDataFiles(_ context.Context, stg storage.Storage, bcp *Ba
return errors.Join(errs...)
}

func checkPhysicalBackupFiles(ctx context.Context, stg storage.Storage, bcp *BackupMeta) error {
return nil
func checkPhysicalBackupDataFiles(_ context.Context, stg storage.Storage, bcp *BackupMeta) error {
eg := util.NewErrorGroup(runtime.NumCPU() * 2)
for _, rs := range bcp.Replsets {
eg.Go(func() error {
var filelist Filelist
if version.HasFilelistFile(bcp.PBMVersion) {
var err error
filelist, err = ReadFilelistForReplset(stg, bcp.Name, rs.Name)
if err != nil {
return errors.Wrapf(err, "read filelist for replset %s", rs.Name)
}
} else {
filelist = rs.Files
}
if len(filelist) == 0 {
return errors.Errorf("empty filelist for replset %s", rs.Name)
}

for _, f := range filelist {
if f.Len <= 0 {
continue // no file expected
}

eg.Go(func() error {
filepath := path.Join(bcp.Name, rs.Name, f.Path(bcp.Compression))
stat, err := stg.FileStat(filepath)
if err != nil {
return errors.Wrapf(err, "file %s", filepath)
}
if stat.Size == 0 {
return errors.Errorf("empty file %s", filepath)
}

return nil
})
}

return nil
})
}

errs := eg.Wait()
return errors.Join(errs...)
}

func ReadFilelistForReplset(stg storage.Storage, bcpName, rsName string) (Filelist, error) {
pfFilepath := path.Join(bcpName, rsName, FilelistName)
rdr, err := stg.SourceReader(pfFilepath)
if err != nil {
return nil, errors.Wrapf(err, "open %q", pfFilepath)
}
defer rdr.Close()

filelist, err := ReadFilelist(rdr)
if err != nil {
return nil, errors.Wrapf(err, "parse filelist %q", pfFilepath)
}

return filelist, nil
}

func ReadArchiveNamespaces(stg storage.Storage, metafile string) ([]*archive.Namespace, error) {
Expand Down
10 changes: 10 additions & 0 deletions pbm/backup/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io"
"os"
"path/filepath"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
Expand Down Expand Up @@ -156,6 +157,15 @@ func (f File) String() string {
return fmt.Sprintf("%s [%d:%d]", f.Name, f.Off, f.Len)
}

func (f File) Path(c compress.CompressionType) string {
src := filepath.Join(f.Name + c.Suffix())
if f.Len == 0 {
return src
}

return fmt.Sprintf("%s.%d-%d", src, f.Off, f.Len)
}

func (f *File) WriteTo(w io.Writer) (int64, error) {
fd, err := os.Open(f.Name)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion pbm/restore/logical.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,6 @@ func (r *Restore) RunSnapshot(
usersAndRolesOpt restoreUsersAndRolesOption,
) error {
var rdr io.ReadCloser

var err error
if version.IsLegacyArchive(bcp.PBMVersion) {
sr, err := r.bcpStg.SourceReader(dump)
Expand Down
5 changes: 1 addition & 4 deletions pbm/restore/physical.go
Original file line number Diff line number Diff line change
Expand Up @@ -1089,10 +1089,7 @@ func (r *PhysRestore) copyFiles() (*s3.DownloadStat, error) {
for i := len(r.files) - 1; i >= 0; i-- {
set := r.files[i]
for _, f := range set.Data {
src := filepath.Join(set.BcpName, setName, f.Name+set.Cmpr.Suffix())
if f.Len != 0 {
src += fmt.Sprintf(".%d-%d", f.Off, f.Len)
}
src := filepath.Join(set.BcpName, setName, f.Path(set.Cmpr))
// cut dbpath from destination if there is any (see PBM-1058)
fname := f.Name
if set.dbpath != "" {
Expand Down
21 changes: 2 additions & 19 deletions sdk/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package sdk

import (
"context"
"path"
"runtime"
"time"

Expand Down Expand Up @@ -192,7 +191,7 @@ func fillFilelistForBackup(ctx context.Context, bcp *BackupMetadata) error {
rs := &bcp.Replsets[i]

eg.Go(func() error {
filelist, err := getFilelistForReplset(stg, bcp.Name, rs.Name)
filelist, err := backup.ReadFilelistForReplset(stg, bcp.Name, rs.Name)
if err != nil {
return errors.Wrapf(err, "get filelist for %q [rs: %s] backup", bcp.Name, rs.Name)
}
Expand Down Expand Up @@ -226,7 +225,7 @@ func fillFilelistForBackup(ctx context.Context, bcp *BackupMetadata) error {
rs := &bcp.Replsets[i]

eg.Go(func() error {
filelist, err := getFilelistForReplset(stg, bcp.Name, rs.Name)
filelist, err := backup.ReadFilelistForReplset(stg, bcp.Name, rs.Name)
if err != nil {
return errors.Wrapf(err, "fetch files for %q [rs: %s] backup", bcp.Name, rs.Name)
}
Expand Down Expand Up @@ -254,22 +253,6 @@ func getStorageForRead(ctx context.Context, bcp *backup.BackupMeta) (storage.Sto
return stg, nil
}

func getFilelistForReplset(stg storage.Storage, bcpName, rsName string) (backup.Filelist, error) {
pfFilepath := path.Join(bcpName, rsName, backup.FilelistName)
rdr, err := stg.SourceReader(pfFilepath)
if err != nil {
return nil, errors.Wrapf(err, "open %q", pfFilepath)
}
defer rdr.Close()

filelist, err := backup.ReadFilelist(rdr)
if err != nil {
return nil, errors.Wrapf(err, "parse filelist %q", pfFilepath)
}

return filelist, nil
}

func (c *Client) GetRestoreByName(ctx context.Context, name string) (*RestoreMetadata, error) {
return restore.GetRestoreMeta(ctx, c.conn, name)
}
Expand Down

0 comments on commit 2710ea2

Please sign in to comment.