Skip to content

Commit

Permalink
FMWK-569-partition-list
Browse files Browse the repository at this point in the history
- added support of partition list parameter
- refactored after-digest parameter
- refactored partition range
  • Loading branch information
filkeith committed Sep 24, 2024
1 parent e596dba commit 231534b
Show file tree
Hide file tree
Showing 14 changed files with 314 additions and 273 deletions.
64 changes: 0 additions & 64 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,70 +24,6 @@ import (
"golang.org/x/sync/semaphore"
)

func TestPartitionRange_validate(t *testing.T) {
type fields struct {
Begin int
Count int
}
tests := []struct {
name string
fields fields
wantErr bool
}{
{
name: "Test positive PartitionRange validate",
fields: fields{
Begin: 0,
Count: 5,
},
wantErr: false,
},
{
name: "Test negative PartitionRange validate count 0",
fields: fields{
Begin: 5,
Count: 0,
},
wantErr: true,
},
{
name: "Test negative PartitionRange validate begin -1",
fields: fields{
Begin: -1,
Count: 5,
},
wantErr: true,
},
{
name: "Test negative PartitionRange validate count -1",
fields: fields{
Begin: 5,
Count: -1,
},
wantErr: true,
},
{
name: "Test negative PartitionRange validate total partitions greater than 4096",
fields: fields{
Begin: 4000,
Count: 1000,
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := PartitionRange{
Begin: tt.fields.Begin,
Count: tt.fields.Count,
}
if err := p.validate(); (err != nil) != tt.wantErr {
t.Errorf("PartitionRange.validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

func TestNilClient(t *testing.T) {
_, err := NewClient(nil)
assert.Error(t, err, "aerospike client is required")
Expand Down
27 changes: 13 additions & 14 deletions cmd/asbackup/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@ Backup Flags:
This argument is mutually exclusive to partition-list/after-digest arguments.
Default: backup all nodes in the cluster
--no-ttl-only Only include records that have no ttl set (persistent records).
-X, --partition-list string List of partitions <filter[,<filter>[...]]> to back up. Partition filters can be ranges,
individual partitions, or records after a specific digest within a single partition.
This argument is mutually exclusive to after-digest.
Note: each partition filter is an individual task which cannot be parallelized, so you can only
achieve as much parallelism as there are partition filters. You may increase parallelism by dividing up
partition ranges manually.
Filter: <begin partition>[-<partition count>]|<digest>
begin partition: 0-4095
partition count: 1-4096 Default: 1
digest: base64 encoded string
Examples: 0-1000, 1000-1000, 2222, EjRWeJq83vEjRRI0VniavN7xI0U=
Default: 0-4096 (all partitions)
Compression Flags:
-z, --compress string Enables compressing of backup files using the specified compression algorithm.
Expand Down Expand Up @@ -164,20 +177,6 @@ Azure Flags:
state will be placed in the directory with name `<namespace>.asb.state`, or
`<prefix>.asb.state` if `--output-file-prefix` is given.
--partition-list <filter[,<filter>[...]]>
List of partitions to back up. Partition filters can be ranges, individual partitions, or
records after a specific digest within a single partition.
This argument is mutually exclusive to after-digest.
Note: each partition filter is an individual task which cannot be parallelized, so you can only
achieve as much parallelism as there are partition filters. You may increase parallelism by dividing up
partition ranges manually.
Filter: <begin partition>[-<partition count>]|<digest>
begin partition: 0-4095
partition count: 1-4096 Default: 1
digest: base64 encoded string
Examples: 0-1000, 1000-1000, 2222, EjRWeJq83vEjRRI0VniavN7xI0U=
Default: 0-4096 (all partitions)
--machine Output machine-readable status updates to the given path, typically a FIFO.
--estimate Estimate the backed-up record size from a random sample of
Expand Down
95 changes: 94 additions & 1 deletion cmd/internal/app/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package app

import (
"fmt"
"regexp"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -46,7 +48,6 @@ func mapBackupConfig(
c.NoIndexes = commonParams.NoIndexes
c.RecordsPerSecond = commonParams.RecordsPerSecond
c.FileLimit = backupParams.FileLimit
c.AfterDigest = backupParams.AfterDigest
// The original backup tools have a single parallelism configuration property.
// We may consider splitting the configuration in the future.
c.ParallelWrite = commonParams.Parallel
Expand All @@ -59,6 +60,13 @@ func mapBackupConfig(
c.NodeList = strings.Split(backupParams.NodeList, sliceSeparator)
c.NoTTLOnly = backupParams.NoTTLOnly

pf, err := mapPartitionFilter(backupParams, commonParams)
if err != nil {
return nil, err
}

c.PartitionFilters = pf

sp, err := mapScanPolicy(backupParams, commonParams)
if err != nil {
return nil, err
Expand Down Expand Up @@ -275,3 +283,88 @@ func mapRetryPolicy(r *models.Restore) *bModels.RetryPolicy {
MaxRetries: r.RetryMaxRetries,
}
}

func mapPartitionFilter(b *models.Backup, c *models.Common) ([]*aerospike.PartitionFilter, error) {
switch {
case b.AfterDigest != "":
afterDigestFilter, err := backup.NewPartitionFilterAfterDigest(c.Namespace, b.AfterDigest)
if err != nil {
return nil, fmt.Errorf("failed to parse after digest filter: %w", err)
}

return []*aerospike.PartitionFilter{afterDigestFilter}, nil
case b.PartitionList != "":
filterSlice := strings.Split(b.PartitionList, sliceSeparator)
partitionFilters := make([]*aerospike.PartitionFilter, 0, len(filterSlice))

for i := range filterSlice {
partitionFilter, err := parsePartitionFilter(c.Namespace, filterSlice[i])
if err != nil {
return nil, err
}

partitionFilters = append(partitionFilters, partitionFilter)
}

return partitionFilters, nil
default:
return []*aerospike.PartitionFilter{backup.NewPartitionFilterAll()}, nil
}
}

// parsePartitionFilter check inputs from --partition-list with regexp.
// Parse values and returns *aerospike.PartitionFilter or error
func parsePartitionFilter(namespace, filter string) (*aerospike.PartitionFilter, error) {
// Range 0-4096
//nolint:lll // The regexp is long.
exp := regexp.MustCompile(`^([0-9]|[1-9][0-9]{1,3}|40[0-8][0-9]|409[0-5])\-([1-9]|[1-9][0-9]{1,3}|40[0-8][0-9]|409[0-6])$`)
if exp.MatchString(filter) {
return parsePartitionFilterByRange(filter)
}

// Id 1456
exp = regexp.MustCompile(`^\d+$`)
if exp.MatchString(filter) {
return parsePartitionFilterByID(filter)
}

// Digest (base64 string)
exp = regexp.MustCompile(`^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$`)
if exp.MatchString(filter) {
return parsePartitionFilterByDigest(namespace, filter)
}

return nil, fmt.Errorf("failed to parse partition filter: %s", filter)
}

func parsePartitionFilterByRange(filter string) (*aerospike.PartitionFilter, error) {
bounds := strings.Split(filter, "-")
if len(bounds) != 2 {
return nil, fmt.Errorf("invalid partition filter: %s", filter)
}

begin, err := strconv.Atoi(bounds[0])
if err != nil {
return nil, fmt.Errorf("invalid partition filter %s begin value:%w", filter, err)
}

count, err := strconv.Atoi(bounds[1])
if err != nil {
return nil, fmt.Errorf("invalid partition filter %s count value:%w", filter, err)
}

return backup.NewPartitionFilterByRange(begin, count), nil
}

func parsePartitionFilterByID(filter string) (*aerospike.PartitionFilter, error) {
id, err := strconv.Atoi(filter)
if err != nil {
return nil, fmt.Errorf("invalid partition filter %s id value:%w", filter, err)
}

return backup.NewPartitionFilterByID(id), nil
}

func parsePartitionFilterByDigest(namespace, filter string) (*aerospike.PartitionFilter, error) {
return backup.NewPartitionFilterByDigest(namespace, filter)
}
1 change: 0 additions & 1 deletion cmd/internal/app/configs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ func TestMapBackupConfig_Success(t *testing.T) {
assert.False(t, config.NoIndexes)
assert.Equal(t, 1000, config.RecordsPerSecond)
assert.Equal(t, int64(5000), config.FileLimit)
assert.Equal(t, "digest", config.AfterDigest)
assert.Equal(t, true, config.NoTTLOnly)

modBefore, _ := time.Parse("2006-01-02_15:04:05", "2023-09-01_12:00:00")
Expand Down
14 changes: 14 additions & 0 deletions cmd/internal/flags/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,20 @@ func (f *Backup) NewFlagSet() *pflag.FlagSet {
flagSet.BoolVar(&f.NoTTLOnly, "no-ttl-only",
false,
"Only include records that have no ttl set (persistent records).")
flagSet.StringVarP(&f.PartitionList, "partition-list", "X",
"",
"List of partitions <filter[,<filter>[...]]> to back up. Partition filters can be ranges,\n"+
"individual partitions, or records after a specific digest within a single partition.\n"+
"This argument is mutually exclusive to after-digest.\n"+
"Note: each partition filter is an individual task which cannot be parallelized, so you can only\n"+
"achieve as much parallelism as there are partition filters. You may increase parallelism by dividing up\n"+
"partition ranges manually.\n"+
"Filter: <begin partition>[-<partition count>]|<digest>\n"+
"begin partition: 0-4095\n"+
"partition count: 1-4096 Default: 1\n"+
"digest: base64 encoded string\n"+
"Examples: 0-1000, 1000-1000, 2222, EjRWeJq83vEjRRI0VniavN7xI0U=\n"+
"Default: 0-4096 (all partitions)\n")

return flagSet
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/internal/flags/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func TestBackup_NewFlagSet(t *testing.T) {
"--compact",
"--node-list", "node1,node2",
"--no-ttl-only",
"--partition-list", "4000,1-236,EjRWeJq83vEjRRI0VniavN7xI0U=",
}

err := flagSet.Parse(args)
Expand All @@ -62,6 +63,7 @@ func TestBackup_NewFlagSet(t *testing.T) {
assert.Equal(t, true, result.Compact, "The compact flag should be parsed correctly")
assert.Equal(t, "node1,node2", result.NodeList, "The node-list flag should be parsed correctly")
assert.Equal(t, true, result.NoTTLOnly, "The no-ttl-only flag should be parsed correctly")
assert.Equal(t, "4000,1-236,EjRWeJq83vEjRRI0VniavN7xI0U=", result.PartitionList, "The partition-list flag should be parsed correctly")
}

func TestBackup_NewFlagSet_DefaultValues(t *testing.T) {
Expand Down Expand Up @@ -89,4 +91,5 @@ func TestBackup_NewFlagSet_DefaultValues(t *testing.T) {
assert.Equal(t, false, result.Compact, "The default value for compact should be false")
assert.Equal(t, "", result.NodeList, "The default value for node-list should be empty")
assert.Equal(t, false, result.NoTTLOnly, "The default value for no-ttl-only should be false")
assert.Equal(t, "", result.PartitionList, "The default value for partition-list should be empty string")
}
1 change: 1 addition & 0 deletions cmd/internal/models/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Backup struct {
Compact bool
NodeList string
NoTTLOnly bool
PartitionList string
}

// ShouldClearTarget check if we should clean target directory.
Expand Down
Loading

0 comments on commit 231534b

Please sign in to comment.