Skip to content

Commit

Permalink
FMWK-405 Add support for node-list option (#149)
Browse files Browse the repository at this point in the history
  • Loading branch information
filkeith committed Sep 23, 2024
1 parent 29f24ee commit a2fb891
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 52 deletions.
17 changes: 8 additions & 9 deletions cmd/asbackup/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ Aerospike Client Flags:
Backup Flags:
-d, --directory string The Directory that holds the backup files. Required, unless -o or -e is used.
-n, --namespace string The namespace to be backed up. Required.
-s, --set stringArray The set(s) to be backed up.
-s, --set string The set(s) to be backed up.
If multiple sets are being backed up, filter-exp cannot be used.
if empty all sets.
-L, --records-per-second int Limit total returned records per second (rps).
Do not apply rps limit if records-per-second is zero.
-B, --bin-list stringArray Only include the given bins in the backup.
-B, --bin-list string Only include the given bins in the backup.
If empty include all bins.
-w, --parallel int Maximum number of scan calls to run in parallel.
If only one partition range is given, or the entire namespace is being backed up, the range
Expand Down Expand Up @@ -102,6 +102,12 @@ Backup Flags:
otherwise workers run in parallel for partitions.
--remove-artifacts Remove existing backup file (-o) or files (-d) without performing a backup.
-C, --compact Do not apply base-64 encoding to BLOBs; results in smaller backup files.
-l, --node-list string <IP addr 1>:<port 1>[,<IP addr 2>:<port 2>[,...]]
<IP addr 1>:<TLS_NAME 1>:<port 1>[,<IP addr 2>:<TLS_NAME 2>:<port 2>[,...]]
Backup the given cluster nodes only.
The job is parallelized by number of nodes unless --parallel is set less than nodes number.
This argument is mutually exclusive to partition-list/after-digest arguments.
Default: backup all nodes in the cluster
Compression Flags:
-z, --compress string Enables compressing of backup files using the specified compression algorithm.
Expand Down Expand Up @@ -156,13 +162,6 @@ Azure Flags:
exact path is where the backup file will be placed. If a directory is given, the backup
state will be placed in the directory with name `<namespace>.asb.state`, or
`<prefix>.asb.state` if `--output-file-prefix` is given.
--node-list <IP addr 1>:<port 1>[,<IP addr 2>:<port 2>[,...]]
<IP addr 1>:<TLS_NAME 1>:<port 1>[,<IP addr 2>:<TLS_NAME 2>:<port 2>[,...]]
Backup the given cluster nodes only.
The job is parallelized over 16 scans unless --parallel is set to another value.
This argument is mutually exclusive to partition-list/after-digest arguments.
Default: backup all nodes in the cluster
--partition-list <filter[,<filter>[...]]>
List of partitions to back up. Partition filters can be ranges, individual partitions, or
Expand Down
19 changes: 7 additions & 12 deletions cmd/internal/app/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
bModels "github.com/aerospike/backup-go/models"
)

const sliceSeparator = ","

