Skip to content

Commit

Permalink
Merge branch 'dev' into PBM-1224
Browse files Browse the repository at this point in the history
  • Loading branch information
defbin committed Aug 7, 2024
2 parents 6c61d4f + 708b368 commit ae8cb92
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 12 deletions.
6 changes: 5 additions & 1 deletion pbm/log/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,11 @@ func (e *Entries) SetLocation(l string) error {
}

func (e Entries) MarshalJSON() ([]byte, error) {
return json.Marshal(e.Data)
data := e.Data
if data == nil {
data = []Entry{}
}
return json.Marshal(data)
}

func (e Entries) String() string {
Expand Down
10 changes: 4 additions & 6 deletions pbm/prio/priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import (
)

const (
defaultScore = 1.0
scoreForPrimary = defaultScore / 2
scoreForSecondary = defaultScore * 1
scoreForHidden = defaultScore * 2
defaultScore = 1.0
scoreForPrimary = defaultScore / 2
scoreForHidden = defaultScore * 2
)

// NodesPriority groups nodes by priority according to
Expand Down Expand Up @@ -108,11 +107,10 @@ func CalcPriorityForAgent(
func CalcPriorityForNode(node *topo.NodeInfo) float64 {
if node.IsPrimary {
return scoreForPrimary
} else if node.Secondary {
return scoreForSecondary
} else if node.Hidden {
return scoreForHidden
}

return defaultScore
}

Expand Down
36 changes: 36 additions & 0 deletions pbm/prio/priority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,42 @@ func TestCalcNodesPriority(t *testing.T) {
})
}

func TestCalcPriorityForNode(t *testing.T) {
t.Run("for primary", func(t *testing.T) {
nodeInfo := &topo.NodeInfo{
IsPrimary: true,
}

p := CalcPriorityForNode(nodeInfo)
if p != scoreForPrimary {
t.Errorf("wrong priority for primary: want=%v, got=%v", scoreForPrimary, p)
}
})

t.Run("for secondary", func(t *testing.T) {
nodeInfo := &topo.NodeInfo{
Secondary: true,
}

p := CalcPriorityForNode(nodeInfo)
if p != defaultScore {
t.Errorf("wrong priority for secondary: want=%v, got=%v", defaultScore, p)
}
})

t.Run("for hidden", func(t *testing.T) {
nodeInfo := &topo.NodeInfo{
Hidden: true,
Secondary: true, // hidden is also secondary
}

p := CalcPriorityForNode(nodeInfo)
if p != scoreForHidden {
t.Errorf("wrong priority for hidden: want=%v, got=%v", scoreForHidden, p)
}
})
}

func newP(rs, node string) topo.AgentStat {
return newAgent(rs, node, defs.NodeStatePrimary, false)
}
Expand Down
70 changes: 65 additions & 5 deletions pbm/restore/physical.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ const (

tryConnCount = 5
tryConnTimeout = 5 * time.Minute

maxShutdownTriesOnStandaloneRecovery = 10
)

type files struct {
Expand Down Expand Up @@ -1259,7 +1261,17 @@ func (r *PhysRestore) prepareData() error {
}

func shutdown(c *mongo.Client, dbpath string) error {
err := c.Database("admin").RunCommand(context.TODO(), bson.D{{"shutdown", 1}}).Err()
return shutdownImpl(c, dbpath, false)
}

func forceShutdown(c *mongo.Client, dbpath string) error {
return shutdownImpl(c, dbpath, true)
}

func shutdownImpl(c *mongo.Client, dbpath string, force bool) error {
res := c.Database("admin").RunCommand(context.TODO(),
bson.D{{"shutdown", 1}, {"force", force}})
err := res.Err()
if err != nil && !strings.Contains(err.Error(), "socket was unexpectedly closed") {
return err
}
Expand All @@ -1285,12 +1297,24 @@ func (r *PhysRestore) recoverStandalone() error {
return errors.Wrap(err, "connect to mongo")
}

err = shutdown(c, r.dbpath)
if err != nil {
return errors.Wrap(err, "shutdown mongo")
for i := 0; i != maxShutdownTriesOnStandaloneRecovery; i++ {
err = shutdown(c, r.dbpath)
if err == nil {
return nil // OK
}

if strings.Contains(err.Error(), "ConflictingOperationInProgress") {
r.log.Warning("retry shutdown in 5 seconds. reason: %v", err)
time.Sleep(5 * time.Second)
continue
}

return errors.Wrap(err, "shutdown mongo") // unexpected
}

return nil
r.log.Debug("force shutdown")
err = forceShutdown(c, r.dbpath)
return errors.Wrap(err, "force shutdown mongo")
}

func (r *PhysRestore) replayOplog(
Expand Down Expand Up @@ -1537,6 +1561,8 @@ func (r *PhysRestore) resetRS() error {
if err != nil {
return errors.Wrap(err, "turn off pitr")
}

r.dropPBMCollections(ctx, c)
}

err = shutdown(c, r.dbpath)
Expand All @@ -1547,6 +1573,40 @@ func (r *PhysRestore) resetRS() error {
return nil
}

func (r *PhysRestore) dropPBMCollections(ctx context.Context, c *mongo.Client) {
pbmCollections := []string{
defs.LockCollection,
defs.LogCollection,
// defs.ConfigCollection,
defs.LockCollection,
defs.LockOpCollection,
defs.BcpCollection,
defs.RestoresCollection,
defs.CmdStreamCollection,
defs.PITRChunksCollection,
defs.PITRCollection,
defs.PBMOpLogCollection,
defs.AgentsStatusCollection,
}

wg := &sync.WaitGroup{}
wg.Add(len(pbmCollections))

for _, coll := range pbmCollections {
go func() {
defer wg.Done()

r.log.Debug("dropping 'admin.%s'", coll)
err := c.Database(defs.DB).Collection(coll).Drop(ctx)
if err != nil {
r.log.Warning("failed to drop 'admin.%s': %v", coll, err)
}
}()
}

wg.Wait()
}

func (r *PhysRestore) getShardMapping(bcp *backup.BackupMeta) map[string]string {
source := make(map[string]string)
if bcp != nil && bcp.ShardRemap != nil {
Expand Down

0 comments on commit ae8cb92

Please sign in to comment.