diff --git a/cmd/pbm-agent/agent.go b/cmd/pbm-agent/agent.go
index bbddc64f8..4cfed0747 100644
--- a/cmd/pbm-agent/agent.go
+++ b/cmd/pbm-agent/agent.go
@@ -34,7 +34,7 @@ type Agent struct {
 	brief topo.NodeBrief
 
-	dumpConns int
+	numParallelColls int
 
 	closeCMD chan struct{}
 	pauseHB  int32
@@ -44,7 +44,12 @@ type Agent struct {
 	monStopSig chan struct{}
 }
 
-func newAgent(ctx context.Context, leadConn connect.Client, uri string, dumpConns int) (*Agent, error) {
+func newAgent(
+	ctx context.Context,
+	leadConn connect.Client,
+	uri string,
+	numParallelColls int,
+) (*Agent, error) {
 	nodeConn, err := connect.MongoConnect(ctx, uri, connect.Direct(true))
 	if err != nil {
 		return nil, err
@@ -72,7 +77,7 @@ func newAgent(ctx context.Context, leadConn connect.Client, uri string, dumpConn
 			ConfigSvr: info.IsConfigSrv(),
 			Version:   mongoVersion,
 		},
-		dumpConns: dumpConns,
+		numParallelColls: numParallelColls,
 	}
 	return a, nil
 }
diff --git a/cmd/pbm-agent/backup.go b/cmd/pbm-agent/backup.go
index e685ad466..9b384e7a3 100644
--- a/cmd/pbm-agent/backup.go
+++ b/cmd/pbm-agent/backup.go
@@ -114,7 +114,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
 	case defs.LogicalBackup:
 		fallthrough
 	default:
-		bcp = backup.New(a.leadConn, a.nodeConn, a.brief, a.dumpConns)
+		bcp = backup.New(a.leadConn, a.nodeConn, a.brief, a.numParallelColls)
 	}
 
 	bcp.SetConfig(cfg)
diff --git a/cmd/pbm-agent/main.go b/cmd/pbm-agent/main.go
index 648060098..34239775f 100644
--- a/cmd/pbm-agent/main.go
+++ b/cmd/pbm-agent/main.go
@@ -32,7 +32,8 @@ func main() {
 		Envar("PBM_MONGODB_URI").
 		Required().
 		String()
-	dumpConns = pbmAgentCmd.Flag("dump-parallel-collections", "Number of collections to dump in parallel").
+	dumpConns = pbmAgentCmd.
+		Flag("dump-parallel-collections", "Number of collections to dump in parallel").
 		Envar("PBM_DUMP_PARALLEL_COLLECTIONS").
 		Default(strconv.Itoa(runtime.NumCPU() / 2)).
 		Int()
diff --git a/cmd/pbm-agent/oplog.go b/cmd/pbm-agent/oplog.go
index cb1bb1d03..69641d442 100644
--- a/cmd/pbm-agent/oplog.go
+++ b/cmd/pbm-agent/oplog.go
@@ -71,7 +71,9 @@ func (a *Agent) OplogReplay(ctx context.Context, r *ctrl.ReplayCmd, opID ctrl.OP
 	}()
 
 	l.Info("oplog replay started")
-	if err := restore.New(a.leadConn, a.nodeConn, a.brief, r.RSMap).ReplayOplog(ctx, r, opID, l); err != nil {
+	rr := restore.New(a.leadConn, a.nodeConn, a.brief, r.RSMap, 0)
+	err = rr.ReplayOplog(ctx, r, opID, l)
+	if err != nil {
 		if errors.Is(err, restore.ErrNoDataForShard) {
 			l.Info("no oplog for the shard, skipping")
 		} else {
diff --git a/cmd/pbm-agent/restore.go b/cmd/pbm-agent/restore.go
index 872681265..1d76153a0 100644
--- a/cmd/pbm-agent/restore.go
+++ b/cmd/pbm-agent/restore.go
@@ -113,10 +113,17 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
 			l.Info("This node is not the primary. Check pbm agent on the primary for restore progress")
 			return
 		}
+
+		var numParallelColls int
+		if r.NumParallelColls != nil && *r.NumParallelColls > 0 {
+			numParallelColls = int(*r.NumParallelColls)
+		}
+
+		rr := restore.New(a.leadConn, a.nodeConn, a.brief, r.RSMap, numParallelColls)
 		if r.OplogTS.IsZero() {
-			err = restore.New(a.leadConn, a.nodeConn, a.brief, r.RSMap).Snapshot(ctx, r, opid, bcp)
+			err = rr.Snapshot(ctx, r, opid, bcp)
 		} else {
-			err = restore.New(a.leadConn, a.nodeConn, a.brief, r.RSMap).PITR(ctx, r, opid, bcp)
+			err = rr.PITR(ctx, r, opid, bcp)
 		}
 	case defs.PhysicalBackup, defs.IncrementalBackup, defs.ExternalBackup:
 		if lck != nil {
diff --git a/cmd/pbm/backup.go b/cmd/pbm/backup.go
index 714f7f78e..814fa83ab 100644
--- a/cmd/pbm/backup.go
+++ b/cmd/pbm/backup.go
@@ -38,6 +38,8 @@ type backupOpts struct {
 	wait       bool
 	waitTime   time.Duration
 	externList bool
+
+	numParallelColls int32
 }
 
 type backupOut struct {
@@ -87,6 +89,10 @@ func runBackup(
 	b *backupOpts,
 	outf outFormat,
 ) (fmt.Stringer, error) {
+	numParallelColls, err := parseCLINumParallelCollsOption(b.numParallelColls)
+	if err != nil {
+		return nil, errors.Wrap(err, "parse --num-parallel-collections option")
+	}
 	nss, err := parseCLINSOption(b.ns)
 	if err != nil {
 		return nil, errors.Wrap(err, "parse --ns option")
 	}
@@ -136,6 +142,7 @@ func runBackup(
 			Namespaces:       nss,
 			Compression:      compression,
 			CompressionLevel: level,
+			NumParallelColls: numParallelColls,
 			Filelist:         b.externList,
 			Profile:          b.profile,
 		},
@@ -662,3 +669,14 @@ func (incompatibleMongodVersionError) Is(err error) bool {
 func (e incompatibleMongodVersionError) Unwrap() error {
 	return errIncompatible
 }
+
+func parseCLINumParallelCollsOption(value int32) (*int32, error) {
+	if value < 0 {
+		return nil, errors.New("value cannot be negative")
+	}
+	if value == 0 {
+		return nil, nil //nolint:nilnil
+	}
+
+	return &value, nil
+}
diff --git a/cmd/pbm/main.go b/cmd/pbm/main.go
index c7819eafa..ceec59a2e 100644
--- a/cmd/pbm/main.go
+++ b/cmd/pbm/main.go
@@ -196,6 +196,8 @@ func main() {
 	backupCmd.Flag("profile", "Config profile name").StringVar(&backupOptions.profile)
 	backupCmd.Flag("compression-level", "Compression level (specific to the compression type)").
 		IntsVar(&backupOptions.compressionLevel)
+	backupCmd.Flag("num-parallel-collections", "Number of parallel collections").
+		Int32Var(&backupOptions.numParallelColls)
 	backupCmd.Flag("ns", `Namespaces to backup (e.g. "db.*", "db.collection"). If not set, backup all ("*.*")`).
 		StringVar(&backupOptions.ns)
 	backupCmd.Flag("wait", "Wait for the backup to finish").
@@ -239,6 +241,8 @@ func main() {
 	restoreCmd.Flag("base-snapshot", "Override setting: Name of older snapshot that PITR will be based on during restore.").
 		StringVar(&restore.pitrBase)
+	restoreCmd.Flag("num-parallel-collections", "Number of parallel collections").
+		Int32Var(&restore.numParallelColls)
 	restoreCmd.Flag("ns", `Namespaces to restore (e.g. "db1.*,db2.collection2"). If not set, restore all ("*.*")`).
 		StringVar(&restore.ns)
 	restoreCmd.Flag("with-users-and-roles", "Includes users and roles for selected database (--ns flag)").
diff --git a/cmd/pbm/restore.go b/cmd/pbm/restore.go
index 3adc1b05d..9c8030f7d 100644
--- a/cmd/pbm/restore.go
+++ b/cmd/pbm/restore.go
@@ -40,6 +40,8 @@ type restoreOpts struct {
 	rsMap string
 	conf  string
 	ts    string
+
+	numParallelColls int32
 }
 
 type restoreRet struct {
@@ -105,6 +107,10 @@ func runRestore(
 	o *restoreOpts,
 	outf outFormat,
 ) (fmt.Stringer, error) {
+	numParallelColls, err := parseCLINumParallelCollsOption(o.numParallelColls)
+	if err != nil {
+		return nil, errors.Wrap(err, "parse --num-parallel-collections option")
+	}
 	nss, err := parseCLINSOption(o.ns)
 	if err != nil {
 		return nil, errors.Wrap(err, "parse --ns option")
 	}
@@ -132,7 +138,7 @@ func runRestore(
 	}
 	tdiff := time.Now().Unix() - int64(clusterTime.T)
 
-	m, err := doRestore(ctx, conn, o, nss, rsMap, outf)
+	m, err := doRestore(ctx, conn, o, numParallelColls, nss, rsMap, outf)
 	if err != nil {
 		return nil, err
 	}
@@ -321,6 +327,7 @@ func doRestore(
 	ctx context.Context,
 	conn connect.Client,
 	o *restoreOpts,
+	numParallelColls *int32,
 	nss []string,
 	rsMapping map[string]string,
 	outf outFormat,
@@ -335,12 +342,13 @@ func doRestore(
 	cmd := ctrl.Cmd{
 		Cmd: ctrl.CmdRestore,
 		Restore: &ctrl.RestoreCmd{
-			Name:          name,
-			BackupName:    bcp,
-			Namespaces:    nss,
-			UsersAndRoles: o.usersAndRoles,
-			RSMap:         rsMapping,
-			External:      o.extern,
+			Name:             name,
+			BackupName:       bcp,
+			NumParallelColls: numParallelColls,
+			Namespaces:       nss,
+			UsersAndRoles:    o.usersAndRoles,
+			RSMap:            rsMapping,
+			External:         o.extern,
 		},
 	}
 	if o.pitr != "" {
diff --git a/pbm/archive/archive.go b/pbm/archive/archive.go
index 3556e34d1..5fa7fdcec 100644
--- a/pbm/archive/archive.go
+++ b/pbm/archive/archive.go
@@ -82,12 +82,18 @@ func Decompose(r io.Reader, newWriter NewWriter, nsFilter NSFilterFn, docFilter
 	return errors.Wrap(err, "metadata")
 }
 
-func Compose(w io.Writer, nsFilter NSFilterFn, newReader NewReader) error {
+func Compose(w io.Writer, newReader NewReader, nsFilter NSFilterFn, concurrency int) error {
 	meta, err := readMetadata(newReader)
 	if err != nil {
 		return errors.Wrap(err, "metadata")
 	}
 
+	if concurrency > 0 {
+		// mongorestore uses this field as a number of
+		// concurrent collections to restore at a moment
+		meta.Header.ConcurrentCollections = int32(concurrency)
+	}
+
 	nss := make([]*Namespace, 0, len(meta.Namespaces))
 	for _, ns := range meta.Namespaces {
 		if nsFilter(NSify(ns.Database, ns.Collection)) {
diff --git a/pbm/backup/backup.go b/pbm/backup/backup.go
index e366a34ec..8f20a5970 100644
--- a/pbm/backup/backup.go
+++ b/pbm/backup/backup.go
@@ -32,17 +32,17 @@ type Backup struct {
 	typ      defs.BackupType
 	incrBase bool
 	timeouts *config.BackupTimeouts
-	dumpConns int
+	numParallelColls int
 
 	oplogSlicerInterval time.Duration
 }
 
 func New(leadConn connect.Client, conn *mongo.Client, brief topo.NodeBrief, dumpConns int) *Backup {
 	return &Backup{
-		leadConn:  leadConn,
-		nodeConn:  conn,
-		brief:     brief,
-		typ:       defs.LogicalBackup,
-		dumpConns: dumpConns,
+		leadConn:         leadConn,
+		nodeConn:         conn,
+		brief:            brief,
+		typ:              defs.LogicalBackup,
+		numParallelColls: dumpConns,
 	}
 }
diff --git a/pbm/backup/logical.go b/pbm/backup/logical.go
index ab0cb512f..aad0ff1ce 100644
--- a/pbm/backup/logical.go
+++ b/pbm/backup/logical.go
@@ -135,7 +135,17 @@ func (b *Backup) doLogical(
 	if len(nssSize) == 0 {
 		dump = snapshot.DummyBackup{}
 	} else {
-		dump, err = snapshot.NewBackup(b.brief.URI, b.dumpConns, db, coll)
+		numParallelColls := b.numParallelColls
+		if bcp.NumParallelColls != nil {
+			if *bcp.NumParallelColls > 0 {
+				numParallelColls = int(*bcp.NumParallelColls)
+			} else {
+				l.Warning("invalid value of NumParallelCollections (%v). fallback to %v",
+					numParallelColls, b.numParallelColls)
+			}
+		}
+
+		dump, err = snapshot.NewBackup(b.brief.URI, numParallelColls, db, coll)
 		if err != nil {
 			return errors.Wrap(err, "init mongodump options")
 		}
diff --git a/pbm/ctrl/cmd.go b/pbm/ctrl/cmd.go
index 23542ed0e..d43e7f51f 100644
--- a/pbm/ctrl/cmd.go
+++ b/pbm/ctrl/cmd.go
@@ -133,6 +133,7 @@ type BackupCmd struct {
 	Namespaces       []string                 `bson:"nss,omitempty"`
 	Compression      compress.CompressionType `bson:"compression"`
 	CompressionLevel *int                     `bson:"level,omitempty"`
+	NumParallelColls *int32                   `bson:"numParallelColls,omitempty"`
 	Filelist         bool                     `bson:"filelist,omitempty"`
 	Profile          string                   `bson:"profile,omitempty"`
 }
@@ -154,6 +155,8 @@ type RestoreCmd struct {
 	UsersAndRoles bool              `bson:"usersAndRoles,omitempty"`
 	RSMap         map[string]string `bson:"rsMap,omitempty"`
 
+	NumParallelColls *int32 `bson:"numParallelColls,omitempty"`
+
 	OplogTS primitive.Timestamp `bson:"oplogTS,omitempty"`
 
 	External bool `bson:"external"`
diff --git a/pbm/restore/logical.go b/pbm/restore/logical.go
index f46b89ea2..db9c6ea35 100644
--- a/pbm/restore/logical.go
+++ b/pbm/restore/logical.go
@@ -46,6 +46,8 @@ type Restore struct {
 	nodeInfo *topo.NodeInfo
 	bcpStg   storage.Storage
 	oplogStg storage.Storage
+
+	numParallelColls int
 	// Shards to participate in restore. Num of shards in bcp could
 	// be less than in the cluster and this is ok. Only these shards
 	// would be expected to run restore (distributed transactions sync,
@@ -76,7 +78,13 @@ type oplogRange struct {
 type restoreUsersAndRolesOption bool
 
 // New creates a new restore object
-func New(leadConn connect.Client, nodeConn *mongo.Client, brief topo.NodeBrief, rsMap map[string]string) *Restore {
+func New(
+	leadConn connect.Client,
+	nodeConn *mongo.Client,
+	brief topo.NodeBrief,
+	rsMap map[string]string,
+	numParallelColls int,
+) *Restore {
 	if rsMap == nil {
 		rsMap = make(map[string]string)
 	}
@@ -87,6 +95,8 @@ func New(leadConn connect.Client, nodeConn *mongo.Client, brief topo.NodeBrief,
 		brief: brief,
 		rsMap: rsMap,
 
+		numParallelColls: numParallelColls,
+
 		indexCatalog: idx.NewIndexCatalog(),
 	}
 }
@@ -805,7 +815,8 @@ func (r *Restore) RunSnapshot(
 			return rdr, nil
 		},
 		bcp.Compression,
-		util.MakeSelectedPred(nss))
+		util.MakeSelectedPred(nss),
+		r.numParallelColls)
 	}
 	if err != nil {
 		return err
diff --git a/pbm/snapshot/backup.go b/pbm/snapshot/backup.go
index b0cd2cd7e..fc816da39 100644
--- a/pbm/snapshot/backup.go
+++ b/pbm/snapshot/backup.go
@@ -20,11 +20,7 @@ type backuper struct {
 	pm *progress.BarWriter
 }
 
-func NewBackup(curi string, conns int, d, c string) (*backuper, error) {
-	if conns <= 0 {
-		conns = 1
-	}
-
+func NewBackup(curi string, maxParallelColls int, d, c string) (*backuper, error) {
 	var err error
 	opts := options.New("pbm-agent:dump", version.Current().Version, "", "", false,
@@ -49,6 +45,10 @@ func NewBackup(curi string, conns int, d, c string) (*backuper, error) {
 		}
 	}
 
+	if maxParallelColls < 1 {
+		maxParallelColls = 1
+	}
+
 	backup := &backuper{}
 
 	backup.pm = progress.NewBarWriter(&progressWriter{}, time.Second*60, 24, false)
@@ -59,7 +59,7 @@ func NewBackup(curi string, conns int, d, c string) (*backuper, error) {
 			// instead of creating a file. This is not clear at plain sight,
 			// you nee to look the code to discover it.
 			Archive:                "-",
-			NumParallelCollections: conns,
+			NumParallelCollections: maxParallelColls,
 		},
 		InputOptions:    &mongodump.InputOptions{},
 		SessionProvider: &db.SessionProvider{},
diff --git a/pbm/snapshot/dump.go b/pbm/snapshot/dump.go
index 3abdeaf31..0cc3ecedc 100644
--- a/pbm/snapshot/dump.go
+++ b/pbm/snapshot/dump.go
@@ -98,6 +98,7 @@ func DownloadDump(
 	download DownloadFunc,
 	compression compress.CompressionType,
 	match archive.NSFilterFn,
+	numParallelColls int,
 ) (io.ReadCloser, error) {
 	pr, pw := io.Pipe()
 
@@ -120,7 +121,7 @@ func DownloadDump(
 			return r, errors.Wrapf(err, "create decompressor: %q", ns)
 		}
 
-		err := archive.Compose(pw, match, newReader)
+		err := archive.Compose(pw, newReader, match, numParallelColls)
 		pw.CloseWithError(errors.Wrap(err, "compose"))
 	}()
diff --git a/pbm/snapshot/restore.go b/pbm/snapshot/restore.go
index b4f56c589..cdccfc26a 100644
--- a/pbm/snapshot/restore.go
+++ b/pbm/snapshot/restore.go
@@ -87,7 +87,6 @@ func NewRestore(uri string, cfg *config.Config) (io.ReaderFrom, error) {
 		BypassDocumentValidation: true,
 		Drop:                     true,
 		NumInsertionWorkers:      numInsertionWorkers,
-		NumParallelCollections:   1,
 		PreserveUUID:             preserveUUID,
 		StopOnError:              true,
 		WriteConcern:             "majority",
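
Not part of the patch: a minimal Go sketch of the precedence rule the backup path in pbm/backup/logical.go applies after this change. A positive NumParallelColls sent with the command overrides the agent-wide --dump-parallel-collections default; a nil or non-positive value falls back to that default. The helper name and example values below are illustrative only.

package main

import "fmt"

// effectiveParallelColls is a hypothetical helper (not in the patch)
// mirroring the fallback logic added in pbm/backup/logical.go:
// a positive per-command value wins, otherwise the agent default is used.
func effectiveParallelColls(cmdVal *int32, agentDefault int) int {
	if cmdVal != nil && *cmdVal > 0 {
		return int(*cmdVal)
	}
	return agentDefault
}

func main() {
	four := int32(4)
	fmt.Println(effectiveParallelColls(&four, 2)) // 4: value from --num-parallel-collections
	fmt.Println(effectiveParallelColls(nil, 2))   // 2: agent's --dump-parallel-collections default
}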