From 2a651f9b2b1ae67623adfc34da99d10ba21239ce Mon Sep 17 00:00:00 2001
From: Dmytro Zghoba
Date: Tue, 1 Oct 2024 17:13:03 +0200
Subject: [PATCH] PBM-1400: add num-parallel-collections to pbm config

---
 cmd/pbm-agent/backup.go  |  6 +++++-
 cmd/pbm-agent/oplog.go   |  8 +++++++-
 cmd/pbm-agent/restore.go | 10 +++++++++-
 pbm/config/config.go     |  7 +++++--
 pbm/restore/logical.go   | 15 +++++++--------
 5 files changed, 33 insertions(+), 13 deletions(-)

diff --git a/cmd/pbm-agent/backup.go b/cmd/pbm-agent/backup.go
index 9b384e7a3..2e18eb82b 100644
--- a/cmd/pbm-agent/backup.go
+++ b/cmd/pbm-agent/backup.go
@@ -114,7 +114,11 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
 	case defs.LogicalBackup:
 		fallthrough
 	default:
-		bcp = backup.New(a.leadConn, a.nodeConn, a.brief, a.numParallelColls)
+		numParallelColls := a.numParallelColls
+		if cfg.Backup != nil && cfg.Backup.NumParallelCollections > 0 {
+			numParallelColls = cfg.Backup.NumParallelCollections
+		}
+		bcp = backup.New(a.leadConn, a.nodeConn, a.brief, numParallelColls)
 	}
 
 	bcp.SetConfig(cfg)
diff --git a/cmd/pbm-agent/oplog.go b/cmd/pbm-agent/oplog.go
index 69641d442..9be619407 100644
--- a/cmd/pbm-agent/oplog.go
+++ b/cmd/pbm-agent/oplog.go
@@ -70,8 +70,14 @@ func (a *Agent) OplogReplay(ctx context.Context, r *ctrl.ReplayCmd, opID ctrl.OP
 		}
 	}()
 
+	cfg, err := config.GetConfig(ctx, a.leadConn)
+	if err != nil {
+		l.Error("get PBM config: %v", err)
+		return
+	}
+
 	l.Info("oplog replay started")
