Skip to content

Commit

Permalink
PBM-1208: Add active lock check before running the backup (#982)
Browse files Browse the repository at this point in the history
* Add active lock check before running the backup
  • Loading branch information
boris-ilijic authored Aug 14, 2024
1 parent 0af71e6 commit 6ba7f4a
Showing 1 changed file with 42 additions and 0 deletions.
42 changes: 42 additions & 0 deletions cmd/pbm-agent/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,19 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
}

isClusterLeader := nodeInfo.IsClusterLeader()

if isClusterLeader {
moveOn, err := a.startBcpLockCheck(ctx)
if err != nil {
l.Error("start backup lock check: %v", err)
return
}
if !moveOn {
l.Error("unable to proceed with the backup, active lock is present")
return
}
}

canRunBackup, err := topo.NodeSuitsExt(ctx, a.nodeConn, nodeInfo, cmd.Type)
if err != nil {
l.Error("node check: %v", err)
Expand Down Expand Up @@ -314,3 +327,32 @@ func (a *Agent) waitNomination(ctx context.Context, bcp string) (bool, error) {
}
}
}

// startBcpLockCheck checks if there is any active lock.
// It fetches all existing pbm locks, and if any exists, it is also
// checked for staleness.
// false is returned in case a single active lock exists or error happens.
// true means that there's no active locks.
func (a *Agent) startBcpLockCheck(ctx context.Context) (bool, error) {
locks, err := lock.GetLocks(ctx, a.leadConn, &lock.LockHeader{})
if err != nil {
return false, errors.Wrap(err, "get all locks for backup start")
}
if len(locks) == 0 {
return true, nil
}

// stale lock check
ts, err := topo.GetClusterTime(ctx, a.leadConn)
if err != nil {
return false, errors.Wrap(err, "read cluster time")
}

for _, l := range locks {
if l.Heartbeat.T+defs.StaleFrameSec >= ts.T {
return false, nil
}
}

return true, nil
}

0 comments on commit 6ba7f4a

Please sign in to comment.