Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PBM-1114: check backup files before mark it as done #1017

Merged
merged 4 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 44 additions & 21 deletions pbm/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,18 +337,44 @@ func (b *Backup) Run(ctx context.Context, bcp *ctrl.BackupCmd, opid ctrl.OPID, l
}

if inf.IsLeader() {
err = b.reconcileStatus(ctx, bcp.Name, opid.String(), defs.StatusDone, nil)
shards, err := topo.ClusterMembers(ctx, b.leadConn.MongoClient())
if err != nil {
return errors.Wrap(err, "check cluster for backup done")
return errors.Wrap(err, "check cluster for backup done: get cluster members")
}

err = b.convergeCluster(ctx, bcp.Name, opid.String(), shards, defs.StatusDone)
err = errors.Wrap(err, "check cluster for backup done: convergeCluster")
if err != nil {
return err
}

bcpm, err = NewDBManager(b.leadConn).GetBackupByName(ctx, bcp.Name)
if err != nil {
return errors.Wrap(err, "get backup metadata")
}

// PBM-1114: update file metadata with the same values as in database
unix := time.Now().Unix()
bcpm.Status = defs.StatusDone
bcpm.LastTransitionTS = unix
bcpm.Conditions = append(bcpm.Conditions, Condition{
Timestamp: unix,
Status: defs.StatusDone,
})

err = writeMeta(stg, bcpm)
return errors.Wrap(err, "dump metadata")
if err != nil {
return errors.Wrap(err, "dump metadata")
}

err = CheckBackupFiles(ctx, stg, bcp.Name)
if err != nil {
return errors.Wrap(err, "check backup files")
}

err = ChangeBackupStateWithUnixTime(ctx, b.leadConn, bcp.Name, defs.StatusDone, unix, "")
return errors.Wrapf(err, "check cluster for backup done: update backup meta with %s",
defs.StatusDone)
} else {
// to be sure the locks released only after the "done" status had written
err = b.waitForStatus(ctx, bcp.Name, defs.StatusDone, nil)
Expand Down Expand Up @@ -432,14 +458,18 @@ func (b *Backup) reconcileStatus(
}

if timeout != nil {
return errors.Wrap(
b.convergeClusterWithTimeout(ctx, bcpName, opid, shards, status, *timeout),
"convergeClusterWithTimeout")
err = b.convergeClusterWithTimeout(ctx, bcpName, opid, shards, status, *timeout)
err = errors.Wrap(err, "convergeClusterWithTimeout")
} else {
err = b.convergeCluster(ctx, bcpName, opid, shards, status)
err = errors.Wrap(err, "convergeCluster")
}
if err != nil {
return err
}

return errors.Wrap(
b.convergeCluster(ctx, bcpName, opid, shards, status),
"convergeCluster")
err = ChangeBackupState(b.leadConn, bcpName, status, "")
return errors.Wrapf(err, "update backup meta with %s", status)
}