-	rr := restore.New(a.leadConn, a.nodeConn, a.brief, r.RSMap, 0)
+	rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, 0)
 	err = rr.ReplayOplog(ctx, r, opID, l)
 	if err != nil {
 		if errors.Is(err, restore.ErrNoDataForShard) {
diff --git a/cmd/pbm-agent/restore.go b/cmd/pbm-agent/restore.go
index 77510b687..9a5dc02cc 100644
--- a/cmd/pbm-agent/restore.go
+++ b/cmd/pbm-agent/restore.go
@@ -106,6 +106,12 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
 		r.BackupName = bcp.Name
 	}
 
+	cfg, err := config.GetConfig(ctx, a.leadConn)
+	if err != nil {
+		l.Error("get PBM configuration: %v", err)
+		return
+	}
+
 	l.Info("recovery started")
 
 	switch bcpType {
@@ -118,9 +124,11 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
 		numParallelColls := runtime.NumCPU() / 2
 		if r.NumParallelColls != nil && *r.NumParallelColls > 0 {
 			numParallelColls = int(*r.NumParallelColls)
+		} else if cfg.Restore != nil && cfg.Restore.NumParallelCollections > 0 {
+			numParallelColls = cfg.Restore.NumParallelCollections
 		}
 
-		rr := restore.New(a.leadConn, a.nodeConn, a.brief, r.RSMap, numParallelColls)
+		rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, numParallelColls)
 		if r.OplogTS.IsZero() {
 			err = rr.Snapshot(ctx, r, opid, bcp)
 		} else {
diff --git a/pbm/config/config.go b/pbm/config/config.go
index 180e204de..3cb985acf 100644
--- a/pbm/config/config.go
+++ b/pbm/config/config.go
@@ -321,8 +321,9 @@ type RestoreConf struct {
 	// Logical restore
 	//
 	// num of documents to buffer
-	BatchSize           int `bson:"batchSize" json:"batchSize,omitempty" yaml:"batchSize,omitempty"`
-	NumInsertionWorkers int `bson:"numInsertionWorkers" json:"numInsertionWorkers,omitempty" yaml:"numInsertionWorkers,omitempty"`
+	BatchSize              int `bson:"batchSize" json:"batchSize,omitempty" yaml:"batchSize,omitempty"`
+	NumInsertionWorkers    int `bson:"numInsertionWorkers" json:"numInsertionWorkers,omitempty" yaml:"numInsertionWorkers,omitempty"`
+	NumParallelCollections int `bson:"numParallelCollections" json:"numParallelCollections,omitempty" yaml:"numParallelCollections,omitempty"`
 
 	// NumDownloadWorkers sets the num of goroutine would be requesting chunks
 	// during the download. By default, it's set to GOMAXPROCS.
@@ -361,6 +362,8 @@ type BackupConf struct {
 	Timeouts         *BackupTimeouts          `bson:"timeouts,omitempty" json:"timeouts,omitempty" yaml:"timeouts,omitempty"`
 	Compression      compress.CompressionType `bson:"compression,omitempty" json:"compression,omitempty" yaml:"compression,omitempty"`
 	CompressionLevel *int                     `bson:"compressionLevel,omitempty" json:"compressionLevel,omitempty" yaml:"compressionLevel,omitempty"`
+
+	NumParallelCollections int `bson:"numParallelCollections" json:"numParallelCollections,omitempty" yaml:"numParallelCollections,omitempty"`
 }
 
 func (cfg *BackupConf) Clone() *BackupConf {
diff --git a/pbm/restore/logical.go b/pbm/restore/logical.go
index 94901d446..cd455a5c6 100644
--- a/pbm/restore/logical.go
+++ b/pbm/restore/logical.go
@@ -44,6 +44,7 @@ type Restore struct {
 	brief    topo.NodeBrief
 	stopHB   chan struct{}
 	nodeInfo *topo.NodeInfo
+	cfg      *config.Config
 
 	bcpStg   storage.Storage
 	oplogStg storage.Storage
@@ -82,6 +83,7 @@ func New(
 	leadConn connect.Client,
 	nodeConn *mongo.Client,
 	brief topo.NodeBrief,
+	cfg *config.Config,
 	rsMap map[string]string,
 	numParallelColls int,
 ) *Restore {
@@ -95,6 +97,8 @@ func New(
 		brief: brief,
 		rsMap: rsMap,
 
+		cfg: cfg,
+
 		numParallelColls: numParallelColls,
 
 		indexCatalog: idx.NewIndexCatalog(),
@@ -835,7 +839,7 @@ func (r *Restore) RunSnapshot(
 	defer rdr.Close()
 
 	// Restore snapshot (mongorestore)
-	err = r.snapshot(ctx, rdr)
+	err = r.snapshot(rdr)
 	if err != nil {
 		return errors.Wrap(err, "mongorestore")
 	}
@@ -1188,13 +1192,8 @@ func (r *Restore) applyOplog(ctx context.Context, ranges []oplogRange, options *
 	return nil
 }
 
-func (r *Restore) snapshot(ctx context.Context, input io.Reader) error {
-	cfg, err := config.GetConfig(ctx, r.leadConn)
-	if err != nil {
-		return errors.Wrap(err, "unable to get PBM config settings")
-	}
-
-	rf, err := snapshot.NewRestore(r.brief.URI, cfg, r.numParallelColls)
+func (r *Restore) snapshot(input io.Reader) error {
+	rf, err := snapshot.NewRestore(r.brief.URI, r.cfg, r.numParallelColls)
 	if err != nil {
 		return err
 	}