Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PBM-1400: add num-parallel-collections to pbm config #1030

Merged
merged 1 commit into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/pbm-agent/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
case defs.LogicalBackup:
fallthrough
default:
bcp = backup.New(a.leadConn, a.nodeConn, a.brief, a.numParallelColls)
numParallelColls := a.numParallelColls
if cfg.Backup != nil && cfg.Backup.NumParallelCollections > 0 {
numParallelColls = cfg.Backup.NumParallelCollections
}
bcp = backup.New(a.leadConn, a.nodeConn, a.brief, numParallelColls)
}

bcp.SetConfig(cfg)
Expand Down
8 changes: 7 additions & 1 deletion cmd/pbm-agent/oplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,14 @@ func (a *Agent) OplogReplay(ctx context.Context, r *ctrl.ReplayCmd, opID ctrl.OP
}
}()

cfg, err := config.GetConfig(ctx, a.leadConn)
if err != nil {
l.Error("get PBM config: %v", err)
return
}

l.Info("oplog replay started")
rr := restore.New(a.leadConn, a.nodeConn, a.brief, r.RSMap, 0)
rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, 0)
err = rr.ReplayOplog(ctx, r, opID, l)
if err != nil {
if errors.Is(err, restore.ErrNoDataForShard) {
Expand Down
10 changes: 9 additions & 1 deletion cmd/pbm-agent/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
r.BackupName = bcp.Name
}

cfg, err := config.GetConfig(ctx, a.leadConn)
if err != nil {
l.Error("get PBM configuration: %v", err)
return
}

l.Info("recovery started")

switch bcpType {
Expand All @@ -118,9 +124,11 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
numParallelColls := runtime.NumCPU() / 2
if r.NumParallelColls != nil && *r.NumParallelColls > 0 {
numParallelColls = int(*r.NumParallelColls)
} else if cfg.Restore != nil && cfg.Restore.NumParallelCollections > 0 {
numParallelColls = cfg.Restore.NumParallelCollections
}

rr := restore.New(a.leadConn, a.nodeConn, a.brief, r.RSMap, numParallelColls)
rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, numParallelColls)
if r.OplogTS.IsZero() {
err = rr.Snapshot(ctx, r, opid, bcp)
} else {
Expand Down
7 changes: 5 additions & 2 deletions pbm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,9 @@ type RestoreConf struct {
// Logical restore
//
// num of documents to buffer
BatchSize int `bson:"batchSize" json:"batchSize,omitempty" yaml:"batchSize,omitempty"`
NumInsertionWorkers int `bson:"numInsertionWorkers" json:"numInsertionWorkers,omitempty" yaml:"numInsertionWorkers,omitempty"`
BatchSize int `bson:"batchSize" json:"batchSize,omitempty" yaml:"batchSize,omitempty"`
NumInsertionWorkers int `bson:"numInsertionWorkers" json:"numInsertionWorkers,omitempty" yaml:"numInsertionWorkers,omitempty"`
NumParallelCollections int `bson:"numParallelCollections" json:"numParallelCollections,omitempty" yaml:"numParallelCollections,omitempty"`

// NumDownloadWorkers sets the num of goroutine would be requesting chunks
// during the download. By default, it's set to GOMAXPROCS.
Expand Down Expand Up @@ -361,6 +362,8 @@ type BackupConf struct {
Timeouts *BackupTimeouts `bson:"timeouts,omitempty" json:"timeouts,omitempty" yaml:"timeouts,omitempty"`
Compression compress.CompressionType `bson:"compression,omitempty" json:"compression,omitempty" yaml:"compression,omitempty"`
CompressionLevel *int `bson:"compressionLevel,omitempty" json:"compressionLevel,omitempty" yaml:"compressionLevel,omitempty"`

NumParallelCollections int `bson:"numParallelCollections" json:"numParallelCollections,omitempty" yaml:"numParallelCollections,omitempty"`
}

func (cfg *BackupConf) Clone() *BackupConf {
Expand Down
15 changes: 7 additions & 8 deletions pbm/restore/logical.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Restore struct {
brief topo.NodeBrief
stopHB chan struct{}
nodeInfo *topo.NodeInfo
cfg *config.Config
bcpStg storage.Storage
oplogStg storage.Storage

Expand Down Expand Up @@ -82,6 +83,7 @@ func New(
leadConn connect.Client,
nodeConn *mongo.Client,
brief topo.NodeBrief,
cfg *config.Config,
rsMap map[string]string,
numParallelColls int,
) *Restore {
Expand All @@ -95,6 +97,8 @@ func New(
brief: brief,
rsMap: rsMap,

cfg: cfg,

numParallelColls: numParallelColls,

indexCatalog: idx.NewIndexCatalog(),
Expand Down Expand Up @@ -835,7 +839,7 @@ func (r *Restore) RunSnapshot(
defer rdr.Close()

// Restore snapshot (mongorestore)
err = r.snapshot(ctx, rdr)
err = r.snapshot(rdr)
if err != nil {
return errors.Wrap(err, "mongorestore")
}
Expand Down Expand Up @@ -1188,13 +1192,8 @@ func (r *Restore) applyOplog(ctx context.Context, ranges []oplogRange, options *
return nil
}

func (r *Restore) snapshot(ctx context.Context, input io.Reader) error {
cfg, err := config.GetConfig(ctx, r.leadConn)
if err != nil {
return errors.Wrap(err, "unable to get PBM config settings")
}

rf, err := snapshot.NewRestore(r.brief.URI, cfg, r.numParallelColls)
func (r *Restore) snapshot(input io.Reader) error {
rf, err := snapshot.NewRestore(r.brief.URI, r.cfg, r.numParallelColls)
if err != nil {
return err
}
Expand Down
Loading