diff --git a/cmd/pbm/backup.go b/cmd/pbm/backup.go index ec16fe545..714f7f78e 100644 --- a/cmd/pbm/backup.go +++ b/cmd/pbm/backup.go @@ -102,15 +102,8 @@ func runBackup( return nil, errors.Wrap(err, "backup pre-check") } - if err := checkConcurrentOp(ctx, conn); err != nil { - // PITR slicing can be run along with the backup start - agents will resolve it. - var e *concurentOpError - if !errors.As(err, &e) { - return nil, err - } - if e.op.Type != ctrl.CmdPITR { - return nil, err - } + if err := checkForAnotherOperation(ctx, pbm); err != nil { + return nil, err } cfg, err := config.GetProfiledConfig(ctx, conn, b.profile) diff --git a/cmd/pbm/common.go b/cmd/pbm/common.go index 77fb6eb86..0ea7391da 100644 --- a/cmd/pbm/common.go +++ b/cmd/pbm/common.go @@ -2,11 +2,15 @@ package main import ( "context" + "encoding/json" + "fmt" "time" "github.com/percona/percona-backup-mongodb/pbm/connect" "github.com/percona/percona-backup-mongodb/pbm/ctrl" + "github.com/percona/percona-backup-mongodb/pbm/defs" "github.com/percona/percona-backup-mongodb/pbm/errors" + "github.com/percona/percona-backup-mongodb/sdk" ) var errWaitTimeout = errors.New("Operation is in progress. Check pbm status and logs") @@ -16,3 +20,47 @@ func sendCmd(ctx context.Context, conn connect.Client, cmd ctrl.Cmd) error { _, err := conn.CmdStreamCollection().InsertOne(ctx, cmd) return err } + +func checkForAnotherOperation(ctx context.Context, pbm *sdk.Client) error { + locks, err := pbm.OpLocks(ctx) + if err != nil { + return errors.Wrap(err, "get operation lock") + } + if len(locks) == 0 { + return nil + } + + ts, err := sdk.ClusterTime(ctx, pbm) + if err != nil { + return errors.Wrap(err, "get cluster time") + } + + for _, l := range locks { + if l.Heartbeat.T+defs.StaleFrameSec >= ts.T { + return &concurentOpError{l} + } + } + + return nil +} + +type concurentOpError struct{ sdk.OpLock } + +func (e *concurentOpError) Error() string { + return fmt.Sprintf("another operation in progress, %s/%s [%s/%s]", + e.Cmd, e.OpID, e.Replset, e.Node) +} + +func (e *concurentOpError) MarshalJSON() ([]byte, error) { + s := map[string]any{ + "error": "another operation in progress", + "operation": map[string]any{ + "type": e.Cmd, + "opid": e.OpID, + "replset": e.Replset, + "node": e.Node, + }, + } + + return json.Marshal(s) +} diff --git a/cmd/pbm/config.go b/cmd/pbm/config.go index 64fa9c0f5..72c604b2d 100644 --- a/cmd/pbm/config.go +++ b/cmd/pbm/config.go @@ -47,7 +47,18 @@ func (c confVals) String() string { return s } -func runConfig(ctx context.Context, conn connect.Client, pbm *sdk.Client, c *configOpts) (fmt.Stringer, error) { +func runConfig( + ctx context.Context, + conn connect.Client, + pbm *sdk.Client, + c *configOpts, +) (fmt.Stringer, error) { + if len(c.set) != 0 || c.rsync || c.file != "" { + if err := checkForAnotherOperation(ctx, pbm); err != nil { + return nil, err + } + } + switch { case len(c.set) > 0: var o confVals diff --git a/cmd/pbm/delete.go b/cmd/pbm/delete.go index 93656cf54..1d4e3669b 100644 --- a/cmd/pbm/delete.go +++ b/cmd/pbm/delete.go @@ -44,6 +44,9 @@ func deleteBackup( if d.bcpType != "" && d.olderThan == "" { return nil, errors.New("cannot use --type without --older-then") } + if err := checkForAnotherOperation(ctx, pbm); err != nil { + return nil, err + } var cid sdk.CommandID var err error @@ -170,6 +173,9 @@ func deletePITR( if d.olderThan != "" && d.all { return nil, errors.New("cannot use --older-then and --all at the same command") } + if err := checkForAnotherOperation(ctx, pbm); err != nil { + return nil, err + } var until primitive.Timestamp if d.all { @@ -255,6 +261,9 @@ func doCleanup(ctx context.Context, conn connect.Client, pbm *sdk.Client, d *cle realTime := n.Format(time.RFC3339) return nil, errors.Errorf("--older-than %q is after now %q", providedTime, realTime) } + if err := checkForAnotherOperation(ctx, pbm); err != nil { + return nil, err + } info, err := pbm.CleanupReport(ctx, ts) if err != nil { diff --git a/cmd/pbm/main.go b/cmd/pbm/main.go index a71143d35..06dce4ee5 100644 --- a/cmd/pbm/main.go +++ b/cmd/pbm/main.go @@ -16,10 +16,8 @@ import ( "github.com/percona/percona-backup-mongodb/pbm/connect" "github.com/percona/percona-backup-mongodb/pbm/defs" "github.com/percona/percona-backup-mongodb/pbm/errors" - "github.com/percona/percona-backup-mongodb/pbm/lock" "github.com/percona/percona-backup-mongodb/pbm/log" "github.com/percona/percona-backup-mongodb/pbm/oplog" - "github.com/percona/percona-backup-mongodb/pbm/topo" "github.com/percona/percona-backup-mongodb/pbm/version" "github.com/percona/percona-backup-mongodb/sdk" ) @@ -516,9 +514,9 @@ func main() { case descBcpCmd.FullCommand(): out, err = describeBackup(ctx, pbm, &descBcp) case restoreCmd.FullCommand(): - out, err = runRestore(ctx, conn, &restore, pbmOutF) + out, err = runRestore(ctx, conn, pbm, &restore, pbmOutF) case replayCmd.FullCommand(): - out, err = replayOplog(ctx, conn, replayOpts, pbmOutF) + out, err = replayOplog(ctx, conn, pbm, replayOpts, pbmOutF) case listCmd.FullCommand(): out, err = runList(ctx, conn, pbm, &list) case deleteBcpCmd.FullCommand(): @@ -775,55 +773,3 @@ func parseDateT(v string) (time.Time, error) { return time.Time{}, errInvalidFormat } - -type concurentOpError struct { - op *lock.LockHeader -} - -func (e *concurentOpError) Error() string { - return fmt.Sprintf("another operation in progress, %s/%s [%s/%s]", e.op.Type, e.op.OPID, e.op.Replset, e.op.Node) -} - -func (e *concurentOpError) As(err any) bool { - if err == nil { - return false - } - - er, ok := err.(*concurentOpError) - if !ok { - return false - } - - er.op = e.op - return true -} - -func (e *concurentOpError) MarshalJSON() ([]byte, error) { - s := make(map[string]interface{}) - s["error"] = "another operation in progress" - s["operation"] = e.op - return json.Marshal(s) -} - -func checkConcurrentOp(ctx context.Context, conn connect.Client) error { - locks, err := lock.GetLocks(ctx, conn, &lock.LockHeader{}) - if err != nil { - return errors.Wrap(err, "get locks") - } - - ts, err := topo.GetClusterTime(ctx, conn) - if err != nil { - return errors.Wrap(err, "read cluster time") - } - - // Stop if there is some live operation. - // But in case of stale lock just move on - // and leave it for agents to deal with. - for _, l := range locks { - if l.Heartbeat.T+defs.StaleFrameSec >= ts.T { - return &concurentOpError{&l.LockHeader} - } - } - - return nil -} diff --git a/cmd/pbm/oplog.go b/cmd/pbm/oplog.go index c72be995d..2b25d0206 100644 --- a/cmd/pbm/oplog.go +++ b/cmd/pbm/oplog.go @@ -10,6 +10,7 @@ import ( "github.com/percona/percona-backup-mongodb/pbm/defs" "github.com/percona/percona-backup-mongodb/pbm/errors" "github.com/percona/percona-backup-mongodb/pbm/restore" + "github.com/percona/percona-backup-mongodb/sdk" ) type replayOptions struct { @@ -40,7 +41,13 @@ func (r oplogReplayResult) String() string { return fmt.Sprintf("Oplog replay %q has started", r.Name) } -func replayOplog(ctx context.Context, conn connect.Client, o replayOptions, outf outFormat) (fmt.Stringer, error) { +func replayOplog( + ctx context.Context, + conn connect.Client, + pbm *sdk.Client, + o replayOptions, + outf outFormat, +) (fmt.Stringer, error) { rsMap, err := parseRSNamesMapping(o.rsMap) if err != nil { return nil, errors.Wrap(err, "cannot parse replset mapping") @@ -55,8 +62,7 @@ func replayOplog(ctx context.Context, conn connect.Client, o replayOptions, outf return nil, errors.Wrap(err, "parse end time") } - err = checkConcurrentOp(ctx, conn) - if err != nil { + if err := checkForAnotherOperation(ctx, pbm); err != nil { return nil, err } diff --git a/cmd/pbm/profile.go b/cmd/pbm/profile.go index f6617cc8e..4390cc7cb 100644 --- a/cmd/pbm/profile.go +++ b/cmd/pbm/profile.go @@ -100,6 +100,9 @@ func handleAddConfigProfile( if opts.name == "" { return nil, errors.New("argument `profile-name` should not be empty") } + if err := checkForAnotherOperation(ctx, pbm); err != nil { + return nil, err + } _, err := pbm.GetConfig(ctx) if err != nil { @@ -174,6 +177,9 @@ func handleRemoveConfigProfile( if opts.name == "" { return nil, errors.New("argument `profile-name` should not be empty") } + if err := checkForAnotherOperation(ctx, pbm); err != nil { + return nil, err + } _, err := pbm.GetConfigProfile(ctx, opts.name) if err != nil { @@ -220,6 +226,10 @@ func handleSyncConfigProfile( return nil, errors.New("ambiguous: and --all are provided") } + if err := checkForAnotherOperation(ctx, pbm); err != nil { + return nil, err + } + var err error var cid sdk.CommandID diff --git a/cmd/pbm/restore.go b/cmd/pbm/restore.go index 3db18c5b7..3f5b6bbee 100644 --- a/cmd/pbm/restore.go +++ b/cmd/pbm/restore.go @@ -25,6 +25,7 @@ import ( "github.com/percona/percona-backup-mongodb/pbm/storage" "github.com/percona/percona-backup-mongodb/pbm/topo" "github.com/percona/percona-backup-mongodb/pbm/util" + "github.com/percona/percona-backup-mongodb/sdk" ) type restoreOpts struct { @@ -97,7 +98,13 @@ func (r externRestoreRet) String() string { r.Name, r.Name) } -func runRestore(ctx context.Context, conn connect.Client, o *restoreOpts, outf outFormat) (fmt.Stringer, error) { +func runRestore( + ctx context.Context, + conn connect.Client, + pbm *sdk.Client, + o *restoreOpts, + outf outFormat, +) (fmt.Stringer, error) { nss, err := parseCLINSOption(o.ns) if err != nil { return nil, errors.Wrap(err, "parse --ns option") @@ -115,6 +122,10 @@ func runRestore(ctx context.Context, conn connect.Client, o *restoreOpts, outf o return nil, errors.New("either a backup name or point in time should be set, non both together!") } + if err := checkForAnotherOperation(ctx, pbm); err != nil { + return nil, err + } + clusterTime, err := topo.GetClusterTime(ctx, conn) if err != nil { return nil, errors.Wrap(err, "read cluster time") @@ -318,10 +329,6 @@ func doRestore( if err != nil { return nil, err } - err = checkConcurrentOp(ctx, conn) - if err != nil { - return nil, err - } name := time.Now().UTC().Format(time.RFC3339Nano)