
FMWK-569 Add support for the partition-list option #153

Open
wants to merge 11 commits into base: main
64 changes: 0 additions & 64 deletions client_test.go
@@ -24,70 +24,6 @@ import (
"golang.org/x/sync/semaphore"
)

func TestPartitionRange_validate(t *testing.T) {
type fields struct {
Begin int
Count int
}
tests := []struct {
name string
fields fields
wantErr bool
}{
{
name: "Test positive PartitionRange validate",
fields: fields{
Begin: 0,
Count: 5,
},
wantErr: false,
},
{
name: "Test negative PartitionRange validate count 0",
fields: fields{
Begin: 5,
Count: 0,
},
wantErr: true,
},
{
name: "Test negative PartitionRange validate begin -1",
fields: fields{
Begin: -1,
Count: 5,
},
wantErr: true,
},
{
name: "Test negative PartitionRange validate count -1",
fields: fields{
Begin: 5,
Count: -1,
},
wantErr: true,
},
{
name: "Test negative PartitionRange validate total partitions greater than 4096",
fields: fields{
Begin: 4000,
Count: 1000,
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := PartitionRange{
Begin: tt.fields.Begin,
Count: tt.fields.Count,
}
if err := p.validate(); (err != nil) != tt.wantErr {
t.Errorf("PartitionRange.validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

func TestNilClient(t *testing.T) {
_, err := NewClient(nil)
assert.Error(t, err, "aerospike client is required")
27 changes: 13 additions & 14 deletions cmd/asbackup/readme.md
@@ -110,6 +110,19 @@ Backup Flags:
Default: backup all nodes in the cluster
--no-ttl-only Only include records that have no ttl set (persistent records).
--prefer-racks string <rack id 1>[,<rack id 2>[,...]]
A list of Aerospike Server rack IDs to prefer when reading records for a backup.
-X, --partition-list string List of partitions <filter[,<filter>[...]]> to back up. Partition filters can be ranges,
individual partitions, or records after a specific digest within a single partition.
This argument is mutually exclusive with --after-digest.
Note: each partition filter is an individual task which cannot be parallelized, so you can only
achieve as much parallelism as there are partition filters. You may increase parallelism by dividing up
partition ranges manually.
Filter: <begin partition>[-<partition count>]|<digest>
begin partition: 0-4095
partition count: 1-4096 Default: 1
digest: base64 encoded string
Examples: 0-1000, 1000-1000, 2222, EjRWeJq83vEjRRI0VniavN7xI0U=
Default: 0-4096 (all partitions)


Compression Flags:
-z, --compress string Enables compressing of backup files using the specified compression algorithm.
@@ -165,20 +178,6 @@ Azure Flags:
state will be placed in the directory with name `<namespace>.asb.state`, or
`<prefix>.asb.state` if `--output-file-prefix` is given.

--partition-list <filter[,<filter>[...]]>
List of partitions to back up. Partition filters can be ranges, individual partitions, or
records after a specific digest within a single partition.
This argument is mutually exclusive to after-digest.
Note: each partition filter is an individual task which cannot be parallelized, so you can only
achieve as much parallelism as there are partition filters. You may increase parallelism by dividing up
partition ranges manually.
Filter: <begin partition>[-<partition count>]|<digest>
begin partition: 0-4095
partition count: 1-4096 Default: 1
digest: base64 encoded string
Examples: 0-1000, 1000-1000, 2222, EjRWeJq83vEjRRI0VniavN7xI0U=
Default: 0-4096 (all partitions)

--machine Output machine-readable status updates to the given path, typically a FIFO.

--estimate Estimate the backed-up record size from a random sample of
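To make the filter grammar documented for --partition-list above concrete, here is a minimal, hedged Go sketch of how the three example forms map onto the backup-go partition-filter constructors that this PR calls in cmd/internal/app/configs.go. The standalone program and the "test" namespace are illustrative only and are not part of the PR; a comma-separated value such as `0-1000,2222,EjRWeJq83vEjRRI0VniavN7xI0U=` would produce the same three filters, one per list item.

```go
package main

import (
	"fmt"

	"github.com/aerospike/backup-go"
)

func main() {
	// "0-1000": a range filter with begin partition 0 and partition count 1000.
	rangeFilter := backup.NewPartitionFilterByRange(0, 1000)

	// "2222": a single-partition filter.
	idFilter := backup.NewPartitionFilterByID(2222)

	// "EjRWeJq83vEjRRI0VniavN7xI0U=": records after this digest within its partition.
	// The "test" namespace is an assumed example value.
	digestFilter, err := backup.NewPartitionFilterByDigest("test", "EjRWeJq83vEjRRI0VniavN7xI0U=")
	if err != nil {
		panic(err)
	}

	fmt.Println(rangeFilter, idFilter, digestFilter)
}
```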
4 changes: 4 additions & 0 deletions cmd/internal/app/asbackup.go
@@ -51,6 +51,10 @@ func NewASBackup(
return nil, err
}

if err := validateBackupParams(backupParams); err != nil {
return nil, err
}

writer, err := getWriter(ctx, backupParams, commonParams, awsS3, gcpStorage, azureBlob)
if err != nil {
return nil, fmt.Errorf("failed to create backup writer: %w", err)
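NewASBackup now calls validateBackupParams before creating the writer, but that helper is not shown in this diff. Purely as a hypothetical sketch, assuming it enforces the mutual exclusivity of --after-digest and --partition-list documented in the readme above, it might look like the following; the backupFlags type and the validateBackupFlags name are stand-ins, not the PR's code.

```go
package main

import (
	"errors"
	"fmt"
)

// backupFlags stands in for the CLI's models.Backup; only the two relevant fields are shown.
type backupFlags struct {
	AfterDigest   string
	PartitionList string
}

// validateBackupFlags sketches the kind of check validateBackupParams could perform.
func validateBackupFlags(b *backupFlags) error {
	// The readme documents --partition-list as mutually exclusive with --after-digest.
	if b.AfterDigest != "" && b.PartitionList != "" {
		return errors.New("only one of --after-digest and --partition-list can be configured")
	}

	return nil
}

func main() {
	err := validateBackupFlags(&backupFlags{
		AfterDigest:   "AvDsV2KuSZHZugDBftnLxGpR+88=",
		PartitionList: "0-1000",
	})
	fmt.Println(err) // prints the mutual-exclusivity error
}
```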
99 changes: 97 additions & 2 deletions cmd/internal/app/configs.go
@@ -16,6 +16,8 @@ package app

import (
"fmt"
"regexp"
"strconv"
"strings"
"time"

@@ -25,6 +27,13 @@ import (
bModels "github.com/aerospike/backup-go/models"
)

var (
// expPartitionRange matches "<begin>-<count>": begin partition 0-4095, partition count 1-4096.
//nolint:lll // The regexp is long.
expPartitionRange = regexp.MustCompile(`^([0-9]|[1-9][0-9]{1,2}|[1-3][0-9]{3}|40[0-8][0-9]|409[0-5])-([1-9]|[1-9][0-9]{1,2}|[1-3][0-9]{3}|40[0-8][0-9]|409[0-6])$`)
// expPartitionID matches a single partition ID, e.g. "1456".
expPartitionID = regexp.MustCompile(`^\d+$`)
// expPartitionDigest matches a base64-encoded record digest.
expPartitionDigest = regexp.MustCompile(`^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$`)
)

func mapBackupConfig(
backupParams *models.Backup,
commonParams *models.Common,
@@ -44,7 +53,6 @@ func mapBackupConfig(
c.NoIndexes = commonParams.NoIndexes
c.RecordsPerSecond = commonParams.RecordsPerSecond
c.FileLimit = backupParams.FileLimit
c.AfterDigest = backupParams.AfterDigest
// The original backup tools have a single parallelism configuration property.
// We may consider splitting the configuration in the future.
c.ParallelWrite = commonParams.Parallel
@@ -57,11 +65,17 @@

// Overwrite partitions if we use nodes.
if backupParams.ParallelNodes || backupParams.NodeList != "" {
c.Partitions = backup.PartitionRange{}
c.ParallelNodes = backupParams.ParallelNodes
c.NodeList = splitByComma(backupParams.NodeList)
}

pf, err := mapPartitionFilter(backupParams, commonParams)
if err != nil {
return nil, err
}

c.PartitionFilters = pf

sp, err := mapScanPolicy(backupParams, commonParams)
if err != nil {
return nil, err
@@ -290,3 +304,84 @@ func splitByComma(s string) []string {

return strings.Split(s, ",")
}

func mapPartitionFilter(b *models.Backup, c *models.Common) ([]*aerospike.PartitionFilter, error) {
switch {
case b.AfterDigest != "":
afterDigestFilter, err := backup.NewPartitionFilterAfterDigest(c.Namespace, b.AfterDigest)
if err != nil {
return nil, fmt.Errorf("failed to parse after digest filter: %w", err)
}

return []*aerospike.PartitionFilter{afterDigestFilter}, nil
case b.PartitionList != "":
filterSlice := splitByComma(b.PartitionList)
partitionFilters := make([]*aerospike.PartitionFilter, 0, len(filterSlice))

for i := range filterSlice {
partitionFilter, err := parsePartitionFilter(c.Namespace, filterSlice[i])
if err != nil {
return nil, err
}

partitionFilters = append(partitionFilters, partitionFilter)
}

return partitionFilters, nil
default:
return []*aerospike.PartitionFilter{backup.NewPartitionFilterAll()}, nil
}
}

// parsePartitionFilter validates a single --partition-list item against the regular expressions above,
// parses it, and returns an *aerospike.PartitionFilter or an error.
func parsePartitionFilter(namespace, filter string) (*aerospike.PartitionFilter, error) {
// Partition range, e.g. "0-4096"
if expPartitionRange.MatchString(filter) {
return parsePartitionFilterByRange(filter)
}

// Single partition ID, e.g. "1456"
if expPartitionID.MatchString(filter) {
return parsePartitionFilterByID(filter)
}

// Digest (base64 string)
if expPartitionDigest.MatchString(filter) {
return parsePartitionFilterByDigest(namespace, filter)
}

return nil, fmt.Errorf("failed to parse partition filter: %s", filter)
}

func parsePartitionFilterByRange(filter string) (*aerospike.PartitionFilter, error) {
bounds := strings.Split(filter, "-")
if len(bounds) != 2 {
return nil, fmt.Errorf("invalid partition filter: %s", filter)
}

begin, err := strconv.Atoi(bounds[0])
if err != nil {
return nil, fmt.Errorf("invalid partition filter %s begin value: %w", filter, err)
}

count, err := strconv.Atoi(bounds[1])
if err != nil {
return nil, fmt.Errorf("invalid partition filter %s count value: %w", filter, err)
}

return backup.NewPartitionFilterByRange(begin, count), nil
}

func parsePartitionFilterByID(filter string) (*aerospike.PartitionFilter, error) {
id, err := strconv.Atoi(filter)
if err != nil {
return nil, fmt.Errorf("invalid partition filter %s id value: %w", filter, err)
}

return backup.NewPartitionFilterByID(id), nil
}

func parsePartitionFilterByDigest(namespace, filter string) (*aerospike.PartitionFilter, error) {
return backup.NewPartitionFilterByDigest(namespace, filter)
}
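As a quick, hedged illustration of the boundaries that expPartitionRange enforces before parsePartitionFilterByRange runs, the following standalone snippet checks a few sample values against the same pattern; the sample strings are chosen here for illustration and do not come from the PR.

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Same pattern as expPartitionRange in configs.go: begin partition 0-4095, partition count 1-4096.
	rangeExp := regexp.MustCompile(`^([0-9]|[1-9][0-9]{1,2}|[1-3][0-9]{3}|40[0-8][0-9]|409[0-5])-([1-9]|[1-9][0-9]{1,2}|[1-3][0-9]{3}|40[0-8][0-9]|409[0-6])$`)

	// Expected: 0-4096 true, 4095-1 true, 4096-1 false, 1000-0 false, abc-1 false.
	for _, s := range []string{"0-4096", "4095-1", "4096-1", "1000-0", "abc-1"} {
		fmt.Printf("%-7s matches range filter: %v\n", s, rangeExp.MatchString(s))
	}
}
```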
117 changes: 115 additions & 2 deletions cmd/internal/app/configs_test.go
@@ -18,6 +18,8 @@ import (
"testing"
"time"

"github.com/aerospike/aerospike-client-go/v7"
"github.com/aerospike/backup-go"
"github.com/aerospike/backup-go/cmd/internal/models"
"github.com/stretchr/testify/assert"
)
@@ -51,7 +53,7 @@ func TestMapBackupConfig_Success(t *testing.T) {
t.Parallel()
backupModel := &models.Backup{
FileLimit: 5000,
AfterDigest: "digest",
AfterDigest: "AvDsV2KuSZHZugDBftnLxGpR+88=",
ModifiedBefore: "2023-09-01_12:00:00",
ModifiedAfter: "2023-09-02_12:00:00",
FilterExpression: "k1EDpHRlc3Q=",
@@ -86,7 +88,6 @@
assert.False(t, config.NoIndexes)
assert.Equal(t, 1000, config.RecordsPerSecond)
assert.Equal(t, int64(5000), config.FileLimit)
assert.Equal(t, "digest", config.AfterDigest)
assert.Equal(t, true, config.NoTTLOnly)

modBefore, _ := time.Parse("2006-01-02_15:04:05", "2023-09-01_12:00:00")
@@ -333,3 +334,115 @@ func TestMapRestoreNamespace_InvalidNamespace(t *testing.T) {
result := mapRestoreNamespace(ns)
assert.Nil(t, result, "Result should be nil for invalid input")
}

func TestMapPartitionFilter_AfterDigest(t *testing.T) {
t.Parallel()
backupModel := &models.Backup{
AfterDigest: "AvDsV2KuSZHZugDBftnLxGpR+88=",
}

commonModel := &models.Common{
Namespace: "test-namespace",
}

filters, err := mapPartitionFilter(backupModel, commonModel)
assert.NoError(t, err)
assert.NotNil(t, filters)
assert.Equal(t, 1, len(filters))
assert.IsType(t, &aerospike.PartitionFilter{}, filters[0])
}

func TestMapPartitionFilter_PartitionList(t *testing.T) {
t.Parallel()
backupModel := &models.Backup{
PartitionList: "0-1024",
}

commonModel := &models.Common{
Namespace: "test-namespace",
}

filters, err := mapPartitionFilter(backupModel, commonModel)
assert.NoError(t, err)
assert.NotNil(t, filters)
assert.Equal(t, 1, len(filters))
assert.IsType(t, &aerospike.PartitionFilter{}, filters[0])
}

func TestMapPartitionFilter_NoFilters(t *testing.T) {
t.Parallel()
backupModel := &models.Backup{}

commonModel := &models.Common{
Namespace: "test-namespace",
}

filters, err := mapPartitionFilter(backupModel, commonModel)
assert.NoError(t, err)
assert.NotNil(t, filters)
assert.Equal(t, 1, len(filters))
assert.Equal(t, backup.NewPartitionFilterAll(), filters[0])
}

func TestParsePartitionFilterByRange_Valid(t *testing.T) {
t.Parallel()
filter := "100-200"
parsedFilter, err := parsePartitionFilterByRange(filter)
assert.NoError(t, err)
assert.NotNil(t, parsedFilter)
}

func TestParsePartitionFilterByRange_InvalidRange(t *testing.T) {
t.Parallel()
filter := "invalid-range"
parsedFilter, err := parsePartitionFilterByRange(filter)
assert.Error(t, err)
assert.Nil(t, parsedFilter)
assert.Contains(t, err.Error(), "invalid partition filter")
}

func TestParsePartitionFilterByID_Valid(t *testing.T) {
t.Parallel()
filter := "1234"
parsedFilter, err := parsePartitionFilterByID(filter)
assert.NoError(t, err)
assert.NotNil(t, parsedFilter)
}

func TestParsePartitionFilterByID_InvalidID(t *testing.T) {
t.Parallel()
filter := "invalid-id"
parsedFilter, err := parsePartitionFilterByID(filter)
assert.Error(t, err)
assert.Nil(t, parsedFilter)
assert.Contains(t, err.Error(), "invalid partition filter")
}

func TestParsePartitionFilterByDigest_Valid(t *testing.T) {
t.Parallel()
namespace := "test-namespace"
filter := "EjRWeJq83vEjRRI0VniavN7xI0U=" // Base64-encoded digest
parsedFilter, err := parsePartitionFilterByDigest(namespace, filter)
assert.NoError(t, err)
assert.NotNil(t, parsedFilter)
}

func TestParsePartitionFilterByDigest_InvalidDigest(t *testing.T) {
t.Parallel()
namespace := "test-namespace"
filter := "invalid-digest"
parsedFilter, err := parsePartitionFilterByDigest(namespace, filter)
assert.Error(t, err)
assert.Nil(t, parsedFilter)
assert.Contains(t, err.Error(), "failed to decode after-digest")
}

func TestParsePartitionFilter_InvalidFilter(t *testing.T) {
t.Parallel()
namespace := "test-namespace"
filter := "invalid-filter"
parsedFilter, err := parsePartitionFilter(namespace, filter)
assert.Error(t, err)
assert.Nil(t, parsedFilter)
assert.Contains(t, err.Error(), "failed to parse partition filter")
}