Skip to content

Commit

Permalink
[PBM-1356] ensure capped collection
Browse files Browse the repository at this point in the history
  • Loading branch information
defbin committed Sep 9, 2024
1 parent 5bb76da commit 972b5e7
Showing 1 changed file with 45 additions and 16 deletions.
61 changes: 45 additions & 16 deletions cmd/pbm-agent/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,13 @@ const (

// setup a new DB for PBM
func setupNewDB(ctx context.Context, conn connect.Client) error {
err := conn.AdminCommand(
ctx,
bson.D{{"create", defs.CmdStreamCollection}, {"capped", true}, {"size", cmdCollectionSizeBytes}},
).Err()
if err != nil && !strings.Contains(err.Error(), "already exists") {
err := ensureCappedCollection(ctx, conn, defs.CmdStreamCollection, cmdCollectionSizeBytes)
if err != nil {
return errors.Wrap(err, "ensure cmd collection")
}

err = conn.AdminCommand(
ctx,
bson.D{{"create", defs.LogCollection}, {"capped", true}, {"size", logsCollectionSizeBytes}},
).Err()
if err != nil && !strings.Contains(err.Error(), "already exists") {
err = ensureCappedCollection(ctx, conn, defs.LogCollection, logsCollectionSizeBytes)
if err != nil {
return errors.Wrap(err, "ensure log collection")
}

Expand Down Expand Up @@ -71,12 +65,9 @@ func setupNewDB(ctx context.Context, conn connect.Client) error {
return errors.Wrapf(err, "ensure lock index on %s", defs.LockOpCollection)
}

err = conn.AdminCommand(
ctx,
bson.D{{"create", defs.PBMOpLogCollection}, {"capped", true}, {"size", pbmOplogCollectionSizeBytes}},
).Err()
if err != nil && !strings.Contains(err.Error(), "already exists") {
return errors.Wrap(err, "ensure log collection")
err = ensureCappedCollection(ctx, conn, defs.PBMOpLogCollection, pbmOplogCollectionSizeBytes)
if err != nil {
return errors.Wrap(err, "ensure ops log collection")
}
_, err = conn.PBMOpLogCollection().Indexes().CreateOne(
ctx,
Expand Down Expand Up @@ -127,3 +118,41 @@ func setupNewDB(ctx context.Context, conn connect.Client) error {

return err
}

func ensureCappedCollection(
ctx context.Context,
conn connect.Client,
collName string,
size int64,
) error {
cmd := bson.D{{"create", collName}, {"capped", true}, {"size", size}}
err := conn.AdminCommand(ctx, cmd).Err()
if err == nil {
return nil
}
if !strings.Contains(err.Error(), "already exists") {
return errors.Wrapf(err, "ensure %q collection", collName)
}

collSpecs, err := conn.MongoClient().Database(defs.DB).
ListCollectionSpecifications(ctx, bson.D{{"name", collName}})
if err != nil {
return errors.Wrap(err, "listDatabases")
}
if len(collSpecs) != 1 {
return errors.Errorf("unexpected one collection spec, got %d", len(collSpecs))
}

ok, _ := collSpecs[0].Options.Lookup("capped").BooleanOK()
if ok {
return nil
}

cmd = bson.D{{"convertToCapped", collName}, {"size", size}}
err = conn.AdminCommand(ctx, cmd).Err()
if err != nil {
return errors.Wrap(err, "convertToCapped")
}

return nil
}

0 comments on commit 972b5e7

Please sign in to comment.