func mapBackupConfig(
backupParams *models.Backup,
commonParams *models.Common,
Expand All @@ -38,8 +40,8 @@ func mapBackupConfig(

c := backup.NewDefaultBackupConfig()
c.Namespace = commonParams.Namespace
c.SetList = commonParams.SetList
c.BinList = commonParams.BinList
c.SetList = strings.Split(commonParams.SetList, sliceSeparator)
c.BinList = strings.Split(commonParams.BinList, sliceSeparator)
c.NoRecords = commonParams.NoRecords
c.NoIndexes = commonParams.NoIndexes
c.RecordsPerSecond = commonParams.RecordsPerSecond
Expand All @@ -54,6 +56,7 @@ func mapBackupConfig(
c.Bandwidth = commonParams.Nice * 1024 * 1024
c.ParallelNodes = backupParams.ParallelNodes
c.Compact = backupParams.Compact
c.NodeList = strings.Split(backupParams.NodeList, sliceSeparator)

sp, err := mapScanPolicy(backupParams, commonParams)
if err != nil {
Expand Down Expand Up @@ -83,10 +86,6 @@ func mapBackupConfig(
c.ModAfter = &modAfterTime
}

if len(commonParams.SetList) > 0 {
c.SetList = commonParams.SetList
}

return c, nil
}

Expand All @@ -103,8 +102,8 @@ func mapRestoreConfig(

c := backup.NewDefaultRestoreConfig()
c.Namespace = mapRestoreNamespace(commonParams.Namespace)
c.SetList = commonParams.SetList
c.BinList = commonParams.BinList
c.SetList = strings.Split(commonParams.SetList, sliceSeparator)
c.BinList = strings.Split(commonParams.BinList, sliceSeparator)
c.NoRecords = commonParams.NoRecords
c.NoIndexes = commonParams.NoIndexes
c.RecordsPerSecond = commonParams.RecordsPerSecond
Expand All @@ -120,10 +119,6 @@ func mapRestoreConfig(
c.SecretAgentConfig = mapSecretAgentConfig(secretAgent)
c.RetryPolicy = mapRetryPolicy(restoreParams)

if len(commonParams.SetList) > 0 {
c.SetList = commonParams.SetList
}

return c, nil
}

Expand Down
24 changes: 18 additions & 6 deletions cmd/internal/app/configs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,20 @@ func TestMapBackupConfig_Success(t *testing.T) {
ModifiedBefore: "2023-09-01_12:00:00",
ModifiedAfter: "2023-09-02_12:00:00",
FilterExpression: "k1EDpHRlc3Q=",
ParallelNodes: true,
Compact: true,
NodeList: "node1,node2",
}

commonModel := &models.Common{
Namespace: "test-namespace",
SetList: []string{"set1", "set2"},
BinList: []string{"bin1", "bin2"},
SetList: "set1,set2",
BinList: "bin1,bin2",
NoRecords: true,
NoIndexes: false,
RecordsPerSecond: 1000,
Nice: 10, // 10 MiB
Nice: 10,
Parallel: 5,
}

compression := testCompression()
Expand All @@ -73,10 +77,12 @@ func TestMapBackupConfig_Success(t *testing.T) {

config, err := mapBackupConfig(backupModel, commonModel, compression, encryption, secretAgent)
assert.NoError(t, err)

assert.Equal(t, "test-namespace", config.Namespace)
assert.ElementsMatch(t, []string{"set1", "set2"}, config.SetList)
assert.ElementsMatch(t, []string{"bin1", "bin2"}, config.BinList)
assert.True(t, config.NoRecords)
assert.False(t, config.NoIndexes)
assert.Equal(t, 1000, config.RecordsPerSecond)
assert.Equal(t, int64(5000), config.FileLimit)
assert.Equal(t, "digest", config.AfterDigest)
Expand All @@ -86,7 +92,6 @@ func TestMapBackupConfig_Success(t *testing.T) {
assert.Equal(t, &modBefore, config.ModBefore)
assert.Equal(t, &modAfter, config.ModAfter)

// Compression, Encryption, and Secret Agent
assert.NotNil(t, config.CompressionPolicy)
assert.Equal(t, "ZSTD", config.CompressionPolicy.Mode)
assert.Equal(t, 3, config.CompressionPolicy.Level)
Expand All @@ -99,6 +104,13 @@ func TestMapBackupConfig_Success(t *testing.T) {
assert.Equal(t, "localhost", *config.SecretAgentConfig.Address)
assert.Equal(t, "tcp", *config.SecretAgentConfig.ConnectionType)
assert.Equal(t, 8080, *config.SecretAgentConfig.Port)

assert.Equal(t, 5, config.ParallelWrite, "The ParallelWrite should be set correctly")
assert.Equal(t, 5, config.ParallelRead, "The ParallelRead should be set correctly")
assert.Equal(t, 10*1024*1024, config.Bandwidth, "The Bandwidth should be set to 10 MiB in bytes")
assert.True(t, config.ParallelNodes, "The ParallelNodes flag should be set correctly")
assert.True(t, config.Compact, "The Compact flag should be set correctly")
assert.ElementsMatch(t, []string{"node1", "node2"}, config.NodeList, "The NodeList should be set correctly")
}

func TestMapBackupConfig_MissingNamespace(t *testing.T) {
Expand Down Expand Up @@ -158,8 +170,8 @@ func TestMapRestoreConfig_Success(t *testing.T) {
restoreModel := &models.Restore{}
commonModel := &models.Common{
Namespace: "test-namespace",
SetList: []string{"set1", "set2"},
BinList: []string{"bin1", "bin2"},
SetList: "set1,set2",
BinList: "bin1,bin2",
NoRecords: true,
NoIndexes: false,
RecordsPerSecond: 1000,
Expand Down
8 changes: 8 additions & 0 deletions cmd/internal/flags/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ func (f *Backup) NewFlagSet() *pflag.FlagSet {
flagSet.BoolVarP(&f.Compact, "compact", "C",
false,
"Do not apply base-64 encoding to BLOBs; results in smaller backup files.")
flagSet.StringVarP(&f.NodeList, "node-list", "l",
"",
"<IP addr 1>:<port 1>[,<IP addr 2>:<port 2>[,...]]\n"+
"<IP addr 1>:<TLS_NAME 1>:<port 1>[,<IP addr 2>:<TLS_NAME 2>:<port 2>[,...]]\n"+
"Backup the given cluster nodes only.\n"+
"The job is parallelized by number of nodes unless --parallel is set less than nodes number.\n"+
"This argument is mutually exclusive to partition-list/after-digest arguments.\n"+
"Default: backup all nodes in the cluster")

return flagSet
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/internal/flags/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestBackup_NewFlagSet(t *testing.T) {
"--parallel-nodes",
"--remove-artifacts",
"--compact",
"--node-list", "node1,node2",
}

err := flagSet.Parse(args)
Expand All @@ -58,6 +59,7 @@ func TestBackup_NewFlagSet(t *testing.T) {
assert.Equal(t, true, result.ParallelNodes, "The parallel-nodes flag should be parsed correctly")
assert.Equal(t, true, result.RemoveArtifacts, "The remove-artifacts flag should be parsed correctly")
assert.Equal(t, true, result.Compact, "The compact flag should be parsed correctly")
assert.Equal(t, "node1,node2", result.NodeList, "The node-list flag should be parsed correctly")
}

func TestBackup_NewFlagSet_DefaultValues(t *testing.T) {
Expand All @@ -83,4 +85,5 @@ func TestBackup_NewFlagSet_DefaultValues(t *testing.T) {
assert.Equal(t, false, result.ParallelNodes, "The default value for parallel-nodes should be false")
assert.Equal(t, false, result.RemoveArtifacts, "The default value for remove-artifacts should be false")
assert.Equal(t, false, result.Compact, "The default value for compact should be false")
assert.Equal(t, "", result.NodeList, "The default value for node-list should be empty")
}
8 changes: 4 additions & 4 deletions cmd/internal/flags/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ func (f *Common) NewFlagSet() *pflag.FlagSet {
flagSet.StringVarP(&f.Namespace, "namespace", "n",
"",
"The namespace to be backed up. Required.")
flagSet.StringArrayVarP(&f.SetList, "set", "s",
nil,
flagSet.StringVarP(&f.SetList, "set", "s",
"",
"The set(s) to be backed up.\n"+
"If multiple sets are being backed up, filter-exp cannot be used.\n"+
"if empty all sets.")
flagSet.IntVarP(&f.RecordsPerSecond, "records-per-second", "L",
0,
"Limit total returned records per second (rps).\n"+
"Do not apply rps limit if records-per-second is zero.")
flagSet.StringArrayVarP(&f.BinList, "bin-list", "B",
nil,
flagSet.StringVarP(&f.BinList, "bin-list", "B",
"",
"Only include the given bins in the backup.\n"+
"If empty include all bins.")
flagSet.IntVarP(&f.Parallel, "parallel", "w",
Expand Down
14 changes: 6 additions & 8 deletions cmd/internal/flags/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@ func TestCommon_NewFlagSet(t *testing.T) {
args := []string{
"--directory", "/path/to/backup",
"--namespace", "test-namespace",
"--set", "set1",
"--set", "set2",
"--set", "set1,set2",
"--records-per-second", "5000",
"--bin-list", "bin1",
"--bin-list", "bin2",
"--bin-list", "bin1,bin2",
"--parallel", "10",
"--no-records",
"--no-indexes",
Expand All @@ -50,9 +48,9 @@ func TestCommon_NewFlagSet(t *testing.T) {

assert.Equal(t, "/path/to/backup", result.Directory, "The directory flag should be parsed correctly")
assert.Equal(t, "test-namespace", result.Namespace, "The namespace flag should be parsed correctly")
assert.Equal(t, []string{"set1", "set2"}, result.SetList, "The set list flag should be parsed correctly")
assert.Equal(t, "set1,set2", result.SetList, "The set list flag should be parsed correctly")
assert.Equal(t, 5000, result.RecordsPerSecond, "The records-per-second flag should be parsed correctly")
assert.Equal(t, []string{"bin1", "bin2"}, result.BinList, "The bin-list flag should be parsed correctly")
assert.Equal(t, "bin1,bin2", result.BinList, "The bin-list flag should be parsed correctly")
assert.Equal(t, 10, result.Parallel, "The parallel flag should be parsed correctly")
assert.True(t, result.NoRecords, "The no-records flag should be parsed correctly")
assert.True(t, result.NoIndexes, "The no-indexes flag should be parsed correctly")
Expand All @@ -76,9 +74,9 @@ func TestCommon_NewFlagSet_DefaultValues(t *testing.T) {
// Verify default values
assert.Equal(t, "", result.Directory, "The default value for directory should be an empty string")
assert.Equal(t, "", result.Namespace, "The default value for namespace should be an empty string")
assert.Nil(t, result.SetList, "The default value for set-list should be nil")
assert.Equal(t, "", result.SetList, "The default value for set-list should be nil")
assert.Equal(t, 0, result.RecordsPerSecond, "The default value for records-per-second should be 0")
assert.Nil(t, result.BinList, "The default value for bin-list should be nil")
assert.Equal(t, "", result.BinList, "The default value for bin-list should be nil")
assert.Equal(t, 1, result.Parallel, "The default value for parallel should be 1")
assert.False(t, result.NoRecords, "The default value for no-records should be false")
assert.False(t, result.NoIndexes, "The default value for no-indexes should be false")
Expand Down
1 change: 1 addition & 0 deletions cmd/internal/models/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Backup struct {
ParallelNodes bool
RemoveArtifacts bool
Compact bool
NodeList string
}

// ShouldClearTarget check if we should clean target directory.
Expand Down
4 changes: 2 additions & 2 deletions cmd/internal/models/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ package models
type Common struct {
Directory string
Namespace string
SetList []string
BinList []string
SetList string
BinList string
Parallel int
NoRecords bool
NoIndexes bool
Expand Down
15 changes: 13 additions & 2 deletions config_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ type BackupConfig struct {
SecretAgentConfig *SecretAgentConfig
// Namespace is the Aerospike namespace to back up.
Namespace string
// NodeList contains a list of nodes to back up.
// <IP addr 1>:<port 1>[,<IP addr 2>:<port 2>[,...]]
// <IP addr 1>:<TLS_NAME 1>:<port 1>[,<IP addr 2>:<TLS_NAME 2>:<port 2>[,...]]
// Backup the given cluster nodes only.
// If it is set, ParallelNodes automatically set to true.
// This argument is mutually exclusive to partition-list/AfterDigest arguments.
NodeList []string
// SetList is the Aerospike set to back up (optional, given an empty list,
// all sets will be backed up).
SetList []string
Expand Down Expand Up @@ -153,17 +160,21 @@ func (c *BackupConfig) validate() error {
return fmt.Errorf("modified before must be strictly greater than modified after")
}

if c.ParallelNodes && (c.Partitions.Begin != 0 || c.Partitions.Count != 0) {
if (c.ParallelNodes || len(c.NodeList) != 0) && (c.Partitions.Begin != 0 || c.Partitions.Count != 0) {
return fmt.Errorf("parallel by nodes and partitions and the same time not allowed")
}

if !c.ParallelNodes {
if !c.ParallelNodes && len(c.NodeList) == 0 {
if err := c.Partitions.validate(); err != nil {
return err
}
}

if c.AfterDigest != "" {
if c.ParallelNodes || len(c.NodeList) != 0 {
return fmt.Errorf("parallel by nodes/node list and after digest at the same time not allowed")
}

if _, err := base64.StdEncoding.DecodeString(c.AfterDigest); err != nil {
return fmt.Errorf("after digest must be base64 encoded string: %w", err)
}
Expand Down
Loading

0 comments on commit a2fb891

Please sign in to comment.