Skip to content

Commit

Permalink
[PBM-1316] add check for another op in CLI
Browse files Browse the repository at this point in the history
  • Loading branch information
defbin committed Aug 28, 2024
1 parent 947d1f5 commit 57bc962
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 74 deletions.
11 changes: 2 additions & 9 deletions cmd/pbm/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,8 @@ func runBackup(
return nil, errors.Wrap(err, "backup pre-check")
}

if err := checkConcurrentOp(ctx, conn); err != nil {
// PITR slicing can be run along with the backup start - agents will resolve it.
var e *concurentOpError
if !errors.As(err, &e) {
return nil, err
}
if e.op.Type != ctrl.CmdPITR {
return nil, err
}
if err := checkForAnotherOperation(ctx, pbm); err != nil {
return nil, err
}

cfg, err := config.GetProfiledConfig(ctx, conn, b.profile)
Expand Down
48 changes: 48 additions & 0 deletions cmd/pbm/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package main

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/percona/percona-backup-mongodb/pbm/connect"
"github.com/percona/percona-backup-mongodb/pbm/ctrl"
"github.com/percona/percona-backup-mongodb/pbm/defs"
"github.com/percona/percona-backup-mongodb/pbm/errors"
"github.com/percona/percona-backup-mongodb/sdk"
)

var errWaitTimeout = errors.New("Operation is in progress. Check pbm status and logs")
Expand All @@ -16,3 +20,47 @@ func sendCmd(ctx context.Context, conn connect.Client, cmd ctrl.Cmd) error {
_, err := conn.CmdStreamCollection().InsertOne(ctx, cmd)
return err
}

// checkForAnotherOperation returns a *concurentOpError for the first operation
// lock whose heartbeat is at most defs.StaleFrameSec behind the cluster time.
// Locks with an older heartbeat are skipped. It returns nil when pbm.OpLocks
// yields no locks or none of them passes that check; failures to read the
// locks or the cluster time are returned wrapped.
func checkForAnotherOperation(ctx context.Context, pbm *sdk.Client) error {
	opLocks, err := pbm.OpLocks(ctx)
	if err != nil {
		return errors.Wrap(err, "get operation lock")
	}
	// No locks at all: the cluster time lookup is not needed.
	if len(opLocks) == 0 {
		return nil
	}

	clusterTime, err := sdk.ClusterTime(ctx, pbm)
	if err != nil {
		return errors.Wrap(err, "get cluster time")
	}

	for _, opLock := range opLocks {
		stale := opLock.Heartbeat.T+defs.StaleFrameSec < clusterTime.T
		if stale {
			continue
		}
		return &concurentOpError{opLock}
	}

	return nil
}

// concurentOpError is the error returned by checkForAnotherOperation. It
// embeds the sdk.OpLock of the operation that was found in progress.
type concurentOpError struct{ sdk.OpLock }

// Error implements the error interface. The message carries the Cmd, OpID,
// Replset and Node fields of the embedded lock.
func (e *concurentOpError) Error() string {
	const format = "another operation in progress, %s/%s [%s/%s]"

	return fmt.Sprintf(format, e.Cmd, e.OpID, e.Replset, e.Node)
}

// MarshalJSON implements json.Marshaler. The result is an object with a fixed
// "error" message and an "operation" object holding the type, opid, replset
// and node of the embedded lock.
func (e *concurentOpError) MarshalJSON() ([]byte, error) {
	operation := map[string]any{
		"type":    e.Cmd,
		"opid":    e.OpID,
		"replset": e.Replset,
		"node":    e.Node,
	}

	return json.Marshal(map[string]any{
		"error":     "another operation in progress",
		"operation": operation,
	})
}
13 changes: 12 additions & 1 deletion cmd/pbm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,18 @@ func (c confVals) String() string {
return s
}

