diff --git a/client_test.go b/client_test.go
index 1ea6f844..3e3ac0d2 100644
--- a/client_test.go
+++ b/client_test.go
@@ -24,70 +24,6 @@ import (
 	"golang.org/x/sync/semaphore"
 )
 
-func TestPartitionRange_validate(t *testing.T) {
-	type fields struct {
-		Begin int
-		Count int
-	}
-	tests := []struct {
-		name    string
-		fields  fields
-		wantErr bool
-	}{
-		{
-			name: "Test positive PartitionRange validate",
-			fields: fields{
-				Begin: 0,
-				Count: 5,
-			},
-			wantErr: false,
-		},
-		{
-			name: "Test negative PartitionRange validate count 0",
-			fields: fields{
-				Begin: 5,
-				Count: 0,
-			},
-			wantErr: true,
-		},
-		{
-			name: "Test negative PartitionRange validate begin -1",
-			fields: fields{
-				Begin: -1,
-				Count: 5,
-			},
-			wantErr: true,
-		},
-		{
-			name: "Test negative PartitionRange validate count -1",
-			fields: fields{
-				Begin: 5,
-				Count: -1,
-			},
-			wantErr: true,
-		},
-		{
-			name: "Test negative PartitionRange validate total partitions greater than 4096",
-			fields: fields{
-				Begin: 4000,
-				Count: 1000,
-			},
-			wantErr: true,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			p := PartitionRange{
-				Begin: tt.fields.Begin,
-				Count: tt.fields.Count,
-			}
-			if err := p.validate(); (err != nil) != tt.wantErr {
-				t.Errorf("PartitionRange.validate() error = %v, wantErr %v", err, tt.wantErr)
-			}
-		})
-	}
-}
-
 func TestNilClient(t *testing.T) {
 	_, err := NewClient(nil)
 	assert.Error(t, err, "aerospike client is required")
diff --git a/cmd/asbackup/readme.md b/cmd/asbackup/readme.md
index 2c9dbed3..72b50501 100644
--- a/cmd/asbackup/readme.md
+++ b/cmd/asbackup/readme.md
@@ -109,6 +109,19 @@ Backup Flags:
                                    This argument is mutually exclusive to partition-list/after-digest arguments.
                                    Default: backup all nodes in the cluster
       --no-ttl-only                Only include records that have no ttl set (persistent records).
+  -X, --partition-list string      List of partitions <filter[,<filter>[...]]> to back up. Partition filters can be ranges,
+                                   individual partitions, or records after a specific digest within a single partition.
+                                   This argument is mutually exclusive to after-digest.
+                                   Note: each partition filter is an individual task which cannot be parallelized, so you can only
+                                   achieve as much parallelism as there are partition filters. You may increase parallelism by dividing up
+                                   partition ranges manually.
+                                   Filter: <begin partition>[-<partition count>]|<digest>
+                                   begin partition: 0-4095
+                                   partition count: 1-4096 Default: 1
+                                   digest: base64 encoded string
+                                   Examples: 0-1000, 1000-1000, 2222, EjRWeJq83vEjRRI0VniavN7xI0U=
+                                   Default: 0-4096 (all partitions)
+
 
 Compression Flags:
   -z, --compress string            Enables compressing of backup files using the specified compression algorithm.
@@ -164,20 +177,6 @@ Azure Flags:
 state will be placed in the directory with name `<namespace>.asb.state`, or
 `<prefix>.asb.state` if `--output-file-prefix` is given.
 
---partition-list <filter[,<filter>[...]]>
-    List of partitions to back up. Partition filters can be ranges, individual partitions, or
-    records after a specific digest within a single partition.
-    This argument is mutually exclusive to after-digest.
-    Note: each partition filter is an individual task which cannot be parallelized, so you can only
-    achieve as much parallelism as there are partition filters. You may increase parallelism by dividing up
-    partition ranges manually.
-    Filter: <begin partition>[-<partition count>]|<digest>
-    begin partition: 0-4095
-    partition count: 1-4096 Default: 1
-    digest: base64 encoded string
-    Examples: 0-1000, 1000-1000, 2222, EjRWeJq83vEjRRI0VniavN7xI0U=
-    Default: 0-4096 (all partitions)
-
 --machine <path>          Output machine-readable status updates to the given path, typically a FIFO.
 --estimate                Estimate the backed-up record size from a random sample of
diff --git a/cmd/internal/app/configs.go b/cmd/internal/app/configs.go
index 0f7f66b2..f709f892 100644
--- a/cmd/internal/app/configs.go
+++ b/cmd/internal/app/configs.go
@@ -16,6 +16,8 @@ package app
 
 import (
 	"fmt"
+	"regexp"
+	"strconv"
 	"strings"
 	"time"
 
@@ -46,7 +48,6 @@ func mapBackupConfig(
 	c.NoIndexes = commonParams.NoIndexes
 	c.RecordsPerSecond = commonParams.RecordsPerSecond
 	c.FileLimit = backupParams.FileLimit
-	c.AfterDigest = backupParams.AfterDigest
 	// The original backup tools have a single parallelism configuration property.
 	// We may consider splitting the configuration in the future.
 	c.ParallelWrite = commonParams.Parallel
@@ -59,6 +60,13 @@
 	c.NodeList = strings.Split(backupParams.NodeList, sliceSeparator)
 	c.NoTTLOnly = backupParams.NoTTLOnly
 
+	pf, err := mapPartitionFilter(backupParams, commonParams)
+	if err != nil {
+		return nil, err
+	}
+
+	c.PartitionFilters = pf
+
 	sp, err := mapScanPolicy(backupParams, commonParams)
 	if err != nil {
 		return nil, err
@@ -275,3 +283,88 @@ func mapRetryPolicy(r *models.Restore) *bModels.RetryPolicy {
 		MaxRetries: r.RetryMaxRetries,
 	}
 }
+
+func mapPartitionFilter(b *models.Backup, c *models.Common) ([]*aerospike.PartitionFilter, error) {
+	switch {
+	case b.AfterDigest != "":
+		afterDigestFilter, err := backup.NewPartitionFilterAfterDigest(c.Namespace, b.AfterDigest)
+		if err != nil {
+			return nil, fmt.Errorf("failed to parse after digest filter: %w", err)
+		}
+
+		return []*aerospike.PartitionFilter{afterDigestFilter}, nil
+	case b.PartitionList != "":
+		filterSlice := strings.Split(b.PartitionList, sliceSeparator)
+		partitionFilters := make([]*aerospike.PartitionFilter, 0, len(filterSlice))
+
+		for i := range filterSlice {
+			partitionFilter, err := parsePartitionFilter(c.Namespace, filterSlice[i])
+			if err != nil {
+				return nil, err
+			}
+
+			partitionFilters = append(partitionFilters, partitionFilter)
+		}
+
+		return partitionFilters, nil
+	default:
+		return []*aerospike.PartitionFilter{backup.NewPartitionFilterAll()}, nil
+	}
+}
+
+// parsePartitionFilter checks --partition-list input against a set of regexps,
+// parses the value, and returns an *aerospike.PartitionFilter or an error.
+func parsePartitionFilter(namespace, filter string) (*aerospike.PartitionFilter, error) {
+	// Range, e.g. 0-4096
+	//nolint:lll // The regexp is long.
+	exp := regexp.MustCompile(`^([0-9]|[1-9][0-9]{1,3}|40[0-8][0-9]|409[0-5])\-([1-9]|[1-9][0-9]{1,3}|40[0-8][0-9]|409[0-6])$`)
+	if exp.MatchString(filter) {
+		return parsePartitionFilterByRange(filter)
+	}
+
+	// Id, e.g. 1456
+	exp = regexp.MustCompile(`^\d+$`)
+	if exp.MatchString(filter) {
+		return parsePartitionFilterByID(filter)
+	}
+
+	// Digest (base64 string)
+	exp = regexp.MustCompile(`^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$`)
+	if exp.MatchString(filter) {
+		return parsePartitionFilterByDigest(namespace, filter)
+	}
+
+	return nil, fmt.Errorf("failed to parse partition filter: %s", filter)
+}
+
+func parsePartitionFilterByRange(filter string) (*aerospike.PartitionFilter, error) {
+	bounds := strings.Split(filter, "-")
+	if len(bounds) != 2 {
+		return nil, fmt.Errorf("invalid partition filter: %s", filter)
+	}
+
+	begin, err := strconv.Atoi(bounds[0])
+	if err != nil {
+		return nil, fmt.Errorf("invalid partition filter %s begin value: %w", filter, err)
+	}
+
+	count, err := strconv.Atoi(bounds[1])
+	if err != nil {
+		return nil, fmt.Errorf("invalid partition filter %s count value: %w", filter, err)
+	}
+
+	return backup.NewPartitionFilterByRange(begin, count), nil
+}
+
+func parsePartitionFilterByID(filter string) (*aerospike.PartitionFilter, error) {
+	id, err := strconv.Atoi(filter)
+	if err != nil {
+		return nil, fmt.Errorf("invalid partition filter %s id value: %w", filter, err)
+	}
+
+	return backup.NewPartitionFilterByID(id), nil
+}
+
+func parsePartitionFilterByDigest(namespace, filter string) (*aerospike.PartitionFilter, error) {
+	return backup.NewPartitionFilterByDigest(namespace, filter)
+}
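Taken together, mapPartitionFilter and the helpers above accept exactly the three forms the flag help documents: a range, a single partition id, and a base64 digest. A minimal sketch of the parser's behavior, written as a hypothetical test in the same package app (the test name and the namespace "test" are illustrative, not part of the change):

```go
package app

import (
	"fmt"
	"testing"
)

// TestPartitionListForms is illustrative only: it walks the three documented
// --partition-list forms through parsePartitionFilter and prints the result.
func TestPartitionListForms(t *testing.T) {
	for _, f := range []string{
		"0-1000",                       // range: begin partition 0, count 1000
		"2222",                         // a single partition id
		"EjRWeJq83vEjRRI0VniavN7xI0U=", // records after a digest, one partition
	} {
		pf, err := parsePartitionFilter("test", f)
		if err != nil {
			t.Fatalf("parse %q: %v", f, err)
		}

		fmt.Printf("%q -> begin=%d count=%d digest set=%v\n", f, pf.Begin, pf.Count, pf.Digest != nil)
	}
}
```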
diff --git a/cmd/internal/app/configs_test.go b/cmd/internal/app/configs_test.go
index 3412cb75..d5565a68 100644
--- a/cmd/internal/app/configs_test.go
+++ b/cmd/internal/app/configs_test.go
@@ -86,7 +86,6 @@ func TestMapBackupConfig_Success(t *testing.T) {
 	assert.False(t, config.NoIndexes)
 	assert.Equal(t, 1000, config.RecordsPerSecond)
 	assert.Equal(t, int64(5000), config.FileLimit)
-	assert.Equal(t, "digest", config.AfterDigest)
 	assert.Equal(t, true, config.NoTTLOnly)
 
 	modBefore, _ := time.Parse("2006-01-02_15:04:05", "2023-09-01_12:00:00")
diff --git a/cmd/internal/flags/backup.go b/cmd/internal/flags/backup.go
index 918a2fc0..21c84cb2 100644
--- a/cmd/internal/flags/backup.go
+++ b/cmd/internal/flags/backup.go
@@ -98,6 +98,20 @@ func (f *Backup) NewFlagSet() *pflag.FlagSet {
 	flagSet.BoolVar(&f.NoTTLOnly, "no-ttl-only",
 		false,
 		"Only include records that have no ttl set (persistent records).")
+	flagSet.StringVarP(&f.PartitionList, "partition-list", "X",
+		"",
+		"List of partitions <filter[,<filter>[...]]> to back up. Partition filters can be ranges,\n"+
+			"individual partitions, or records after a specific digest within a single partition.\n"+
+			"This argument is mutually exclusive to after-digest.\n"+
+			"Note: each partition filter is an individual task which cannot be parallelized, so you can only\n"+
+			"achieve as much parallelism as there are partition filters. You may increase parallelism by dividing up\n"+
+			"partition ranges manually.\n"+
+			"Filter: <begin partition>[-<partition count>]|<digest>\n"+
+			"begin partition: 0-4095\n"+
+			"partition count: 1-4096 Default: 1\n"+
+			"digest: base64 encoded string\n"+
+			"Examples: 0-1000, 1000-1000, 2222, EjRWeJq83vEjRRI0VniavN7xI0U=\n"+
+			"Default: 0-4096 (all partitions)\n")
 
 	return flagSet
 }
diff --git a/cmd/internal/flags/backup_test.go b/cmd/internal/flags/backup_test.go
index a48b18ab..9466f2f7 100644
--- a/cmd/internal/flags/backup_test.go
+++ b/cmd/internal/flags/backup_test.go
@@ -41,6 +41,7 @@ func TestBackup_NewFlagSet(t *testing.T) {
 		"--compact",
 		"--node-list", "node1,node2",
 		"--no-ttl-only",
+		"--partition-list", "4000,1-236,EjRWeJq83vEjRRI0VniavN7xI0U=",
 	}
 
 	err := flagSet.Parse(args)
@@ -62,6 +63,7 @@
 	assert.Equal(t, true, result.Compact, "The compact flag should be parsed correctly")
 	assert.Equal(t, "node1,node2", result.NodeList, "The node-list flag should be parsed correctly")
 	assert.Equal(t, true, result.NoTTLOnly, "The no-ttl-only flag should be parsed correctly")
+	assert.Equal(t, "4000,1-236,EjRWeJq83vEjRRI0VniavN7xI0U=", result.PartitionList, "The partition-list flag should be parsed correctly")
 }
 
 func TestBackup_NewFlagSet_DefaultValues(t *testing.T) {
@@ -89,4 +91,5 @@
 	assert.Equal(t, false, result.Compact, "The default value for compact should be false")
 	assert.Equal(t, "", result.NodeList, "The default value for node-list should be empty")
 	assert.Equal(t, false, result.NoTTLOnly, "The default value for no-ttl-only should be false")
+	assert.Equal(t, "", result.PartitionList, "The default value for partition-list should be empty string")
 }
diff --git a/cmd/internal/models/backup.go b/cmd/internal/models/backup.go
index 9e8bcc1f..362dc1f7 100644
--- a/cmd/internal/models/backup.go
+++ b/cmd/internal/models/backup.go
@@ -30,6 +30,7 @@ type Backup struct {
 	Compact       bool
 	NodeList      string
 	NoTTLOnly     bool
+	PartitionList string
 }
 
 // ShouldClearTarget check if we should clean target directory.
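The library-side counterpart of the new flag is the PartitionFilters field on BackupConfig, introduced in config_backup.go below. A hedged sketch of building the same three filter shapes through the new public constructors; the module import path and the digest value are assumptions:

```go
package main

import (
	"fmt"
	"log"

	a "github.com/aerospike/aerospike-client-go/v7"
	backup "github.com/aerospike/backup-go"
)

func main() {
	byRange := backup.NewPartitionFilterByRange(0, 1000) // partitions 0-999
	byID := backup.NewPartitionFilterByID(2222)          // a single partition

	// Records after a placeholder digest, within its partition only.
	byDigest, err := backup.NewPartitionFilterByDigest("test", "EjRWeJq83vEjRRI0VniavN7xI0U=")
	if err != nil {
		log.Fatal(err)
	}

	cfg := backup.NewDefaultBackupConfig()
	cfg.PartitionFilters = []*a.PartitionFilter{byRange, byID, byDigest}

	fmt.Printf("configured %d partition filters\n", len(cfg.PartitionFilters))
}
```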
diff --git a/config_backup.go b/config_backup.go
index 7f25867f..a45ecc7e 100644
--- a/config_backup.go
+++ b/config_backup.go
@@ -15,7 +15,6 @@
 package backup
 
 import (
-	"encoding/base64"
 	"fmt"
 	"time"
 
@@ -40,6 +39,25 @@ type BackupConfig struct {
 	CompressionPolicy *CompressionPolicy
 	// Secret agent config.
 	SecretAgentConfig *SecretAgentConfig
+	// PartitionFilters specifies the Aerospike partitions to back up.
+	// Partition filters can be ranges, individual partitions,
+	// or records after a specific digest within a single partition.
+	// Note:
+	// unless the default partition filter NewPartitionFilterAll() is used,
+	// each partition filter is an individual task which cannot be parallelized,
+	// so you can only achieve as much parallelism as there are partition filters.
+	// You may increase parallelism by dividing up partition ranges manually.
+	// AfterDigest:
+	// an after-digest filter can be created with
+	// NewPartitionFilterAfterDigest(namespace, digest string) (*a.PartitionFilter, error).
+	// It backs up records after the record digest in the record's partition plus all succeeding partitions.
+	// Used to resume a backup with the last record received from a previous incomplete backup.
+	// This parameter will overwrite the PartitionFilters.Begin value.
+	// Can't be used in full backup mode.
+	// This parameter is mutually exclusive to partition-list (not implemented).
+	// Format: base64 encoded string.
+	// Example: EjRWeJq83vEjRRI0VniavN7xI0U=
+	PartitionFilters []*a.PartitionFilter
 	// Namespace is the Aerospike namespace to back up.
 	Namespace string
 	// NodeList contains a list of nodes to back up.
@@ -55,11 +73,9 @@
 	// The list of backup bin names
 	// (optional, given an empty list, all bins will be backed up)
 	BinList []string
-	// Partitions specifies the Aerospike partitions to back up.
-	Partitions PartitionRange
 	// ParallelNodes specifies how to perform scan.
 	// If set to true, we launch parallel workers for nodes; otherwise workers run in parallel for partitions.
-	// Excludes Partitions param.
+	// Excludes PartitionFilters param.
 	ParallelNodes bool
 	// EncoderType describes an Encoder type that will be used on backing up.
 	// Default `EncoderTypeASB` = 0.
@@ -83,14 +99,6 @@
 	// File size limit (in bytes) for the backup. If a backup file exceeds this
 	// size threshold, a new file will be created. 0 for no file size limit.
 	FileLimit int64
-	// Backup records after record digest in record's partition plus all succeeding partitions.
-	// Used to resume backup with last record received from previous incomplete backup.
-	// This parameter will overwrite Partitions.Begin value.
-	// Can't be used in full backup mode.
-	// This parameter is mutually exclusive to partition-list (not implemented).
-	// Format: base64 encoded string.
-	// Example: EjRWeJq83vEjRRI0VniavN7xI0U=
-	AfterDigest string
 	// Do not apply base-64 encoding to BLOBs: Bytes, HLL, RawMap, RawList.
 	// Results in smaller backup files.
 	Compact bool
 	// Only include records that have no ttl set (persistent records).
 	NoTTLOnly bool
 }
 
-// PartitionRange specifies a range of Aerospike partitions.
-type PartitionRange struct {
-	Begin int
-	Count int
+// NewPartitionFilterByRange returns a partition filter covering the range specified
+// by the provided begin and count values.
+func NewPartitionFilterByRange(begin, count int) *a.PartitionFilter {
+	return a.NewPartitionFilterByRange(begin, count)
 }
 
-// NewPartitionRange returns a partition range with boundaries specified by the
-// provided values.
-func NewPartitionRange(begin, count int) PartitionRange {
-	return PartitionRange{begin, count}
+// NewPartitionFilterByID returns a partition filter for the given partition ID.
+func NewPartitionFilterByID(partitionID int) *a.PartitionFilter {
+	return a.NewPartitionFilterById(partitionID)
 }
 
-func (p PartitionRange) validate() error {
-	if p.Begin < 0 || p.Begin >= MaxPartitions {
-		return fmt.Errorf("begin must be between 0 and %d, got %d", MaxPartitions-1, p.Begin)
+// NewPartitionFilterByDigest returns a partition filter for the partition containing
+// the given digest, starting from that digest.
+func NewPartitionFilterByDigest(namespace, digest string) (*a.PartitionFilter, error) {
+	key, err := getAsKeyByDigest(namespace, digest)
+	if err != nil {
+		return nil, err
 	}
 
-	if p.Count < 1 || p.Count > MaxPartitions {
-		return fmt.Errorf("count must be between 1 and %d, got %d", MaxPartitions, p.Count)
-	}
+	return a.NewPartitionFilterByKey(key), nil
+}
 
-	if p.Begin+p.Count > MaxPartitions {
-		return fmt.Errorf("begin + count is greater than the max partitions count of %d",
-			MaxPartitions)
+// NewPartitionFilterAfterDigest returns a partition filter that scans records after the given
+// digest in its partition, plus all succeeding partitions.
+func NewPartitionFilterAfterDigest(namespace, digest string) (*a.PartitionFilter, error) {
+	key, err := getAsKeyByDigest(namespace, digest)
+	if err != nil {
+		return nil, err
 	}
 
-	return nil
+	defaultFilter := NewPartitionFilterAll()
+	begin := key.PartitionId()
+	count := defaultFilter.Count - begin
+
+	return &a.PartitionFilter{
+		Begin:  begin,
+		Count:  count,
+		Digest: key.Digest(),
+	}, nil
 }
 
-// PartitionRangeAll returns a partition range containing all partitions.
-func PartitionRangeAll() PartitionRange {
-	return NewPartitionRange(0, MaxPartitions)
+// NewPartitionFilterAll returns a partition filter covering all partitions.
+func NewPartitionFilterAll() *a.PartitionFilter {
+	return a.NewPartitionFilterByRange(0, MaxPartitions)
 }
 
 // NewDefaultBackupConfig returns a new BackupConfig with default values.
 func NewDefaultBackupConfig() *BackupConfig {
 	return &BackupConfig{
-		Partitions:    PartitionRangeAll(),
-		ParallelRead:  1,
-		ParallelWrite: 1,
-		Namespace:     "test",
-		EncoderType:   EncoderTypeASB,
+		PartitionFilters: []*a.PartitionFilter{NewPartitionFilterAll()},
+		ParallelRead:     1,
+		ParallelWrite:    1,
+		Namespace:        "test",
+		EncoderType:      EncoderTypeASB,
 	}
 }
 
+// isAfterDigest reports whether the after-digest filter is set.
+func (c *BackupConfig) isAfterDigest() bool {
+	return len(c.PartitionFilters) == 1 && c.PartitionFilters[0].Digest != nil && c.PartitionFilters[0].Count > 1
+}
+
 func (c *BackupConfig) isFullBackup() bool {
 	// full backup doesn't have lower bound
-	return c.ModAfter == nil && c.AfterDigest == ""
+	return c.ModAfter == nil && !c.isAfterDigest()
 }
 
-//nolint:gocyclo // Long validation function with a lot of checks.
 func (c *BackupConfig) validate() error {
 	if c.ParallelRead < MinParallel || c.ParallelRead > MaxParallel {
 		return fmt.Errorf("parallel read must be between 1 and 1024, got %d", c.ParallelRead)
@@ -162,26 +184,16 @@
 		return fmt.Errorf("modified before must be strictly greater than modified after")
 	}
 
-	if (c.ParallelNodes || len(c.NodeList) != 0) && (c.Partitions.Begin != 0 || c.Partitions.Count != 0) {
-		return fmt.Errorf("parallel by nodes and partitions and the same time not allowed")
+	if (c.ParallelNodes || len(c.NodeList) != 0) && len(c.PartitionFilters) > 0 {
+		return fmt.Errorf("parallel by nodes and by partitions at the same time is not allowed")
 	}
 
-	if !c.ParallelNodes && len(c.NodeList) == 0 {
-		if err := c.Partitions.validate(); err != nil {
-			return err
-		}
-	}
-
-	if c.AfterDigest != "" {
+	// The after-digest filter already encodes its begin partition and digest,
+	// so only the parallel-by-nodes conflict is left to check.
+	if c.isAfterDigest() {
 		if c.ParallelNodes || len(c.NodeList) != 0 {
 			return fmt.Errorf("parallel by nodes/node list and after digest at the same time not allowed")
 		}
-
-		if _, err := base64.StdEncoding.DecodeString(c.AfterDigest); err != nil {
-			return fmt.Errorf("after digest must be base64 encoded string: %w", err)
-		}
-
-		if c.Partitions.Begin != 0 {
-			return fmt.Errorf("after digest is set, begin partition can't be set")
-		}
 	}
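Putting the constructors together, resuming an interrupted backup would now look roughly like this. The sketch is illustrative only: client wiring is elided, and the namespace and digest are placeholders:

```go
package main

import (
	"log"

	a "github.com/aerospike/aerospike-client-go/v7"
	backup "github.com/aerospike/backup-go"
)

func main() {
	// Resume from the last digest reported by a previous, incomplete backup.
	filter, err := backup.NewPartitionFilterAfterDigest("test", "EjRWeJq83vEjRRI0VniavN7xI0U=")
	if err != nil {
		log.Fatal(err)
	}

	cfg := backup.NewDefaultBackupConfig()
	// Replace the default all-partitions filter: the scan resumes inside the
	// digest's partition and continues through all succeeding partitions.
	cfg.PartitionFilters = []*a.PartitionFilter{filter}

	_ = cfg // pass cfg to Client.Backup(...) together with a writer
}
```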
diff --git a/config_test.go b/config_test.go
index 74661234..bd8362a3 100644
--- a/config_test.go
+++ b/config_test.go
@@ -41,10 +41,6 @@ func TestBackupConfig_validate(t *testing.T) {
 	assert.ErrorContains(t, config.validate(), "modified before")
 	config = NewDefaultBackupConfig()
 
-	config.Partitions = NewPartitionRange(-1, -1)
-	assert.ErrorContains(t, config.validate(), "begin must be between")
-	config = NewDefaultBackupConfig()
-
 	config.RecordsPerSecond = -1
 	assert.ErrorContains(t, config.validate(), "rps")
 	config = NewDefaultBackupConfig()
@@ -61,16 +57,6 @@
 	assert.ErrorContains(t, config.validate(), "filelimit")
 	config = NewDefaultBackupConfig()
 
-	config.AfterDigest = "te/&st"
-	assert.ErrorContains(t, config.validate(), "after digest")
-	config = NewDefaultBackupConfig()
-
-	config.AfterDigest = "EjRWeJq83vEjRRI0VniavN7xI0U="
-	config.Partitions.Begin = 2
-	config.Partitions.Count = 10
-	assert.ErrorContains(t, config.validate(), "after digest")
-	config = NewDefaultBackupConfig()
-
 	config.CompressionPolicy = &CompressionPolicy{Level: -1}
 	assert.ErrorContains(t, config.validate(), "compression")
 	config = NewDefaultBackupConfig()
diff --git a/handler_backup.go b/handler_backup.go
index 269f554c..1512438a 100644
--- a/handler_backup.go
+++ b/handler_backup.go
@@ -140,10 +140,7 @@ func (bh *BackupHandler) backupSync(ctx context.Context) error {
 	writeWorkers := bh.makeWriteWorkers(backupWriters)
 
-	handler, err := newBackupRecordsHandler(bh.config, bh.aerospikeClient, bh.logger, bh.scanLimiter)
-	if err != nil {
-		return err
-	}
+	handler := newBackupRecordsHandler(bh.config, bh.aerospikeClient, bh.logger, bh.scanLimiter)
 
 	bh.stats.TotalRecords, err = handler.countRecords(ctx, bh.infoClient)
 	if err != nil {
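handler_backup_records.go, next, drives one scan per configured filter instead of a single partition-range scan. Its parallelism rule can be summarized in a standalone sketch (workersFor is an illustrative helper, not part of the change):

```go
package main

import (
	"fmt"

	a "github.com/aerospike/aerospike-client-go/v7"
)

// workersFor mirrors the rule applied below: with several filters, one worker
// per filter (a filter is a single scan task and cannot be split further);
// a single filter can still be divided among all n workers.
func workersFor(filters []*a.PartitionFilter, n int) int {
	if len(filters) > 1 {
		return len(filters)
	}

	return n
}

func main() {
	one := []*a.PartitionFilter{a.NewPartitionFilterByRange(0, 4096)}
	three := []*a.PartitionFilter{
		a.NewPartitionFilterByRange(0, 1000),
		a.NewPartitionFilterById(2222),
		a.NewPartitionFilterByRange(3000, 1096),
	}

	fmt.Println(workersFor(one, 8))   // 8: a single range is split by splitPartitions
	fmt.Println(workersFor(three, 8)) // 3: parallelism is capped at one worker per filter
}
```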
diff --git a/handler_backup_records.go b/handler_backup_records.go
index e5338ee8..1dc4be0e 100644
--- a/handler_backup_records.go
+++ b/handler_backup_records.go
@@ -16,7 +16,6 @@ package backup
 
 import (
 	"context"
-	"encoding/base64"
 	"fmt"
 	"io"
 	"log/slog"
@@ -45,7 +44,7 @@ func newBackupRecordsHandler(
 	ac AerospikeClient,
 	logger *slog.Logger,
 	scanLimiter *semaphore.Weighted,
-) (*backupRecordsHandler, error) {
+) *backupRecordsHandler {
 	logger.Debug("created new backup records handler")
 
 	h := &backupRecordsHandler{
@@ -55,14 +54,7 @@
 		scanLimiter: scanLimiter,
 	}
 
-	// For resuming backup from config.AfterDigest, we have to get and check key info, then set additional params.
-	if config.AfterDigest != "" {
-		if err := h.setAfterDigest(); err != nil {
-			return nil, err
-		}
-	}
-
-	return h, nil
+	return h
 }
 
 func (bh *backupRecordsHandler) run(
@@ -99,23 +91,25 @@ func (bh *backupRecordsHandler) countRecordsUsingScan(ctx context.Context) (uint64, error) {
 	scanPolicy.IncludeBinData = false
 	scanPolicy.MaxRecords = 0
 
-	readerConfig := bh.recordReaderConfigForPartitions(bh.config.Partitions, &scanPolicy, bh.afterDigest)
-	recordReader := aerospike.NewRecordReader(ctx, bh.aerospikeClient, readerConfig, bh.logger)
+	var count uint64
 
-	defer recordReader.Close()
+	for i := range bh.config.PartitionFilters {
+		readerConfig := bh.recordReaderConfigForPartitions(bh.config.PartitionFilters[i], &scanPolicy)
+		recordReader := aerospike.NewRecordReader(ctx, bh.aerospikeClient, readerConfig, bh.logger)
 
-	var count uint64
+		for {
+			if _, err := recordReader.Read(); err != nil {
+				if err == io.EOF {
+					break
+				}
 
-	for {
-		if _, err := recordReader.Read(); err != nil {
-			if err == io.EOF {
-				break
+				return 0, fmt.Errorf("error during records counting: %w", err)
 			}
 
-			return 0, fmt.Errorf("error during records counting: %w", err)
+			count++
 		}
 
-		count++
+		recordReader.Close()
 	}
 
 	return count, nil
 }
@@ -141,10 +135,12 @@ func (bh *backupRecordsHandler) makeAerospikeReadWorkersForPartition(
 	ctx context.Context, n int, scanPolicy *a.ScanPolicy,
 ) ([]pipeline.Worker[*models.Token], error) {
-	partitionRanges, err := splitPartitions(
-		bh.config.Partitions.Begin,
-		bh.config.Partitions.Count,
-		n)
+	// If we have multiple partition filters, we use one worker per filter.
+	if len(bh.config.PartitionFilters) > 1 {
+		n = len(bh.config.PartitionFilters)
+	}
+
+	partitionGroups, err := splitPartitions(bh.config.PartitionFilters, n)
 	if err != nil {
 		return nil, err
 	}
@@ -152,11 +148,6 @@
 	readWorkers := make([]pipeline.Worker[*models.Token], n)
 
 	for i := 0; i < n; i++ {
-		recordReaderConfig := bh.recordReaderConfigForPartitions(partitionRanges[i], scanPolicy, nil)
-
-		// For the first partition in the list, we start from digest if it is set.
-		if bh.afterDigest != nil && i == 0 {
-			recordReaderConfig = bh.recordReaderConfigForPartitions(partitionRanges[i], scanPolicy, bh.afterDigest)
-		}
+		recordReaderConfig := bh.recordReaderConfigForPartitions(partitionGroups[i], scanPolicy)
 
 		recordReader := aerospike.NewRecordReader(
@@ -213,16 +209,9 @@
 }
 
 func (bh *backupRecordsHandler) recordReaderConfigForPartitions(
-	partitionRange PartitionRange,
+	partitionFilter *a.PartitionFilter,
 	scanPolicy *a.ScanPolicy,
-	// We should pass digest as parameter here, for implementing adding digest to first partition logic.
-	digest []byte,
 ) *aerospike.RecordReaderConfig {
-	partitionFilter := a.NewPartitionFilterByRange(partitionRange.Begin, partitionRange.Count)
-	if digest != nil {
-		partitionFilter.Digest = digest
-	}
-
 	return aerospike.NewRecordReaderConfig(
 		bh.config.Namespace,
 		bh.config.SetList,
@@ -258,21 +247,3 @@
 		bh.config.NoTTLOnly,
 	)
 }
-
-func (bh *backupRecordsHandler) setAfterDigest() error {
-	digestBytes, err := base64.StdEncoding.DecodeString(bh.config.AfterDigest)
-	if err != nil {
-		return fmt.Errorf("failed to decode after-digest: %w", err)
-	}
-
-	keyDigest, err := a.NewKeyWithDigest(bh.config.Namespace, "", "", digestBytes)
-	if err != nil {
-		return fmt.Errorf("failed to init key from digest: %w", err)
-	}
-
-	bh.config.Partitions.Begin = keyDigest.PartitionId()
-	bh.config.Partitions.Count -= bh.config.Partitions.Begin
-	bh.afterDigest = keyDigest.Digest()
-
-	return nil
-}
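Finally, shared.go reworks splitPartitions to operate on filters rather than a bare range. The underlying split math is unchanged; a small, self-contained sketch of how it divides a single all-partitions filter among three workers:

```go
package main

import "fmt"

// Illustrative sketch of the range-split math used by splitPartitions:
// a filter covering count partitions starting at begin is divided into
// numWorkers contiguous chunks whose sizes differ by at most one.
func main() {
	begin, count, numWorkers := 0, 4096, 3

	for j := 0; j < numWorkers; j++ {
		chunkBegin := (j * count) / numWorkers
		chunkCount := ((j+1)*count)/numWorkers - chunkBegin
		fmt.Printf("worker %d: begin=%d count=%d\n", j, begin+chunkBegin, chunkCount)
	}
	// Output:
	// worker 0: begin=0 count=1365
	// worker 1: begin=1365 count=1365
	// worker 2: begin=2730 count=1366
}
```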
diff --git a/shared.go b/shared.go
index 28941da7..6e4231ee 100644
--- a/shared.go
+++ b/shared.go
@@ -15,6 +15,7 @@
 package backup
 
 import (
+	"encoding/base64"
 	"fmt"
 	"log/slog"
 	"runtime/debug"
@@ -60,32 +61,43 @@ func doWork(errors chan<- error, logger *slog.Logger, work func() error) {
 	logger.Info("job done")
 }
 
-func splitPartitions(startPartition, numPartitions, numWorkers int) ([]PartitionRange, error) {
-	if startPartition+numPartitions > MaxPartitions {
-		return nil, fmt.Errorf("startPartition + numPartitions is greater than the max partitions: %d",
-			MaxPartitions)
-	}
-
+func splitPartitions(partitionFilters []*a.PartitionFilter, numWorkers int) ([]*a.PartitionFilter, error) {
 	if numWorkers < 1 {
-		return nil, fmt.Errorf("numWorkers is less than 1, cannot split partitions")
+		return nil, fmt.Errorf("numWorkers is less than 1, cannot split partition filters")
 	}
 
-	if numPartitions < 1 {
-		return nil, fmt.Errorf("numPartitions is less than 1, cannot split partitions")
-	}
+	result := make([]*a.PartitionFilter, numWorkers)
+
+	// Validations.
+	for i := range partitionFilters {
+		if partitionFilters[i].Begin < 0 {
+			return nil, fmt.Errorf("partition filter begin is less than 0, cannot split partition filters")
+		}
 
-	if startPartition < 0 {
-		return nil, fmt.Errorf("startPartition is less than 0, cannot split partitions")
+		if partitionFilters[i].Count < 1 {
+			return nil, fmt.Errorf("partition filter count is less than 1, cannot split partition filters")
+		}
+
+		if partitionFilters[i].Begin+partitionFilters[i].Count > MaxPartitions {
+			return nil, fmt.Errorf("partition filter begin + count is greater than the max partitions count: %d",
+				MaxPartitions)
+		}
 	}
 
-	pSpecs := make([]PartitionRange, numWorkers)
-	for i := 0; i < numWorkers; i++ {
-		pSpecs[i].Begin = (i * numPartitions) / numWorkers
-		pSpecs[i].Count = (((i + 1) * numPartitions) / numWorkers) - pSpecs[i].Begin
-		pSpecs[i].Begin += startPartition
+	// If we have a single partition filter with a range, split it between the workers.
+	// TODO: check if byDigest works here.
+	if len(partitionFilters) == 1 && partitionFilters[0].Count != 1 {
+		for j := 0; j < numWorkers; j++ {
+			begin := (j * partitionFilters[0].Count) / numWorkers
+			count := (((j + 1) * partitionFilters[0].Count) / numWorkers) - begin
+			result[j] = a.NewPartitionFilterByRange(begin+partitionFilters[0].Begin, count)
+		}
+
+		return result, nil
 	}
 
-	return pSpecs, nil
+	// If we have more than one filter, we distribute them to workers 1:1.
+	return partitionFilters, nil
 }
 
 func splitNodes(nodes []*a.Node, numWorkers int) ([][]*a.Node, error) {
@@ -141,3 +153,17 @@ func nodeToString(node *a.Node) string {
 	return fmt.Sprintf("%s:%d", nodeHost.Name, nodeHost.Port)
 }
+
+func getAsKeyByDigest(namespace, digest string) (*a.Key, error) {
+	digestBytes, err := base64.StdEncoding.DecodeString(digest)
+	if err != nil {
+		return nil, fmt.Errorf("failed to decode after-digest: %w", err)
+	}
+
+	key, err := a.NewKeyWithDigest(namespace, "", "", digestBytes)
+	if err != nil {
+		return nil, fmt.Errorf("failed to init key from digest: %w", err)
+	}
+
+	return key, nil
+}
diff --git a/shared_test.go b/shared_test.go
index a4483c0f..62440912 100644
--- a/shared_test.go
+++ b/shared_test.go
@@ -17,6 +17,8 @@ package backup
 
 import (
 	"reflect"
 	"testing"
+
+	a "github.com/aerospike/aerospike-client-go/v7"
 )
 
 func Test_splitPartitions(t *testing.T) {
@@ -27,7 +29,7 @@ func Test_splitPartitions(t *testing.T) {
 	}
 	tests := []struct {
 		name    string
-		want    []PartitionRange
+		want    []*a.PartitionFilter
 		args    args
 		wantErr bool
 	}{
@@ -38,7 +40,7 @@
 				numPartitions:  10,
 				numWorkers:     2,
 			},
-			want: []PartitionRange{
+			want: []*a.PartitionFilter{
 				{
 					Begin: 0,
 					Count: 5,
@@ -56,7 +58,7 @@
 				numPartitions:  10,
 				numWorkers:     2,
 			},
-			want: []PartitionRange{
+			want: []*a.PartitionFilter{
 				{
 					Begin: 5,
 					Count: 5,
@@ -74,7 +76,7 @@
 				numPartitions:  20,
 				numWorkers:     3,
 			},
-			want: []PartitionRange{
+			want: []*a.PartitionFilter{
 				{
 					Begin: 5,
 					Count: 6,
@@ -96,7 +98,7 @@
 				numPartitions:  4096,
 				numWorkers:     4,
 			},
-			want: []PartitionRange{
+			want: []*a.PartitionFilter{
 				{
 					Begin: 0,
 					Count: 1024,
@@ -122,7 +124,7 @@
 				numPartitions:  4096,
 				numWorkers:     1,
 			},
-			want: []PartitionRange{
+			want: []*a.PartitionFilter{
 				{
 					Begin: 0,
 					Count: 4096,
@@ -168,7 +170,8 @@
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			got, err := splitPartitions(tt.args.startPartition, tt.args.numPartitions,
tt.args.numWorkers) + partitionFilters := []*a.PartitionFilter{NewPartitionFilterByRange(tt.args.startPartition, tt.args.numPartitions)} + got, err := splitPartitions(partitionFilters, tt.args.numWorkers) if (err != nil) != tt.wantErr { t.Errorf("splitPartitions() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/tests/integration/integration_test.go b/tests/integration/integration_test.go index 52d194b1..606e8e49 100644 --- a/tests/integration/integration_test.go +++ b/tests/integration/integration_test.go @@ -324,12 +324,12 @@ func (suite *backupRestoreTestSuite) TestBackupRestoreDirectory() { name: "with parallel backup", args: args{ backupConfig: &backup.BackupConfig{ - Partitions: backup.PartitionRangeAll(), - SetList: []string{suite.set}, - Namespace: suite.namespace, - ParallelRead: 1, - ParallelWrite: 100, - EncoderType: backup.EncoderTypeASB, + PartitionFilters: []*a.PartitionFilter{backup.NewPartitionFilterAll()}, + SetList: []string{suite.set}, + Namespace: suite.namespace, + ParallelRead: 1, + ParallelWrite: 100, + EncoderType: backup.EncoderTypeASB, }, restoreConfig: nonBatchRestore, bins: testBins, @@ -340,13 +340,13 @@ func (suite *backupRestoreTestSuite) TestBackupRestoreDirectory() { name: "parallel with file size limit", args: args{ backupConfig: &backup.BackupConfig{ - Partitions: backup.PartitionRangeAll(), - SetList: []string{suite.set}, - Namespace: suite.namespace, - ParallelRead: 1, - ParallelWrite: 2, - EncoderType: backup.EncoderTypeASB, - FileLimit: 3 * 1024 * 1024, // 3mb, full backup ~9mb + PartitionFilters: []*a.PartitionFilter{backup.NewPartitionFilterAll()}, + SetList: []string{suite.set}, + Namespace: suite.namespace, + ParallelRead: 1, + ParallelWrite: 2, + EncoderType: backup.EncoderTypeASB, + FileLimit: 3 * 1024 * 1024, // 3mb, full backup ~9mb }, restoreConfig: nonBatchRestore, bins: testBins, @@ -526,7 +526,7 @@ func (suite *backupRestoreTestSuite) TestBackupRestoreIOWithPartitions() { // backup half the partitions startPartition := 256 partitionCount := 2056 - partitions := backup.NewPartitionRange(startPartition, partitionCount) + partitions := []*a.PartitionFilter{backup.NewPartitionFilterByRange(startPartition, partitionCount)} // reset the expected record count numRec = 0 @@ -542,7 +542,7 @@ func (suite *backupRestoreTestSuite) TestBackupRestoreIOWithPartitions() { ctx := context.Background() backupConfig := backup.NewDefaultBackupConfig() - backupConfig.Partitions = partitions + backupConfig.PartitionFilters = partitions backupDir := suite.T().TempDir() writers, err := local.NewWriter( @@ -743,7 +743,7 @@ func (suite *backupRestoreTestSuite) TestBackupRestoreIOCompression() { func (suite *backupRestoreTestSuite) TestBackupParallelNodes() { bCfg := backup.NewDefaultBackupConfig() - bCfg.Partitions = backup.PartitionRange{} + bCfg.PartitionFilters = nil bCfg.ParallelNodes = true ctx := context.Background() @@ -761,7 +761,7 @@ func (suite *backupRestoreTestSuite) TestBackupParallelNodes() { func (suite *backupRestoreTestSuite) TestBackupParallelNodesList() { bCfg := backup.NewDefaultBackupConfig() - bCfg.Partitions = backup.PartitionRange{} + bCfg.PartitionFilters = nil bCfg.NodeList = []string{fmt.Sprintf("%s:%d", suite.aerospikeIP, suite.aerospikePort)} ctx := context.Background() @@ -940,13 +940,13 @@ func (suite *backupRestoreTestSuite) TestBinFilter() { }) var backupConfig = &backup.BackupConfig{ - Partitions: backup.PartitionRangeAll(), - SetList: []string{suite.set}, - Namespace: suite.namespace, - ParallelRead: 1, - 
ParallelWrite: 1, - EncoderType: backup.EncoderTypeASB, - BinList: []string{"BackupRestore", "OnlyBackup"}, + PartitionFilters: []*a.PartitionFilter{backup.NewPartitionFilterAll()}, + SetList: []string{suite.set}, + Namespace: suite.namespace, + ParallelRead: 1, + ParallelWrite: 1, + EncoderType: backup.EncoderTypeASB, + BinList: []string{"BackupRestore", "OnlyBackup"}, } var restoreConfig = backup.NewDefaultRestoreConfig() @@ -984,14 +984,14 @@ func (suite *backupRestoreTestSuite) TestFilterTimestamp() { var expectedRecords = tests.Subtract(batch2, batch3) var backupConfig = &backup.BackupConfig{ - Partitions: backup.PartitionRangeAll(), - SetList: []string{suite.set}, - Namespace: suite.namespace, - ParallelRead: 1, - ParallelWrite: 1, - EncoderType: backup.EncoderTypeASB, - ModAfter: &lowerLimit, - ModBefore: &upperLimit, + PartitionFilters: []*a.PartitionFilter{backup.NewPartitionFilterAll()}, + SetList: []string{suite.set}, + Namespace: suite.namespace, + ParallelRead: 1, + ParallelWrite: 1, + EncoderType: backup.EncoderTypeASB, + ModAfter: &lowerLimit, + ModBefore: &upperLimit, } var restoreConfig = backup.NewDefaultRestoreConfig() @@ -1011,12 +1011,12 @@ func (suite *backupRestoreTestSuite) TestRecordsPerSecond() { suite.SetupTest(records) var backupConfig = &backup.BackupConfig{ - Partitions: backup.PartitionRangeAll(), - SetList: []string{suite.set}, - Namespace: suite.namespace, - ParallelRead: 1, - ParallelWrite: 1, - EncoderType: backup.EncoderTypeASB, + PartitionFilters: []*a.PartitionFilter{backup.NewPartitionFilterAll()}, + SetList: []string{suite.set}, + Namespace: suite.namespace, + ParallelRead: 1, + ParallelWrite: 1, + EncoderType: backup.EncoderTypeASB, } backupConfig.ScanPolicy = suite.Aeroclient.DefaultScanPolicy backupConfig.ScanPolicy.RecordsPerSecond = rps @@ -1041,15 +1041,16 @@ func (suite *backupRestoreTestSuite) TestBackupAfterDigestOk() { suite.SetupTest(batch) digest := base64.StdEncoding.EncodeToString(batch[0].Key.Digest()) + afterDigestFilter, err := backup.NewPartitionFilterAfterDigest(suite.namespace, digest) + suite.Nil(err) var backupConfig = &backup.BackupConfig{ - Partitions: backup.PartitionRangeAll(), - SetList: []string{suite.set}, - Namespace: suite.namespace, - ParallelRead: 1, - ParallelWrite: 1, - EncoderType: backup.EncoderTypeASB, - AfterDigest: digest, + PartitionFilters: []*a.PartitionFilter{afterDigestFilter}, + SetList: []string{suite.set}, + Namespace: suite.namespace, + ParallelRead: 1, + ParallelWrite: 1, + EncoderType: backup.EncoderTypeASB, } ctx := context.Background()