// convergeCluster waits until all given shards reached `status` and updates a cluster status
Expand Down Expand Up @@ -480,10 +510,11 @@ func (b *Backup) convergeClusterWithTimeout(
status defs.Status,
t time.Duration,
) error {
tk := time.NewTicker(time.Second * 1)
tk := time.NewTicker(time.Second)
defer tk.Stop()

tout := time.After(t)
tout := time.NewTimer(t)
defer tout.Stop()

for {
select {
Expand All @@ -495,7 +526,7 @@ func (b *Backup) convergeClusterWithTimeout(
if ok {
return nil
}
case <-tout:
case <-tout.C:
return errors.Wrap(errConvergeTimeOut, t.String())
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -554,15 +585,7 @@ func (b *Backup) converged(
}
}

if shardsToFinish == 0 {
err := ChangeBackupState(b.leadConn, bcpName, status, "")
if err != nil {
return false, errors.Wrapf(err, "update backup meta with %s", status)
}
return true, nil
}

return false, nil
return shardsToFinish == 0, nil
}

func (b *Backup) waitForStatus(
Expand Down
27 changes: 23 additions & 4 deletions pbm/backup/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,34 @@ func getBackupMeta(ctx context.Context, conn connect.Client, clause bson.D) (*Ba
}

func ChangeBackupStateOPID(conn connect.Client, opid string, s defs.Status, msg string) error {
return changeBackupState(context.Background(), conn, bson.D{{"opid", opid}}, s, msg)
return changeBackupState(context.TODO(),
conn, bson.D{{"opid", opid}}, time.Now().UTC().Unix(), s, msg)
}

func ChangeBackupState(conn connect.Client, bcpName string, s defs.Status, msg string) error {
return changeBackupState(context.Background(), conn, bson.D{{"name", bcpName}}, s, msg)
return changeBackupState(context.TODO(),
conn, bson.D{{"name", bcpName}}, time.Now().UTC().Unix(), s, msg)
}

func changeBackupState(ctx context.Context, conn connect.Client, clause bson.D, s defs.Status, msg string) error {
ts := time.Now().UTC().Unix()
func ChangeBackupStateWithUnixTime(
ctx context.Context,
conn connect.Client,
bcpName string,
s defs.Status,
unix int64,
msg string,
) error {
return changeBackupState(ctx, conn, bson.D{{"name", bcpName}}, time.Now().UTC().Unix(), s, msg)
}

func changeBackupState(
ctx context.Context,
conn connect.Client,
clause bson.D,
ts int64,
s defs.Status,
msg string,
) error {
_, err := conn.BcpCollection().UpdateOne(
ctx,
clause,
Expand Down
105 changes: 27 additions & 78 deletions pbm/backup/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,107 +10,52 @@ import (
"golang.org/x/sync/errgroup"

"github.com/percona/percona-backup-mongodb/pbm/archive"
"github.com/percona/percona-backup-mongodb/pbm/config"
"github.com/percona/percona-backup-mongodb/pbm/defs"
"github.com/percona/percona-backup-mongodb/pbm/errors"
"github.com/percona/percona-backup-mongodb/pbm/log"
"github.com/percona/percona-backup-mongodb/pbm/storage"
sfs "github.com/percona/percona-backup-mongodb/pbm/storage/fs"
"github.com/percona/percona-backup-mongodb/pbm/util"
"github.com/percona/percona-backup-mongodb/pbm/version"
)

type StorageManager interface {
GetAllBackups(ctx context.Context) ([]BackupMeta, error)
GetBackupByName(ctx context.Context, name string) (*BackupMeta, error)
}

type storageManagerImpl struct {
cfg *config.StorageConf
stg storage.Storage
}

func NewStorageManager(ctx context.Context, cfg *config.StorageConf) (*storageManagerImpl, error) {
stg, err := util.StorageFromConfig(cfg, log.LogEventFromContext(ctx))
func CheckBackupFiles(ctx context.Context, stg storage.Storage, name string) error {
bcp, err := ReadMetadata(stg, name+defs.MetadataFileSuffix)
if err != nil {
return nil, errors.Wrap(err, "unable to get backup store")
return errors.Wrap(err, "read backup metadata")
}

_, err = stg.FileStat(defs.StorInitFile)
if !errors.Is(err, storage.ErrNotExist) {
return nil, err
}

return &storageManagerImpl{cfg: cfg, stg: stg}, nil
return CheckBackupDataFiles(ctx, stg, bcp)
}

func (m *storageManagerImpl) GetAllBackups(ctx context.Context) ([]BackupMeta, error) {
l := log.LogEventFromContext(ctx)

bcpList, err := m.stg.List("", defs.MetadataFileSuffix)
func ReadMetadata(stg storage.Storage, filename string) (*BackupMeta, error) {
rdr, err := stg.SourceReader(filename)
if err != nil {
return nil, errors.Wrap(err, "get a backups list from the storage")
}
l.Debug("got backups list: %v", len(bcpList))

var rv []BackupMeta
for _, b := range bcpList {
l.Debug("bcp: %v", b.Name)

d, err := m.stg.SourceReader(b.Name)
if err != nil {
return nil, errors.Wrapf(err, "read meta for %v", b.Name)
}

v := BackupMeta{}
err = json.NewDecoder(d).Decode(&v)
d.Close()
if err != nil {
return nil, errors.Wrapf(err, "unmarshal backup meta [%s]", b.Name)
}

err = CheckBackupFiles(ctx, &v, m.stg)
if err != nil {
l.Warning("skip snapshot %s: %v", v.Name, err)
v.Status = defs.StatusError
v.Err = err.Error()
}
rv = append(rv, v)
return nil, errors.Wrap(err, "open")
}
defer rdr.Close()

return rv, nil
}

func (m *storageManagerImpl) GetBackupByName(ctx context.Context, name string) (*BackupMeta, error) {
l := log.LogEventFromContext(ctx)
l.Debug("get backup by name: %v", name)

rdr, err := m.stg.SourceReader(name + defs.MetadataFileSuffix)
var meta *BackupMeta
err = json.NewDecoder(rdr).Decode(&meta)
if err != nil {
return nil, errors.Wrapf(err, "read meta for %v", name)
return nil, errors.Wrap(err, "decode")
}
defer rdr.Close()

v := &BackupMeta{}
if err := json.NewDecoder(rdr).Decode(&v); err != nil {
return nil, errors.Wrapf(err, "unmarshal backup meta [%s]", name)
}
return meta, nil
}

if err := CheckBackupFiles(ctx, v, m.stg); err != nil {
l.Warning("no backup files %s: %v", v.Name, err)
v.Status = defs.StatusError
v.Err = err.Error()
func CheckBackupDataFiles(ctx context.Context, stg storage.Storage, bcp *BackupMeta) error {
switch bcp.Type {
case defs.LogicalBackup:
return checkLogicalBackupFiles(ctx, stg, bcp)
case defs.PhysicalBackup, defs.IncrementalBackup:
return checkPhysicalBackupFiles(ctx, stg, bcp)
case defs.ExternalBackup:
return nil // no files available
}

return v, nil
return errors.Errorf("unknown backup type %s", bcp.Type)
}

func CheckBackupFiles(ctx context.Context, bcp *BackupMeta, stg storage.Storage) error {
// !!! TODO: Check physical files ?
if bcp.Type != defs.LogicalBackup {
return nil
}

func checkLogicalBackupFiles(ctx context.Context, stg storage.Storage, bcp *BackupMeta) error {
legacy := version.IsLegacyArchive(bcp.PBMVersion)
eg, _ := errgroup.WithContext(ctx)
for _, rs := range bcp.Replsets {
Expand Down Expand Up @@ -163,6 +108,10 @@ func CheckBackupFiles(ctx context.Context, bcp *BackupMeta, stg storage.Storage)
return eg.Wait()
}

func checkPhysicalBackupFiles(ctx context.Context, stg storage.Storage, bcp *BackupMeta) error {
return nil
}

func ReadArchiveNamespaces(stg storage.Storage, metafile string) ([]*archive.Namespace, error) {
r, err := stg.SourceReader(metafile)
if err != nil {
Expand Down
15 changes: 3 additions & 12 deletions pbm/resync/rsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package resync

import (
"context"
"encoding/json"
"runtime"
"strings"
"sync"
Expand Down Expand Up @@ -293,21 +292,13 @@ func getAllBackupMetaFromStorage(

backupMeta := make([]*backup.BackupMeta, 0, len(backupFiles))
for _, b := range backupFiles {
d, err := stg.SourceReader(b.Name)
meta, err := backup.ReadMetadata(stg, b.Name)
if err != nil {
l.Error("read meta for %v", b.Name)
l.Error("read metadata of backup %s: %v", b.Name, err)
continue
}

var meta *backup.BackupMeta
err = json.NewDecoder(d).Decode(&meta)
d.Close()
if err != nil {
l.Error("unmarshal backup meta [%s]", b.Name)
continue
}

err = backup.CheckBackupFiles(ctx, meta, stg)
err = backup.CheckBackupDataFiles(ctx, stg, meta)
if err != nil {
l.Warning("skip snapshot %s: %v", meta.Name, err)
meta.Status = defs.StatusError
Expand Down
Loading