func runConfig(ctx context.Context, conn connect.Client, pbm *sdk.Client, c *configOpts) (fmt.Stringer, error) {
func runConfig(
ctx context.Context,
conn connect.Client,
pbm *sdk.Client,
c *configOpts,
) (fmt.Stringer, error) {
if len(c.set) != 0 || c.rsync || c.file != "" {
if err := checkForAnotherOperation(ctx, pbm); err != nil {
return nil, err
}
}

switch {
case len(c.set) > 0:
var o confVals
Expand Down
9 changes: 9 additions & 0 deletions cmd/pbm/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ func deleteBackup(
if d.bcpType != "" && d.olderThan == "" {
return nil, errors.New("cannot use --type without --older-then")
}
if err := checkForAnotherOperation(ctx, pbm); err != nil {
return nil, err
}

var cid sdk.CommandID
var err error
Expand Down Expand Up @@ -170,6 +173,9 @@ func deletePITR(
if d.olderThan != "" && d.all {
return nil, errors.New("cannot use --older-then and --all at the same command")
}
if err := checkForAnotherOperation(ctx, pbm); err != nil {
return nil, err
}

var until primitive.Timestamp
if d.all {
Expand Down Expand Up @@ -255,6 +261,9 @@ func doCleanup(ctx context.Context, conn connect.Client, pbm *sdk.Client, d *cle
realTime := n.Format(time.RFC3339)
return nil, errors.Errorf("--older-than %q is after now %q", providedTime, realTime)
}
if err := checkForAnotherOperation(ctx, pbm); err != nil {
return nil, err
}

info, err := pbm.CleanupReport(ctx, ts)
if err != nil {
Expand Down
58 changes: 2 additions & 56 deletions cmd/pbm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ import (
"github.com/percona/percona-backup-mongodb/pbm/connect"
"github.com/percona/percona-backup-mongodb/pbm/defs"
"github.com/percona/percona-backup-mongodb/pbm/errors"
"github.com/percona/percona-backup-mongodb/pbm/lock"
"github.com/percona/percona-backup-mongodb/pbm/log"
"github.com/percona/percona-backup-mongodb/pbm/oplog"
"github.com/percona/percona-backup-mongodb/pbm/topo"
"github.com/percona/percona-backup-mongodb/pbm/version"
"github.com/percona/percona-backup-mongodb/sdk"
)
Expand Down Expand Up @@ -516,9 +514,9 @@ func main() {
case descBcpCmd.FullCommand():
out, err = describeBackup(ctx, pbm, &descBcp)
case restoreCmd.FullCommand():
out, err = runRestore(ctx, conn, &restore, pbmOutF)
out, err = runRestore(ctx, conn, pbm, &restore, pbmOutF)
case replayCmd.FullCommand():
out, err = replayOplog(ctx, conn, replayOpts, pbmOutF)
out, err = replayOplog(ctx, conn, pbm, replayOpts, pbmOutF)
case listCmd.FullCommand():
out, err = runList(ctx, conn, pbm, &list)
case deleteBcpCmd.FullCommand():
Expand Down Expand Up @@ -775,55 +773,3 @@ func parseDateT(v string) (time.Time, error) {

return time.Time{}, errInvalidFormat
}

// concurentOpError is the error returned by checkConcurrentOp. op is the
// header of the lock that was found to be still alive.
type concurentOpError struct {
op *lock.LockHeader
}

// Error implements the error interface. The message carries the Type, OPID,
// Replset and Node fields of the lock header.
func (e *concurentOpError) Error() string {
	hdr := e.op

	return fmt.Sprintf("another operation in progress, %s/%s [%s/%s]",
		hdr.Type, hdr.OPID, hdr.Replset, hdr.Node)
}

// As copies this error into a caller-supplied target and reports whether the
// target was of a supported type.
//
// The standard library's errors.As passes its target through unchanged, so
// `var e *concurentOpError; errors.As(err, &e)` arrives here as a
// **concurentOpError. That form is handled by pointing *target at e. A direct
// call with a *concurentOpError target keeps the earlier behavior of copying
// op into it. Nil targets (typed or untyped) and unrelated types yield false.
func (e *concurentOpError) As(err any) bool {
	switch target := err.(type) {
	case **concurentOpError:
		if target == nil {
			return false
		}
		*target = e
		return true
	case *concurentOpError:
		// A typed nil pointer has nowhere to receive the copy.
		if target == nil {
			return false
		}
		target.op = e.op
		return true
	default:
		return false
	}
}

// MarshalJSON implements json.Marshaler. The result is an object with a fixed
// "error" message and the lock header under "operation".
func (e *concurentOpError) MarshalJSON() ([]byte, error) {
	return json.Marshal(map[string]any{
		"error":     "another operation in progress",
		"operation": e.op,
	})
}

// checkConcurrentOp returns a *concurentOpError for the first lock whose
// heartbeat is at most defs.StaleFrameSec behind the cluster time, and nil
// when no such lock exists. Failures to read the locks or the cluster time
// are returned wrapped.
func checkConcurrentOp(ctx context.Context, conn connect.Client) error {
	held, err := lock.GetLocks(ctx, conn, &lock.LockHeader{})
	if err != nil {
		return errors.Wrap(err, "get locks")
	}

	clusterTime, err := topo.GetClusterTime(ctx, conn)
	if err != nil {
		return errors.Wrap(err, "read cluster time")
	}

	// A live operation blocks the command. A stale lock does not:
	// it is skipped here and left for the agents to deal with.
	for _, lck := range held {
		if lck.Heartbeat.T+defs.StaleFrameSec < clusterTime.T {
			continue
		}
		return &concurentOpError{&lck.LockHeader}
	}

	return nil
}
12 changes: 9 additions & 3 deletions cmd/pbm/oplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/percona/percona-backup-mongodb/pbm/defs"
"github.com/percona/percona-backup-mongodb/pbm/errors"
"github.com/percona/percona-backup-mongodb/pbm/restore"
"github.com/percona/percona-backup-mongodb/sdk"
)

type replayOptions struct {
Expand Down Expand Up @@ -40,7 +41,13 @@ func (r oplogReplayResult) String() string {
return fmt.Sprintf("Oplog replay %q has started", r.Name)
}

func replayOplog(ctx context.Context, conn connect.Client, o replayOptions, outf outFormat) (fmt.Stringer, error) {
func replayOplog(
ctx context.Context,
conn connect.Client,
pbm *sdk.Client,
o replayOptions,
outf outFormat,
) (fmt.Stringer, error) {
rsMap, err := parseRSNamesMapping(o.rsMap)
if err != nil {
return nil, errors.Wrap(err, "cannot parse replset mapping")
Expand All @@ -55,8 +62,7 @@ func replayOplog(ctx context.Context, conn connect.Client, o replayOptions, outf
return nil, errors.Wrap(err, "parse end time")
}

err = checkConcurrentOp(ctx, conn)
if err != nil {
if err := checkForAnotherOperation(ctx, pbm); err != nil {
return nil, err
}

Expand Down
10 changes: 10 additions & 0 deletions cmd/pbm/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ func handleAddConfigProfile(
if opts.name == "" {
return nil, errors.New("argument `profile-name` should not be empty")
}
if err := checkForAnotherOperation(ctx, pbm); err != nil {
return nil, err
}

_, err := pbm.GetConfig(ctx)
if err != nil {
Expand Down Expand Up @@ -174,6 +177,9 @@ func handleRemoveConfigProfile(
if opts.name == "" {
return nil, errors.New("argument `profile-name` should not be empty")
}
if err := checkForAnotherOperation(ctx, pbm); err != nil {
return nil, err
}

_, err := pbm.GetConfigProfile(ctx, opts.name)
if err != nil {
Expand Down Expand Up @@ -220,6 +226,10 @@ func handleSyncConfigProfile(
return nil, errors.New("ambiguous: <profile-name> and --all are provided")
}

if err := checkForAnotherOperation(ctx, pbm); err != nil {
return nil, err
}

var err error
var cid sdk.CommandID

Expand Down
17 changes: 12 additions & 5 deletions cmd/pbm/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/percona/percona-backup-mongodb/pbm/storage"
"github.com/percona/percona-backup-mongodb/pbm/topo"
"github.com/percona/percona-backup-mongodb/pbm/util"
"github.com/percona/percona-backup-mongodb/sdk"
)

type restoreOpts struct {
Expand Down Expand Up @@ -97,7 +98,13 @@ func (r externRestoreRet) String() string {
r.Name, r.Name)
}

func runRestore(ctx context.Context, conn connect.Client, o *restoreOpts, outf outFormat) (fmt.Stringer, error) {
func runRestore(
ctx context.Context,
conn connect.Client,
pbm *sdk.Client,
o *restoreOpts,
outf outFormat,
) (fmt.Stringer, error) {
nss, err := parseCLINSOption(o.ns)
if err != nil {
return nil, errors.Wrap(err, "parse --ns option")
Expand All @@ -115,6 +122,10 @@ func runRestore(ctx context.Context, conn connect.Client, o *restoreOpts, outf o
return nil, errors.New("either a backup name or point in time should be set, non both together!")
}

if err := checkForAnotherOperation(ctx, pbm); err != nil {
return nil, err
}

clusterTime, err := topo.GetClusterTime(ctx, conn)
if err != nil {
return nil, errors.Wrap(err, "read cluster time")
Expand Down Expand Up @@ -318,10 +329,6 @@ func doRestore(
if err != nil {
return nil, err
}
err = checkConcurrentOp(ctx, conn)
if err != nil {
return nil, err
}

name := time.Now().UTC().Format(time.RFC3339Nano)

Expand Down

0 comments on commit 57bc962

Please sign in to comment.