[PBM-1312] allow to control max number of parallel collections
defbin committed Sep 9, 2024
1 parent e36eb5f commit 6edc992
Showing 16 changed files with 108 additions and 33 deletions.
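
In short, the commit threads a collection-level concurrency setting from the CLI down to mongodump and mongorestore: pbm backup and pbm restore gain a --num-parallel-collections flag, the value travels in the backup/restore command documents as numParallelColls, and the agent's existing --dump-parallel-collections flag (env PBM_DUMP_PARALLEL_COLLECTIONS, defaulting to half the CPU count) remains the agent-wide fallback when no per-command value is given.
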
11 changes: 8 additions & 3 deletions cmd/pbm-agent/agent.go
@@ -34,7 +34,7 @@ type Agent struct {

brief topo.NodeBrief

dumpConns int
numParallelColls int

closeCMD chan struct{}
pauseHB int32
@@ -44,7 +44,12 @@ type Agent struct {
monStopSig chan struct{}
}

func newAgent(ctx context.Context, leadConn connect.Client, uri string, dumpConns int) (*Agent, error) {
func newAgent(
ctx context.Context,
leadConn connect.Client,
uri string,
numParallelColls int,
) (*Agent, error) {
nodeConn, err := connect.MongoConnect(ctx, uri, connect.Direct(true))
if err != nil {
return nil, err
@@ -72,7 +77,7 @@ func newAgent(ctx context.Context, leadConn connect.Client, uri string, dumpConn
ConfigSvr: info.IsConfigSrv(),
Version: mongoVersion,
},
dumpConns: dumpConns,
numParallelColls: numParallelColls,
}
return a, nil
}
2 changes: 1 addition & 1 deletion cmd/pbm-agent/backup.go
@@ -114,7 +114,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
case defs.LogicalBackup:
fallthrough
default:
bcp = backup.New(a.leadConn, a.nodeConn, a.brief, a.dumpConns)
bcp = backup.New(a.leadConn, a.nodeConn, a.brief, a.numParallelColls)
}

bcp.SetConfig(cfg)
3 changes: 2 additions & 1 deletion cmd/pbm-agent/main.go
@@ -32,7 +32,8 @@ func main() {
Envar("PBM_MONGODB_URI").
Required().
String()
dumpConns = pbmAgentCmd.Flag("dump-parallel-collections", "Number of collections to dump in parallel").
dumpConns = pbmAgentCmd.
Flag("dump-parallel-collections", "Number of collections to dump in parallel").
Envar("PBM_DUMP_PARALLEL_COLLECTIONS").
Default(strconv.Itoa(runtime.NumCPU() / 2)).
Int()
4 changes: 3 additions & 1 deletion cmd/pbm-agent/oplog.go
@@ -71,7 +71,9 @@ func (a *Agent) OplogReplay(ctx context.Context, r *ctrl.ReplayCmd, opID ctrl.OP
}()

l.Info("oplog replay started")
if err := restore.New(a.leadConn, a.nodeConn, a.brief, r.RSMap).ReplayOplog(ctx, r, opID, l); err != nil {
rr := restore.New(a.leadConn, a.nodeConn, a.brief, r.RSMap, 0)
err = rr.ReplayOplog(ctx, r, opID, l)
if err != nil {
if errors.Is(err, restore.ErrNoDataForShard) {
l.Info("no oplog for the shard, skipping")
} else {
11 changes: 9 additions & 2 deletions cmd/pbm-agent/restore.go
@@ -113,10 +113,17 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
l.Info("This node is not the primary. Check pbm agent on the primary for restore progress")
return
}

var numParallelColls int
if r.NumParallelColls != nil && *r.NumParallelColls > 0 {
numParallelColls = int(*r.NumParallelColls)
}

rr := restore.New(a.leadConn, a.nodeConn, a.brief, r.RSMap, numParallelColls)
if r.OplogTS.IsZero() {
err = restore.New(a.leadConn, a.nodeConn, a.brief, r.RSMap).Snapshot(ctx, r, opid, bcp)
err = rr.Snapshot(ctx, r, opid, bcp)
} else {
err = restore.New(a.leadConn, a.nodeConn, a.brief, r.RSMap).PITR(ctx, r, opid, bcp)
err = rr.PITR(ctx, r, opid, bcp)
}
case defs.PhysicalBackup, defs.IncrementalBackup, defs.ExternalBackup:
if lck != nil {
18 changes: 18 additions & 0 deletions cmd/pbm/backup.go
@@ -38,6 +38,8 @@ type backupOpts struct {
wait bool
waitTime time.Duration
externList bool

numParallelColls int32
}

type backupOut struct {
@@ -87,6 +89,10 @@ func runBackup(
b *backupOpts,
outf outFormat,
) (fmt.Stringer, error) {
numParallelColls, err := parseCLINumParallelCollsOption(b.numParallelColls)
if err != nil {
return nil, errors.Wrap(err, "parse --num-parallel-collections option")
}
nss, err := parseCLINSOption(b.ns)
if err != nil {
return nil, errors.Wrap(err, "parse --ns option")
@@ -136,6 +142,7 @@
Namespaces: nss,
Compression: compression,
CompressionLevel: level,
NumParallelColls: numParallelColls,
Filelist: b.externList,
Profile: b.profile,
},
@@ -662,3 +669,14 @@ func (incompatibleMongodVersionError) Is(err error) bool {
func (e incompatibleMongodVersionError) Unwrap() error {
return errIncompatible
}

func parseCLINumParallelCollsOption(value int32) (*int32, error) {
if value < 0 {
return nil, errors.New("value cannot be negative")
}
if value == 0 {
return nil, nil //nolint:nilnil
}

return &value, nil
}
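
A quick way to read the contract of parseCLINumParallelCollsOption above is a small test. This is a hypothetical sketch, not part of the commit; it assumes the standard testing package and placement in the same package as cmd/pbm/backup.go.

func TestParseCLINumParallelCollsOption(t *testing.T) {
    // a positive value is passed through as a pointer override
    if v, err := parseCLINumParallelCollsOption(4); err != nil || v == nil || *v != 4 {
        t.Fatalf("expected *int32(4), got %v, %v", v, err)
    }
    // zero means the flag was not set: no override is sent with the command
    if v, err := parseCLINumParallelCollsOption(0); err != nil || v != nil {
        t.Fatalf("expected nil for an unset flag, got %v, %v", v, err)
    }
    // negative values are rejected
    if _, err := parseCLINumParallelCollsOption(-1); err == nil {
        t.Fatal("expected an error for a negative value")
    }
}
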
4 changes: 4 additions & 0 deletions cmd/pbm/main.go
@@ -196,6 +196,8 @@ func main() {
backupCmd.Flag("profile", "Config profile name").StringVar(&backupOptions.profile)
backupCmd.Flag("compression-level", "Compression level (specific to the compression type)").
IntsVar(&backupOptions.compressionLevel)
backupCmd.Flag("num-parallel-collections", "Number of parallel collections").
Int32Var(&backupOptions.numParallelColls)
backupCmd.Flag("ns", `Namespaces to backup (e.g. "db.*", "db.collection"). If not set, backup all ("*.*")`).
StringVar(&backupOptions.ns)
backupCmd.Flag("wait", "Wait for the backup to finish").
@@ -239,6 +241,8 @@
restoreCmd.Flag("base-snapshot",
"Override setting: Name of older snapshot that PITR will be based on during restore.").
StringVar(&restore.pitrBase)
restoreCmd.Flag("num-parallel-collections", "Number of parallel collections").
Int32Var(&restore.numParallelColls)
restoreCmd.Flag("ns", `Namespaces to restore (e.g. "db1.*,db2.collection2"). If not set, restore all ("*.*")`).
StringVar(&restore.ns)
restoreCmd.Flag("with-users-and-roles", "Includes users and roles for selected database (--ns flag)").
22 changes: 15 additions & 7 deletions cmd/pbm/restore.go
@@ -40,6 +40,8 @@ type restoreOpts struct {
rsMap string
conf string
ts string

numParallelColls int32
}

type restoreRet struct {
@@ -105,6 +107,10 @@ func runRestore(
o *restoreOpts,
outf outFormat,
) (fmt.Stringer, error) {
numParallelColls, err := parseCLINumParallelCollsOption(o.numParallelColls)
if err != nil {
return nil, errors.Wrap(err, "parse --num-parallel-collections option")
}
nss, err := parseCLINSOption(o.ns)
if err != nil {
return nil, errors.Wrap(err, "parse --ns option")
@@ -132,7 +138,7 @@
}
tdiff := time.Now().Unix() - int64(clusterTime.T)

m, err := doRestore(ctx, conn, o, nss, rsMap, outf)
m, err := doRestore(ctx, conn, o, numParallelColls, nss, rsMap, outf)
if err != nil {
return nil, err
}
@@ -321,6 +327,7 @@ func doRestore(
ctx context.Context,
conn connect.Client,
o *restoreOpts,
numParallelColls *int32,
nss []string,
rsMapping map[string]string,
outf outFormat,
@@ -335,12 +342,13 @@
cmd := ctrl.Cmd{
Cmd: ctrl.CmdRestore,
Restore: &ctrl.RestoreCmd{
Name: name,
BackupName: bcp,
Namespaces: nss,
UsersAndRoles: o.usersAndRoles,
RSMap: rsMapping,
External: o.extern,
Name: name,
BackupName: bcp,
NumParallelColls: numParallelColls,
Namespaces: nss,
UsersAndRoles: o.usersAndRoles,
RSMap: rsMapping,
External: o.extern,
},
}
if o.pitr != "" {
8 changes: 7 additions & 1 deletion pbm/archive/archive.go
@@ -82,12 +82,18 @@ func Decompose(r io.Reader, newWriter NewWriter, nsFilter NSFilterFn, docFilter
return errors.Wrap(err, "metadata")
}

func Compose(w io.Writer, nsFilter NSFilterFn, newReader NewReader) error {
func Compose(w io.Writer, newReader NewReader, nsFilter NSFilterFn, concurrency int) error {
meta, err := readMetadata(newReader)
if err != nil {
return errors.Wrap(err, "metadata")
}

if concurrency > 0 {
// mongorestore uses this field as the number of
// collections to restore concurrently
meta.Header.ConcurrentCollections = int32(concurrency)
}

nss := make([]*Namespace, 0, len(meta.Namespaces))
for _, ns := range meta.Namespaces {
if nsFilter(NSify(ns.Database, ns.Collection)) {
12 changes: 6 additions & 6 deletions pbm/backup/backup.go
@@ -32,17 +32,17 @@ type Backup struct {
typ defs.BackupType
incrBase bool
timeouts *config.BackupTimeouts
dumpConns int
numParallelColls int
oplogSlicerInterval time.Duration
}

func New(leadConn connect.Client, conn *mongo.Client, brief topo.NodeBrief, dumpConns int) *Backup {
return &Backup{
leadConn: leadConn,
nodeConn: conn,
brief: brief,
typ: defs.LogicalBackup,
dumpConns: dumpConns,
leadConn: leadConn,
nodeConn: conn,
brief: brief,
typ: defs.LogicalBackup,
numParallelColls: dumpConns,
}
}

12 changes: 11 additions & 1 deletion pbm/backup/logical.go
@@ -135,7 +135,17 @@ func (b *Backup) doLogical(
if len(nssSize) == 0 {
dump = snapshot.DummyBackup{}
} else {
dump, err = snapshot.NewBackup(b.brief.URI, b.dumpConns, db, coll)
numParallelColls := b.numParallelColls
if bcp.NumParallelColls != nil {
if *bcp.NumParallelColls > 0 {
numParallelColls = int(*bcp.NumParallelColls)
} else {
l.Warning("invalid value of NumParallelCollections (%v). fallback to %v",
numParallelColls, b.numParallelColls)
}
}

dump, err = snapshot.NewBackup(b.brief.URI, numParallelColls, db, coll)
if err != nil {
return errors.Wrap(err, "init mongodump options")
}
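
The net effect of this hunk is a simple precedence rule: a positive per-backup value from the command overrides the agent-wide --dump-parallel-collections default, while a missing or non-positive value falls back to it (with a warning for the latter). A hypothetical helper, not part of the commit, that captures the same decision (the name effectiveNumParallelColls is made up):

// effectiveNumParallelColls mirrors the fallback logic in doLogical above:
// a per-command override wins only when it is present and positive.
func effectiveNumParallelColls(agentDefault int, override *int32) int {
    if override != nil && *override > 0 {
        return int(*override)
    }
    return agentDefault
}
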
3 changes: 3 additions & 0 deletions pbm/ctrl/cmd.go
@@ -133,6 +133,7 @@ type BackupCmd struct {
Namespaces []string `bson:"nss,omitempty"`
Compression compress.CompressionType `bson:"compression"`
CompressionLevel *int `bson:"level,omitempty"`
NumParallelColls *int32 `bson:"numParallelColls,omitempty"`
Filelist bool `bson:"filelist,omitempty"`
Profile string `bson:"profile,omitempty"`
}
@@ -154,6 +155,8 @@ type RestoreCmd struct {
UsersAndRoles bool `bson:"usersAndRoles,omitempty"`
RSMap map[string]string `bson:"rsMap,omitempty"`

NumParallelColls *int32 `bson:"numParallelColls,omitempty"`

OplogTS primitive.Timestamp `bson:"oplogTS,omitempty"`

External bool `bson:"external"`
15 changes: 13 additions & 2 deletions pbm/restore/logical.go
@@ -46,6 +46,8 @@ type Restore struct {
nodeInfo *topo.NodeInfo
bcpStg storage.Storage
oplogStg storage.Storage

numParallelColls int
// Shards to participate in restore. Num of shards in bcp could
// be less than in the cluster and this is ok. Only these shards
// would be expected to run restore (distributed transactions sync,
@@ -76,7 +78,13 @@ type oplogRange struct {
type restoreUsersAndRolesOption bool

// New creates a new restore object
func New(leadConn connect.Client, nodeConn *mongo.Client, brief topo.NodeBrief, rsMap map[string]string) *Restore {
func New(
leadConn connect.Client,
nodeConn *mongo.Client,
brief topo.NodeBrief,
rsMap map[string]string,
numParallelColls int,
) *Restore {
if rsMap == nil {
rsMap = make(map[string]string)
}
@@ -87,6 +95,8 @@
brief: brief,
rsMap: rsMap,

numParallelColls: numParallelColls,

indexCatalog: idx.NewIndexCatalog(),
}
}
@@ -805,7 +815,8 @@ func (r *Restore) RunSnapshot(
return rdr, nil
},
bcp.Compression,
util.MakeSelectedPred(nss))
util.MakeSelectedPred(nss),
r.numParallelColls)
}
if err != nil {
return err
12 changes: 6 additions & 6 deletions pbm/snapshot/backup.go
@@ -20,11 +20,7 @@ type backuper struct {
pm *progress.BarWriter
}

func NewBackup(curi string, conns int, d, c string) (*backuper, error) {
if conns <= 0 {
conns = 1
}

func NewBackup(curi string, maxParallelColls int, d, c string) (*backuper, error) {
var err error

opts := options.New("pbm-agent:dump", version.Current().Version, "", "", false,
@@ -49,6 +45,10 @@ func NewBackup(curi string, conns int, d, c string) (*backuper, error) {
}
}

if maxParallelColls < 1 {
maxParallelColls = 1
}

backup := &backuper{}

backup.pm = progress.NewBarWriter(&progressWriter{}, time.Second*60, 24, false)
@@ -59,7 +59,7 @@ func NewBackup(curi string, conns int, d, c string) (*backuper, error) {
// instead of creating a file. This is not obvious at first sight;
// you need to look at the code to discover it.
Archive: "-",
NumParallelCollections: conns,
NumParallelCollections: maxParallelColls,
},
InputOptions: &mongodump.InputOptions{},
SessionProvider: &db.SessionProvider{},
3 changes: 2 additions & 1 deletion pbm/snapshot/dump.go
@@ -98,6 +98,7 @@ func DownloadDump(
download DownloadFunc,
compression compress.CompressionType,
match archive.NSFilterFn,
numParallelColls int,
) (io.ReadCloser, error) {
pr, pw := io.Pipe()

@@ -120,7 +121,7 @@
return r, errors.Wrapf(err, "create decompressor: %q", ns)
}

err := archive.Compose(pw, match, newReader)
err := archive.Compose(pw, newReader, match, numParallelColls)
pw.CloseWithError(errors.Wrap(err, "compose"))
}()

1 change: 0 additions & 1 deletion pbm/snapshot/restore.go
@@ -87,7 +87,6 @@ func NewRestore(uri string, cfg *config.Config) (io.ReaderFrom, error) {
BypassDocumentValidation: true,
Drop: true,
NumInsertionWorkers: numInsertionWorkers,
NumParallelCollections: 1,
PreserveUUID: preserveUUID,
StopOnError: true,
WriteConcern: "majority",
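
Pieced together from the hunks above, the restore path now carries the setting end to end: the agent passes the CLI value into restore.New, RunSnapshot forwards r.numParallelColls to snapshot.DownloadDump, which hands it to archive.Compose; Compose records a positive value in the archive prelude as ConcurrentCollections, and with the hard-coded NumParallelCollections: 1 removed here, mongorestore can honor that header instead of being pinned to one collection. A condensed view of the chain, paraphrased from the hunks above (not verbatim code; "..." elides unrelated arguments):

    cmd/pbm-agent/restore.go : rr := restore.New(a.leadConn, a.nodeConn, a.brief, r.RSMap, numParallelColls)  // 0 = no override
    pbm/restore/logical.go   : snapshot.DownloadDump(..., util.MakeSelectedPred(nss), r.numParallelColls)
    pbm/snapshot/dump.go     : archive.Compose(pw, newReader, match, numParallelColls)
    pbm/archive/archive.go   : meta.Header.ConcurrentCollections = int32(concurrency)  // only when concurrency > 0
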
