diff --git a/cmd/asbackup/main.go b/cmd/asbackup/main.go
index 62ed2102..43687dd1 100644
--- a/cmd/asbackup/main.go
+++ b/cmd/asbackup/main.go
@@ -176,7 +176,7 @@ func run(cmd *cobra.Command, _ []string) error {
 	if err != nil {
 		return err
 	}
-	// Run app.
+
 	return asb.Run(cmd.Context())
 }
diff --git a/cmd/asbackup/readme.md b/cmd/asbackup/readme.md
index 3876b858..3d7d812f 100644
--- a/cmd/asbackup/readme.md
+++ b/cmd/asbackup/readme.md
@@ -100,6 +100,7 @@ Backup Flags:
       --parallel-nodes                 Specifies how to perform scan. If set to true, we launch parallel workers for nodes;
                                        otherwise workers run in parallel for partitions.
+      --remove-artifacts               Remove existing backup file (-o) or files (-d) without performing a backup.
 
 Compression Flags:
   -z, --compress string                Enables compressing of backup files using the specified compression algorithm.
@@ -146,9 +147,6 @@ Azure Flags:
 ## Unsupported flags
 
 ```
---remove-artifacts      Remove existing backup file (-o) or files (-d) without performing a backup.
-                        This option is mutually exclusive to --continue and --estimate.
-
 --continue              Resumes an interrupted/failed backup from where it was left off, given the .state file
                         that was generated from the interrupted/failed run.
diff --git a/cmd/internal/app/asbackup.go b/cmd/internal/app/asbackup.go
index 6c8a69a4..ef1081da 100644
--- a/cmd/internal/app/asbackup.go
+++ b/cmd/internal/app/asbackup.go
@@ -34,7 +34,6 @@ type ASBackup struct {
 	writer backup.Writer
 }
 
-//nolint:dupl // Code is very similar as NewASRestore but different.
 func NewASBackup(
 	ctx context.Context,
 	clientConfig *client.AerospikeConfig,
@@ -52,6 +51,16 @@ func NewASBackup(
 		return nil, err
 	}
 
+	writer, err := getWriter(ctx, backupParams, commonParams, awsS3, gcpStorage, azureBlob)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create backup writer: %w", err)
+	}
+
+	if backupParams.RemoveArtifacts {
+		// The writer already cleaned the target on initialization; return a nil ASBackup so Run becomes a no-op.
+		return nil, nil
+	}
+
 	aerospikeClient, err := newAerospikeClient(clientConfig)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create aerospike client: %w", err)
@@ -73,11 +82,6 @@ func NewASBackup(
 		return nil, fmt.Errorf("failed to create backup client: %w", err)
 	}
 
-	writer, err := getWriter(ctx, backupParams, commonParams, awsS3, gcpStorage, azureBlob)
-	if err != nil {
-		return nil, fmt.Errorf("failed to create backup writer: %w", err)
-	}
-
 	return &ASBackup{
 		backupClient: backupClient,
 		backupConfig: backupConfig,
@@ -86,6 +90,10 @@
 }
 
 func (b *ASBackup) Run(ctx context.Context) error {
+	if b == nil {
+		return nil
+	}
+
 	h, err := b.backupClient.Backup(ctx, b.backupConfig, b.writer)
 	if err != nil {
 		return fmt.Errorf("failed to start backup: %w", err)
@@ -116,7 +124,7 @@ func getWriter(
 	case azureBlob.ContainerName != "":
 		return newAzureWriter(ctx, azureBlob, backupParams, commonParams)
 	default:
-		return newLocalWriter(backupParams, commonParams)
+		return newLocalWriter(ctx, backupParams, commonParams)
 	}
 }
diff --git a/cmd/internal/app/asrestore.go b/cmd/internal/app/asrestore.go
index 66583379..cb18ff49 100644
--- a/cmd/internal/app/asrestore.go
+++ b/cmd/internal/app/asrestore.go
@@ -34,7 +34,6 @@ type ASRestore struct {
 	reader backup.StreamingReader
 }
 
-//nolint:dupl // Code is very similar as NewASBackup but different.
func NewASRestore( ctx context.Context, clientConfig *client.AerospikeConfig, @@ -52,6 +51,11 @@ func NewASRestore( return nil, err } + reader, err := getReader(ctx, restoreParams, commonParams, awsS3, gcpStorage, azureBlob) + if err != nil { + return nil, fmt.Errorf("failed to create backup reader: %w", err) + } + aerospikeClient, err := newAerospikeClient(clientConfig) if err != nil { return nil, fmt.Errorf("failed to create aerospike client: %w", err) @@ -73,11 +77,6 @@ func NewASRestore( return nil, fmt.Errorf("failed to create backup client: %w", err) } - reader, err := getReader(ctx, restoreParams, commonParams, awsS3, gcpStorage, azureBlob) - if err != nil { - return nil, fmt.Errorf("failed to create backup reader: %w", err) - } - return &ASRestore{ backupClient: backupClient, restoreConfig: restoreConfig, @@ -86,6 +85,10 @@ func NewASRestore( } func (r *ASRestore) Run(ctx context.Context) error { + if r == nil { + return nil + } + h, err := r.backupClient.Restore(ctx, r.restoreConfig, r.reader) if err != nil { return fmt.Errorf("failed to start restore: %w", err) diff --git a/cmd/internal/app/clients_test.go b/cmd/internal/app/clients_test.go index 725f8709..a8dd88dd 100644 --- a/cmd/internal/app/clients_test.go +++ b/cmd/internal/app/clients_test.go @@ -19,6 +19,7 @@ import ( "github.com/aerospike/backup-go/cmd/internal/models" "github.com/aerospike/tools-common-go/client" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/net/context" ) @@ -73,6 +74,8 @@ func TestClients_newAerospikeClient(t *testing.T) { func TestClients_newS3Client(t *testing.T) { t.Parallel() + err := createAwsCredentials() + assert.NoError(t, err) cfg := &models.AwsS3{ Region: testS3Region, @@ -81,7 +84,7 @@ func TestClients_newS3Client(t *testing.T) { } ctx := context.Background() - _, err := newS3Client(ctx, cfg) + _, err = newS3Client(ctx, cfg) require.NoError(t, err) } diff --git a/cmd/internal/app/writers.go b/cmd/internal/app/writers.go index 8ef64502..92051dad 100644 --- a/cmd/internal/app/writers.go +++ b/cmd/internal/app/writers.go @@ -27,7 +27,7 @@ import ( "github.com/aerospike/backup-go/io/local" ) -func newLocalWriter(b *models.Backup, c *models.Common) (backup.Writer, error) { +func newLocalWriter(ctx context.Context, b *models.Backup, c *models.Common) (backup.Writer, error) { var opts []local.Opt if c.Directory != "" && b.OutputFile == "" { @@ -38,11 +38,13 @@ func newLocalWriter(b *models.Backup, c *models.Common) (backup.Writer, error) { opts = append(opts, local.WithFile(b.OutputFile)) } - if b.RemoveFiles { + if b.ShouldClearTarget() { opts = append(opts, local.WithRemoveFiles()) } - return local.NewWriter(opts...) + opts = append(opts, local.WithValidator(asb.NewValidator())) + + return local.NewWriter(ctx, opts...) } func newS3Writer( @@ -70,10 +72,12 @@ func newS3Writer( opts = append(opts, s3.WithFile(path)) } - if b.RemoveFiles { + if b.ShouldClearTarget() { opts = append(opts, s3.WithRemoveFiles()) } + opts = append(opts, s3.WithValidator(asb.NewValidator())) + return s3.NewWriter(ctx, client, bucketName, opts...) } @@ -98,10 +102,12 @@ func newGcpWriter( opts = append(opts, storage.WithFile(b.OutputFile)) } - if b.RemoveFiles { + if b.ShouldClearTarget() { opts = append(opts, storage.WithRemoveFiles()) } + opts = append(opts, storage.WithValidator(asb.NewValidator())) + return storage.NewWriter(ctx, client, g.BucketName, opts...) 
} @@ -126,10 +132,12 @@ func newAzureWriter( opts = append(opts, blob.WithFile(b.OutputFile)) } - if b.RemoveFiles { + if b.ShouldClearTarget() { opts = append(opts, blob.WithRemoveFiles()) } + opts = append(opts, blob.WithValidator(asb.NewValidator())) + return blob.NewWriter(ctx, client, a.ContainerName, opts...) } @@ -138,10 +146,13 @@ func newAzureWriter( func getBucketFromPath(path string) (bucket, cleanPath string) { parts := strings.Split(path, "/") if len(parts) < 2 { - return "", path + return path, "/" } cleanPath = strings.Join(parts[1:], "/") + if cleanPath == "" { + cleanPath = "/" + } return parts[0], cleanPath } diff --git a/cmd/internal/app/writers_test.go b/cmd/internal/app/writers_test.go index 7692d073..98fd6d05 100644 --- a/cmd/internal/app/writers_test.go +++ b/cmd/internal/app/writers_test.go @@ -47,22 +47,50 @@ const ( ) func TestGetBucketFromPath(t *testing.T) { - t.Parallel() tests := []struct { - input string - expectedBucket string - expectedPath string + name string + path string + wantBucket string + wantCleanPath string }{ - {input: "bucket/path/to/file", expectedBucket: "bucket", expectedPath: "path/to/file"}, - {input: "single-part", expectedBucket: "", expectedPath: "single-part"}, - {input: "bucket/", expectedBucket: "bucket", expectedPath: ""}, - {input: "", expectedBucket: "", expectedPath: ""}, + { + name: "Single part path", + path: "bucketname", + wantBucket: "bucketname", + wantCleanPath: "/", + }, + { + name: "Path with bucket and folder", + path: "bucketname/folder", + wantBucket: "bucketname", + wantCleanPath: "folder", + }, + { + name: "Path with multiple folders", + path: "bucketname/folder/subfolder", + wantBucket: "bucketname", + wantCleanPath: "folder/subfolder", + }, + { + name: "Path with trailing slash", + path: "bucketname/", + wantBucket: "bucketname", + wantCleanPath: "/", + }, + { + name: "Empty path", + path: "", + wantBucket: "", + wantCleanPath: "/", + }, } - for _, test := range tests { - bucket, path := getBucketFromPath(test.input) - assert.Equal(t, test.expectedBucket, bucket) - assert.Equal(t, test.expectedPath, path) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + bucket, cleanPath := getBucketFromPath(tt.path) + assert.Equal(t, tt.wantBucket, bucket) + assert.Equal(t, tt.wantCleanPath, cleanPath) + }) } } @@ -74,8 +102,8 @@ func TestNewLocalWriter(t *testing.T) { c := &models.Common{ Directory: t.TempDir(), } - - writer, err := newLocalWriter(b, c) + ctx := context.Background() + writer, err := newLocalWriter(ctx, b, c) assert.NoError(t, err) assert.NotNil(t, writer) assert.Equal(t, testLocalType, writer.GetType()) @@ -85,13 +113,13 @@ func TestNewLocalWriter(t *testing.T) { } c = &models.Common{} - writer, err = newLocalWriter(b, c) + writer, err = newLocalWriter(ctx, b, c) assert.NoError(t, err) assert.NotNil(t, writer) assert.Equal(t, testLocalType, writer.GetType()) b = &models.Backup{} - writer, err = newLocalWriter(b, c) + writer, err = newLocalWriter(ctx, b, c) assert.Error(t, err) assert.Nil(t, writer) } @@ -188,8 +216,7 @@ func TestGcpWriter(t *testing.T) { assert.Equal(t, testGcpType, writer.GetType()) b = &models.Backup{ - OutputFile: t.TempDir() + testFileName, - RemoveFiles: true, + OutputFile: t.TempDir() + testFileName, } c = &models.Common{} @@ -243,8 +270,7 @@ func TestAzureWriter(t *testing.T) { assert.Equal(t, testAzureType, writer.GetType()) b = &models.Backup{ - OutputFile: t.TempDir() + testFileName, - RemoveFiles: true, + OutputFile: t.TempDir() + testFileName, } c = 
&models.Common{}
diff --git a/cmd/internal/flags/backup.go b/cmd/internal/flags/backup.go
index e0cd86ec..3bf2337d 100644
--- a/cmd/internal/flags/backup.go
+++ b/cmd/internal/flags/backup.go
@@ -79,6 +79,11 @@ func (f *Backup) NewFlagSet() *pflag.FlagSet {
 		false,
 		"Specifies how to perform scan. If set to true, we launch parallel workers for nodes;\n"+
 			"otherwise workers run in parallel for partitions.")
+	// After implementing --continue and --estimate, append this line to the description:
+	// "This option is mutually exclusive with --continue and --estimate."
+	flagSet.BoolVar(&f.RemoveArtifacts, "remove-artifacts",
+		false,
+		"Remove existing backup file (-o) or files (-d) without performing a backup.")
 
 	return flagSet
 }
diff --git a/cmd/internal/flags/backup_test.go b/cmd/internal/flags/backup_test.go
index 52345833..7c047622 100644
--- a/cmd/internal/flags/backup_test.go
+++ b/cmd/internal/flags/backup_test.go
@@ -36,7 +36,8 @@ func TestBackup_NewFlagSet(t *testing.T) {
 		"--no-bins",
 		"--sleep-between-retries", "10",
 		"--filter-exp", "encoded-filter-exp",
-		"--parallel-nodes", "true",
+		"--parallel-nodes",
+		"--remove-artifacts",
 	}
 
 	err := flagSet.Parse(args)
@@ -54,6 +55,7 @@ func TestBackup_NewFlagSet(t *testing.T) {
 	assert.Equal(t, 10, result.SleepBetweenRetries, "The sleep-between-retries flag should be parsed correctly")
 	assert.Equal(t, "encoded-filter-exp", result.FilterExpression, "The filter-exp flag should be parsed correctly")
 	assert.Equal(t, true, result.ParallelNodes, "The parallel-nodes flag should be parsed correctly")
+	assert.Equal(t, true, result.RemoveArtifacts, "The remove-artifacts flag should be parsed correctly")
 }
 
 func TestBackup_NewFlagSet_DefaultValues(t *testing.T) {
@@ -77,4 +79,5 @@ func TestBackup_NewFlagSet_DefaultValues(t *testing.T) {
 	assert.Equal(t, 5, result.SleepBetweenRetries, "The default value for sleep-between-retries should be 5")
 	assert.Equal(t, "", result.FilterExpression, "The default value for filter-exp should be an empty string")
 	assert.Equal(t, false, result.ParallelNodes, "The default value for parallel-nodes should be false")
+	assert.Equal(t, false, result.RemoveArtifacts, "The default value for remove-artifacts should be false")
 }
diff --git a/cmd/internal/models/backup.go b/cmd/internal/models/backup.go
index b8793be5..08cd46f4 100644
--- a/cmd/internal/models/backup.go
+++ b/cmd/internal/models/backup.go
@@ -26,4 +26,10 @@ type Backup struct {
 	SleepBetweenRetries int
 	FilterExpression    string
 	ParallelNodes       bool
+	RemoveArtifacts     bool
+}
+
+// ShouldClearTarget reports whether the backup target (file or directory) should be cleared.
+func (b *Backup) ShouldClearTarget() bool {
+	return b.RemoveFiles || b.RemoveArtifacts
 }
diff --git a/examples/readme/main.go b/examples/readme/main.go
index 11bd84e3..73646be9 100644
--- a/examples/readme/main.go
+++ b/examples/readme/main.go
@@ -35,8 +35,11 @@ func main() {
 		panic(err)
 	}
 
+	ctx := context.Background()
+
 	// For backup to single file use local.WithFile(fileName)
 	writers, err := local.NewWriter(
+		ctx,
 		local.WithRemoveFiles(),
 		local.WithDir("backups_folder"),
 	)
@@ -47,7 +50,6 @@ func main() {
 	backupCfg := backup.NewDefaultBackupConfig()
 	backupCfg.Namespace = "test"
 	backupCfg.ParallelRead = 5
-	ctx := context.Background()
 
 	backupHandler, err := backupClient.Backup(ctx, backupCfg, writers)
 	if err != nil {
diff --git a/handler_backup.go b/handler_backup.go
index bf9d4bb1..82f777a9 100644
--- a/handler_backup.go
+++ b/handler_backup.go
@@ -45,6 +45,8 @@ type Writer interface {
 	NewWriter(ctx context.Context, filename string) (io.WriteCloser, error)
 	// GetType returns the type of storage. Used in logging.
 	GetType() string
+	// RemoveFiles removes a backup file or files from a directory.
+	RemoveFiles(ctx context.Context) error
 }
 
 // BackupHandler handles a backup job.
diff --git a/io/aws/s3/reader.go b/io/aws/s3/reader.go
index 3e226b16..48a3a866 100644
--- a/io/aws/s3/reader.go
+++ b/io/aws/s3/reader.go
@@ -47,10 +47,14 @@ type options struct {
 	path string
 	// isDir flag describes what we have in path, file or directory.
 	isDir bool
-	// removeFiles flag describes should we remove everything from backup folder or not.
-	removeFiles bool
+	// isRemovingFiles describes whether we should remove everything from the backup folder.
+	isRemovingFiles bool
 	// validator contains files validator that is applied to files if isDir = true.
 	validator validator
+	// withNestedDir describes whether we should process objects in nested directories during read/write operations.
+	// When we stream or delete files in a folder, we skip directories by default; this flag avoids skipping.
+	// Default: false
+	withNestedDir bool
 }
 
 type Opt func(*options)
@@ -79,6 +83,13 @@ func WithValidator(v validator) Opt {
 	}
 }
 
+// WithNestedDir sets withNestedDir = true, meaning nested folders will not be skipped.
+func WithNestedDir() Opt {
+	return func(r *options) {
+		r.withNestedDir = true
+	}
+}
+
 // NewReader returns new S3 storage reader.
 // Must be called with WithDir(path string) or WithFile(path string) - mandatory.
 // Can be called with WithValidator(v validator) - optional.
@@ -163,7 +174,7 @@ func (r *Reader) streamDirectory(
 		}
 
 		for _, p := range listResponse.Contents {
-			if p.Key == nil {
+			if p.Key == nil || (isDirectory(r.prefix, *p.Key) && !r.withNestedDir) {
 				continue
 			}
 
diff --git a/io/aws/s3/writer.go b/io/aws/s3/writer.go
index 4433145a..d005a8e5 100644
--- a/io/aws/s3/writer.go
+++ b/io/aws/s3/writer.go
@@ -49,7 +49,7 @@ type Writer struct {
 // Is used only for Writer.
 func WithRemoveFiles() Opt {
 	return func(r *options) {
-		r.removeFiles = true
+		r.isRemovingFiles = true
 	}
 }
 
@@ -76,14 +76,20 @@ func NewWriter(
 	}
 
-	var prefix string
 	if w.isDir {
-		prefix = w.path
+		w.prefix = w.path
 		// Protection from incorrect input.
 		if !strings.HasSuffix(w.path, "/") && w.path != "/" && w.path != "" {
-			prefix = fmt.Sprintf("%s/", w.path)
+			w.prefix = fmt.Sprintf("%s/", w.path)
 		}
 	}
 
+	// For S3, use an empty prefix for the root.
+	if w.prefix == "/" {
+		w.prefix = ""
+	}
+
 	// Check if the bucket exists and we have permissions.
 	_, err := client.HeadBucket(ctx, &s3.HeadBucketInput{
 		Bucket: aws.String(bucketName),
@@ -93,23 +99,24 @@ func NewWriter(
 	}
 
 	// Check if backup dir is empty.
-	isEmpty, err := isEmptyDirectory(ctx, client, bucketName, prefix)
+	isEmpty, err := isEmptyDirectory(ctx, client, bucketName, w.prefix)
 	if err != nil {
 		return nil, fmt.Errorf("failed to check if the directory is empty: %w", err)
 	}
 
-	if !isEmpty && !w.removeFiles {
-		return nil, fmt.Errorf("backup folder must be empty or set removeFiles = true")
-	}
-
-	err = deleteAllFilesUnderPrefix(ctx, client, bucketName, prefix)
-	if err != nil {
-		return nil, fmt.Errorf("failed to delete files under prefix %s: %w", prefix, err)
+	if !isEmpty && !w.isRemovingFiles {
+		return nil, fmt.Errorf("backup folder must be empty or set RemoveFiles = true")
 	}
 
 	w.client = client
 	w.bucketName = bucketName
-	w.prefix = prefix
+
+	if w.isRemovingFiles {
+		err = w.RemoveFiles(ctx)
+		if err != nil {
+			return nil, fmt.Errorf("failed to delete files under prefix %s: %w", w.prefix, err)
+		}
+	}
 
 	return w, nil
 }
@@ -243,11 +250,6 @@ func (w *s3Writer) Close() error {
 }
 
 func isEmptyDirectory(ctx context.Context, client *s3.Client, bucketName, prefix string) (bool, error) {
-	// S3 storage can write to "/" prefix, but can't read from "/", so we should replace it with "".
-	if prefix == "/" {
-		prefix = ""
-	}
-
 	resp, err := client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
 		Bucket: &bucketName,
 		Prefix: &prefix,
@@ -265,27 +267,46 @@
 
 	return len(resp.Contents) == 0, nil
 }
 
-func deleteAllFilesUnderPrefix(ctx context.Context, client *s3.Client, bucketName, prefix string) error {
+// RemoveFiles removes a backup file or files from a directory.
+func (w *Writer) RemoveFiles(ctx context.Context) error {
+	// Remove file.
+	if !w.isDir {
+		if _, err := w.client.DeleteObject(ctx, &s3.DeleteObjectInput{
+			Bucket: aws.String(w.bucketName),
+			Key:    aws.String(w.path),
+		}); err != nil {
+			return fmt.Errorf("failed to delete object %s: %w", w.path, err)
+		}
+
+		return nil
+	}
+	// Remove files from the directory.
 	var continuationToken *string
 
 	for {
-		listResponse, err := client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
-			Bucket:            &bucketName,
-			Prefix:            &prefix,
+		listResponse, err := w.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
+			Bucket:            aws.String(w.bucketName),
+			Prefix:            aws.String(w.prefix),
 			ContinuationToken: continuationToken,
 		})
-
 		if err != nil {
 			return fmt.Errorf("failed to list objects: %w", err)
 		}
 
 		for _, p := range listResponse.Contents {
-			if p.Key == nil {
+			if p.Key == nil || (isDirectory(w.prefix, *p.Key) && !w.withNestedDir) {
 				continue
 			}
 
-			_, err = client.DeleteObject(ctx, &s3.DeleteObjectInput{
-				Bucket: &bucketName,
+			// If validator is set, remove only valid files.
+			if w.validator != nil {
+				if err = w.validator.Run(*p.Key); err != nil {
+					continue
+				}
+			}
+
+			_, err = w.client.DeleteObject(ctx, &s3.DeleteObjectInput{
+				Bucket: &w.bucketName,
 				Key:    p.Key,
 			})
 			if err != nil {
@@ -301,3 +322,18 @@
 
 	return nil
 }
+
+func isDirectory(prefix, fileName string) bool {
+	// If the file name ends with "/", it is definitely a directory.
+	if strings.HasSuffix(fileName, "/") {
+		return true
+	}
+
+	// If we look inside some folder.
+	if strings.HasPrefix(fileName, prefix) {
+		// Normalize the prefix with a trailing slash.
+		if !strings.HasSuffix(prefix, "/") {
+			prefix += "/"
+		}
+
+		clean := strings.TrimPrefix(fileName, prefix)
+
+		return strings.Contains(clean, "/")
+	}
+	// All other cases.
+	return strings.Contains(fileName, "/")
+}
diff --git a/io/azure/blob/reader.go b/io/azure/blob/reader.go
index 84ef19c1..cbae4361 100644
--- a/io/azure/blob/reader.go
+++ b/io/azure/blob/reader.go
@@ -46,13 +46,17 @@ type options struct {
 	path string
 	// isDir flag describes what we have in path, file or directory.
 	isDir bool
-	// removeFiles flag describes should we remove everything from backup folder or not.
-	removeFiles bool
+	// isRemovingFiles describes whether we should remove everything from the backup folder.
+	isRemovingFiles bool
 	// validator contains files validator that is applied to files if isDir = true.
 	validator validator
 	// Concurrency defines the max number of concurrent uploads to be performed to upload the file.
 	// Each concurrent upload will create a buffer of size BlockSize.
 	uploadConcurrency int
+	// withNestedDir describes whether we should process objects in nested directories during read/write operations.
+	// When we stream or delete files in a folder, we skip directories by default; this flag avoids skipping.
+	// Default: false
+	withNestedDir bool
 }
 
 type Opt func(*options)
@@ -70,7 +74,6 @@ func WithFile(path string) Opt {
 	return func(r *options) {
 		r.path = path
 		r.isDir = false
-		r.removeFiles = true
 	}
 }
@@ -82,6 +85,13 @@ func WithValidator(v validator) Opt {
 	}
 }
 
+// WithNestedDir sets withNestedDir = true, meaning nested folders will not be skipped.
+func WithNestedDir() Opt {
+	return func(r *options) {
+		r.withNestedDir = true
+	}
+}
+
 // NewReader returns new Azure blob directory/file reader.
 // Must be called with WithDir(path string) or WithFile(path string) - mandatory.
 // Can be called with WithValidator(v validator) - optional.
@@ -153,7 +163,7 @@ func (r *Reader) streamDirectory(
 
 		for _, blob := range page.Segment.BlobItems {
 			// Skip files in folders.
-			if isDirectory(r.prefix, *blob.Name) {
+			if isDirectory(r.prefix, *blob.Name) && !r.withNestedDir {
 				continue
 			}
 
diff --git a/io/azure/blob/reader_writer_test.go b/io/azure/blob/reader_writer_test.go
index 6ddb34e0..2e0a2c9d 100644
--- a/io/azure/blob/reader_writer_test.go
+++ b/io/azure/blob/reader_writer_test.go
@@ -411,7 +411,7 @@ func (s *AzureSuite) TestWriter_WriteNotEmptyDirError() {
 		testContainerName,
 		WithDir(testWriteFolderWithDataError),
 	)
-	s.Require().ErrorContains(err, "backup folder must be empty or set removeFiles = true")
+	s.Require().ErrorContains(err, "backup folder must be empty or set RemoveFiles = true")
 }
 
 func (s *AzureSuite) TestWriter_WriteNotEmptyDir() {
diff --git a/io/azure/blob/writer.go b/io/azure/blob/writer.go
index d393d100..5819a9fa 100644
--- a/io/azure/blob/writer.go
+++ b/io/azure/blob/writer.go
@@ -50,7 +50,7 @@ type Writer struct {
 // Is used only for Writer.
 func WithRemoveFiles() Opt {
 	return func(r *options) {
-		r.removeFiles = true
+		r.isRemovingFiles = true
 	}
 }
@@ -102,18 +102,20 @@ func NewWriter(
 		return nil, fmt.Errorf("failed to check if directory is empty: %w", err)
 	}
 
-	if !isEmpty && !w.removeFiles {
-		return nil, fmt.Errorf("backup folder must be empty or set removeFiles = true")
-	}
-
-	// As we accept only empty dir or dir with files for removing. We can remove them even in an empty bucket.
-	if err = removeFilesFromFolder(ctx, client, containerName, prefix); err != nil {
-		return nil, fmt.Errorf("failed to remove files from folder: %w", err)
+	if !isEmpty && !w.isRemovingFiles && w.isDir {
+		return nil, fmt.Errorf("backup folder must be empty or set RemoveFiles = true")
 	}
 
 	w.containerName = containerName
 	w.prefix = prefix
 
+	if w.isRemovingFiles {
+		// We only accept an empty dir, or a dir whose files are marked for removal, so it is safe to run removal even on an empty container.
+		if err = w.RemoveFiles(ctx); err != nil {
+			return nil, fmt.Errorf("failed to remove files from folder: %w", err)
+		}
+	}
+
 	return w, nil
 }
@@ -218,9 +220,20 @@ func isEmptyDirectory(ctx context.Context, client *azblob.Client, containerName,
 	return false, nil
 }
 
-func removeFilesFromFolder(ctx context.Context, client *azblob.Client, containerName, prefix string) error {
-	pager := client.NewListBlobsFlatPager(containerName, &azblob.ListBlobsFlatOptions{
-		Prefix: &prefix,
+// RemoveFiles removes a backup file or files from a directory.
+func (w *Writer) RemoveFiles(ctx context.Context) error {
+	// Remove file.
+	if !w.isDir {
+		_, err := w.client.DeleteBlob(ctx, w.containerName, w.path, nil)
+		if err != nil {
+			return fmt.Errorf("failed to delete blob %s: %w", w.path, err)
+		}
+
+		return nil
+	}
+	// Remove files from the directory.
+	pager := w.client.NewListBlobsFlatPager(w.containerName, &azblob.ListBlobsFlatOptions{
+		Prefix: &w.prefix,
 	})
 
 	for pager.More() {
@@ -231,11 +244,18 @@
 
 		for _, blob := range page.Segment.BlobItems {
 			// Skip files in folders.
-			if isDirectory(prefix, *blob.Name) {
+			if isDirectory(w.prefix, *blob.Name) && !w.withNestedDir {
 				continue
 			}
 
-			_, err = client.DeleteBlob(ctx, containerName, *blob.Name, nil)
+			// If validator is set, remove only valid files.
+			if w.validator != nil {
+				if err = w.validator.Run(*blob.Name); err != nil {
+					continue
+				}
+			}
+
+			_, err = w.client.DeleteBlob(ctx, w.containerName, *blob.Name, nil)
 			if err != nil {
 				return fmt.Errorf("failed to delete blob %s: %w", *blob.Name, err)
 			}
diff --git a/io/gcp/storage/reader.go b/io/gcp/storage/reader.go
index 6464cfdc..019e4041 100644
--- a/io/gcp/storage/reader.go
+++ b/io/gcp/storage/reader.go
@@ -49,10 +49,14 @@ type options struct {
 	path string
 	// isDir flag describes what we have in path, file or directory.
 	isDir bool
-	// removeFiles flag describes should we remove everything from backup folder or not.
-	removeFiles bool
+	// isRemovingFiles describes whether we should remove everything from the backup folder.
+	isRemovingFiles bool
 	// validator contains files validator that is applied to files if isDir = true.
 	validator validator
+	// withNestedDir describes whether we should process objects in nested directories during read/write operations.
+	// When we stream or delete files in a folder, we skip directories by default; this flag avoids skipping.
+	// Default: false
+	withNestedDir bool
 }
 
 type Opt func(*options)
@@ -70,7 +74,6 @@ func WithFile(path string) Opt {
 	return func(r *options) {
 		r.path = path
 		r.isDir = false
-		r.removeFiles = true
 	}
 }
@@ -82,6 +85,13 @@ func WithValidator(v validator) Opt {
 	}
 }
 
+// WithNestedDir sets withNestedDir = true, meaning nested folders will not be skipped.
+func WithNestedDir() Opt {
+	return func(r *options) {
+		r.withNestedDir = true
+	}
+}
+
 // NewReader returns new GCP storage directory/file reader.
 // Must be called with WithDir(path string) or WithFile(path string) - mandatory.
 // Can be called with WithValidator(v validator) - optional.
@@ -161,7 +171,7 @@ func (r *Reader) streamDirectory(
 		}
 
 		// Skip files in folders.
-		if isDirectory(r.prefix, objAttrs.Name) {
+		if isDirectory(r.prefix, objAttrs.Name) && !r.withNestedDir {
 			continue
 		}
 
@@ -217,7 +227,13 @@ func isDirectory(prefix, fileName string) bool {
 
 	// If we look inside some folder.
 	if strings.HasPrefix(fileName, prefix) {
+		// Normalize the prefix with a trailing slash (needed for the root folder case).
+		if !strings.HasSuffix(prefix, "/") {
+			prefix += "/"
+		}
+
 		clean := strings.TrimPrefix(fileName, prefix)
+
 		return strings.Contains(clean, "/")
 	}
 	// All other variants.
diff --git a/io/gcp/storage/reader_writer_test.go b/io/gcp/storage/reader_writer_test.go
index 8426f708..7789788c 100644
--- a/io/gcp/storage/reader_writer_test.go
+++ b/io/gcp/storage/reader_writer_test.go
@@ -407,7 +407,7 @@ func (s *GCPSuite) TestWriter_WriteNotEmptyDirError() {
 		testBucketName,
 		WithDir(testWriteFolderWithDataError),
 	)
-	s.Require().ErrorContains(err, "backup folder must be empty or set removeFiles = true")
+	s.Require().ErrorContains(err, "backup folder must be empty or set RemoveFiles = true")
 }
 
 func (s *GCPSuite) TestWriter_WriteNotEmptyDir() {
diff --git a/io/gcp/storage/writer.go b/io/gcp/storage/writer.go
index c3ab547c..d9b4265a 100644
--- a/io/gcp/storage/writer.go
+++ b/io/gcp/storage/writer.go
@@ -35,6 +35,8 @@ const (
 type Writer struct {
 	// Optional parameters.
 	options
+	// bucketName contains the bucket name; used for logging and error messages.
+	bucketName string
 	// bucketHandle contains storage bucket handler for performing reading and writing operations.
 	bucketHandle *storage.BucketHandle
 	// prefix contains folder name if we have folders inside the bucket.
@@ -47,7 +49,7 @@
 // Is used only for Writer.
 func WithRemoveFiles() Opt {
 	return func(r *options) {
-		r.removeFiles = true
+		r.isRemovingFiles = true
	}
 }
@@ -79,31 +81,33 @@ func NewWriter(
 		}
 	}
 
-	bucket := client.Bucket(bucketName)
-	// Check if bucket exists, to avoid errors.
-	_, err := bucket.Attrs(ctx)
+	bucketHandler := client.Bucket(bucketName)
+	// Check if the bucket exists, to avoid errors.
+	_, err := bucketHandler.Attrs(ctx)
 	if err != nil {
-		return nil, fmt.Errorf("failed to get bucket %s attr: %w", bucketName, err)
+		return nil, fmt.Errorf("failed to get bucket %s attrs: %w", bucketName, err)
 	}
 
 	// Check if backup dir is empty.
-	isEmpty, err := isEmptyDirectory(ctx, bucket, prefix)
+	isEmpty, err := isEmptyDirectory(ctx, bucketHandler, prefix)
 	if err != nil {
 		return nil, fmt.Errorf("failed to check if directory is empty: %w", err)
 	}
 
-	if !isEmpty && !w.removeFiles {
-		return nil, fmt.Errorf("backup folder must be empty or set removeFiles = true")
+	if !isEmpty && !w.isRemovingFiles && w.isDir {
+		return nil, fmt.Errorf("backup folder must be empty or set RemoveFiles = true")
 	}
 
-	// As we accept only empty dir or dir with files for removing. We can remove them even in an empty bucket.
-	if err = removeFilesFromFolder(ctx, bucket, bucketName, prefix); err != nil {
-		return nil, fmt.Errorf("failed to remove files from folder: %w", err)
-	}
-
-	w.bucketHandle = bucket
+	w.bucketHandle = bucketHandler
+	w.bucketName = bucketName
 	w.prefix = prefix
 
+	if w.isRemovingFiles {
+		// We only accept an empty dir, or a dir whose files are marked for removal, so it is safe to run removal even on an empty bucket.
+		if err = w.RemoveFiles(ctx); err != nil {
+			return nil, fmt.Errorf("failed to remove files from folder: %w", err)
+		}
+	}
+
 	return w, nil
 }
@@ -126,14 +130,21 @@ func (w *Writer) NewWriter(ctx context.Context, filename string) (io.WriteCloser
 	return sw, nil
 }
 
-// GetType return `gcpStorageType` type of storage. Used in logging.
-func (w *Writer) GetType() string {
-	return gcpStorageType
-}
+// RemoveFiles removes a backup file or files from a directory.
+func (w *Writer) RemoveFiles(
+	ctx context.Context,
+) error {
+	// Remove file.
+	if !w.isDir {
+		if err := w.bucketHandle.Object(w.path).Delete(ctx); err != nil {
+			return fmt.Errorf("failed to delete object %s: %w", w.path, err)
+		}
 
-func isEmptyDirectory(ctx context.Context, bucketHandle *storage.BucketHandle, prefix string) (bool, error) {
-	it := bucketHandle.Objects(ctx, &storage.Query{
-		Prefix: prefix,
+		return nil
+	}
+	// Remove files from the directory.
+	it := w.bucketHandle.Objects(ctx, &storage.Query{
+		Prefix: w.path,
 	})
 
 	for {
@@ -144,21 +155,35 @@
 		}
 
 		if err != nil {
-			return false, fmt.Errorf("failed to list bucket objects: %w", err)
+			return fmt.Errorf("failed to read object attr from bucket %s: %w", w.bucketName, err)
 		}
 
 		// Skip files in folders.
-		if isDirectory(prefix, objAttrs.Name) {
+		if isDirectory(w.path, objAttrs.Name) && !w.withNestedDir {
 			continue
 		}
 
-		return false, nil
+		// If validator is set, remove only valid files.
+		if w.validator != nil {
+			if err = w.validator.Run(objAttrs.Name); err != nil {
+				continue
+			}
+		}
+
+		if err = w.bucketHandle.Object(objAttrs.Name).Delete(ctx); err != nil {
+			return fmt.Errorf("failed to delete object %s: %w", objAttrs.Name, err)
+		}
 	}
 
-	return true, nil
+	return nil
+}
+
+// GetType returns the `gcpStorageType` storage type. Used in logging.
+func (w *Writer) GetType() string {
+	return gcpStorageType
 }
 
-func removeFilesFromFolder(ctx context.Context, bucketHandle *storage.BucketHandle, bucketName, prefix string) error {
+func isEmptyDirectory(ctx context.Context, bucketHandle *storage.BucketHandle, prefix string) (bool, error) {
 	it := bucketHandle.Objects(ctx, &storage.Query{
 		Prefix: prefix,
 	})
@@ -171,7 +196,7 @@
 		}
 
 		if err != nil {
-			return fmt.Errorf("failed to read object attr from bucket %s: %w", bucketName, err)
+			return false, fmt.Errorf("failed to list bucket objects: %w", err)
 		}
 
 		// Skip files in folders.
@@ -179,10 +204,8 @@
 			continue
 		}
 
-		if err = bucketHandle.Object(objAttrs.Name).Delete(ctx); err != nil {
-			return fmt.Errorf("failed to delete object %s: %w", objAttrs.Name, err)
-		}
+		return false, nil
 	}
 
-	return nil
+	return true, nil
 }
diff --git a/io/local/reader.go b/io/local/reader.go
index 72bb646a..d4fe3d00 100644
--- a/io/local/reader.go
+++ b/io/local/reader.go
@@ -43,10 +43,14 @@ type options struct {
 	path string
 	// isDir flag describes what we have in path, file or directory.
 	isDir bool
-	// removeFiles flag describes should we remove everything from backup folder or not.
-	removeFiles bool
+	// isRemovingFiles describes whether we should remove everything from the backup folder.
+	isRemovingFiles bool
 	// validator contains files validator that is applied to files if isDir = true.
 	validator validator
+	// withNestedDir describes whether we should process objects in nested directories during read/write operations.
+	// When we stream or delete files in a folder, we skip directories by default; this flag avoids skipping.
+	// Default: false
+	withNestedDir bool
 }
 
 type Opt func(*options)
@@ -75,6 +79,13 @@ func WithValidator(v validator) Opt {
 	}
 }
 
+// WithNestedDir sets withNestedDir = true, meaning nested folders will not be skipped.
+func WithNestedDir() Opt {
+	return func(r *options) {
+		r.withNestedDir = true
+	}
+}
+
 // NewReader creates a new local directory/file Reader.
 // Must be called with WithDir(path string) or WithFile(path string) - mandatory.
 // Can be called with WithValidator(v validator) - optional.
@@ -98,9 +109,17 @@ func NewReader(opts ...Opt) (*Reader, error) {
 func (r *Reader) StreamFiles(
 	ctx context.Context, readersCh chan<- io.ReadCloser, errorsCh chan<- error,
 ) {
+	defer close(readersCh)
 	// If it is a folder, open and return.
 	if r.isDir {
+		err := r.checkRestoreDirectory()
+		if err != nil {
+			errorsCh <- err
+			return
+		}
+
 		r.streamDirectory(ctx, readersCh, errorsCh)
+
 		return
 	}
 
@@ -111,14 +130,6 @@
 func (r *Reader) streamDirectory(
 	ctx context.Context, readersCh chan<- io.ReadCloser, errorsCh chan<- error,
 ) {
-	defer close(readersCh)
-
-	err := r.checkRestoreDirectory()
-	if err != nil {
-		errorsCh <- err
-		return
-	}
-
 	fileInfo, err := os.ReadDir(r.path)
 	if err != nil {
 		errorsCh <- fmt.Errorf("failed to read path %s: %w", r.path, err)
@@ -132,6 +143,18 @@
 		}
 
 		if file.IsDir() {
+			// Iterate over nested dirs recursively.
+			if r.withNestedDir {
+				nestedDir := filepath.Join(r.path, file.Name())
+
+				subReader, err := NewReader(WithDir(nestedDir), WithValidator(r.validator), WithNestedDir())
+				if err != nil {
+					errorsCh <- fmt.Errorf("failed to read nested dir %s: %w", nestedDir, err)
+					continue
+				}
+
+				subReader.streamDirectory(ctx, readersCh, errorsCh)
+			}
+
 			continue
 		}
 
@@ -162,8 +185,6 @@
 // In case of an error, it is sent to the `errorsCh` channel.
func (r *Reader) streamFile( ctx context.Context, filename string, readersCh chan<- io.ReadCloser, errorsCh chan<- error) { - defer close(readersCh) - if ctx.Err() != nil { errorsCh <- ctx.Err() return diff --git a/io/local/reader_test.go b/io/local/reader_test.go index 90e960ec..42189eff 100644 --- a/io/local/reader_test.go +++ b/io/local/reader_test.go @@ -229,6 +229,16 @@ func createTmpFile(dir, fileName string) error { return nil } +func createTempNestedDir(rootPath, nestedDir string) error { + nestedPath := filepath.Join(rootPath, nestedDir) + if _, err := os.Stat(nestedPath); os.IsNotExist(err) { + if err = os.MkdirAll(nestedPath, os.ModePerm); err != nil { + return fmt.Errorf("failed to create directory: %w", err) + } + } + return nil +} + func (s *checkRestoreDirectoryTestSuite) TestDirectoryReader_OpenFile() { const fileName = "oneFile.asb" @@ -292,3 +302,48 @@ func (s *checkRestoreDirectoryTestSuite) TestDirectoryReader_OpenFileErr() { } } } + +func (s *checkRestoreDirectoryTestSuite) TestDirectoryReader_StreamFiles_Nested_OK() { + dir := s.T().TempDir() + + err := createTempNestedDir(dir, "nested1") + require.NoError(s.T(), err) + err = createTmpFile(dir, "nested1/file1.asb") + require.NoError(s.T(), err) + err = createTempNestedDir(dir, "nested2") + require.NoError(s.T(), err) + err = createTmpFile(dir, "nested2/file2.asb") + require.NoError(s.T(), err) + err = createTmpFile(dir, "file3.txt") + require.NoError(s.T(), err) + + mockValidator := new(mocks.Mockvalidator) + mockValidator.On("Run", mock.AnythingOfType("string")).Return(func(fileName string) error { + if filepath.Ext(fileName) == ".asb" { + return nil + } + return fmt.Errorf("invalid file extension") + }) + + streamingReader, err := NewReader(WithValidator(mockValidator), WithDir(dir), WithNestedDir()) + s.Require().NoError(err) + + readerChan := make(chan io.ReadCloser) + errorChan := make(chan error) + go streamingReader.StreamFiles(context.Background(), readerChan, errorChan) + + var counter int + for { + select { + case _, ok := <-readerChan: + // if chan closed, we're done. + if !ok { + s.Require().Equal(2, counter) + return + } + counter++ + case err = <-errorChan: + require.NoError(s.T(), err) + } + } +} diff --git a/io/local/writer.go b/io/local/writer.go index df8e01e0..d74bc870 100644 --- a/io/local/writer.go +++ b/io/local/writer.go @@ -38,14 +38,14 @@ type Writer struct { // Is used only for Writer. func WithRemoveFiles() Opt { return func(r *options) { - r.removeFiles = true + r.isRemovingFiles = true } } // NewWriter creates a new writer for local directory/file writes. // Must be called with WithDir(path string) or WithFile(path string) - mandatory. // Can be called with WithRemoveFiles() - optional. -func NewWriter(opts ...Opt) (*Writer, error) { +func NewWriter(ctx context.Context, opts ...Opt) (*Writer, error) { w := &Writer{} for _, opt := range opts { @@ -56,23 +56,29 @@ func NewWriter(opts ...Opt) (*Writer, error) { return nil, fmt.Errorf("path is required, use WithDir(path string) or WithFile(path string) to set") } - var err error - // If we want to remove files from backup path. - if w.removeFiles { - switch w.isDir { - case true: - err = forcePrepareBackupDirectory(w.path) - case false: - err = removeFileIfExists(w.path) - } + err := prepareBackupDirectory(w.path, w.isDir) + if err != nil { + return nil, fmt.Errorf("failed to prepare backup directory: %w", err) } - if !w.removeFiles && w.isDir { - err = prepareBackupDirectory(w.path) + if w.isDir { + // Check if backup dir is empty. 
+		isEmpty, err := isEmptyDirectory(w.path)
+		if err != nil {
+			return nil, fmt.Errorf("failed to check if directory is empty: %w", err)
+		}
+
+		if !isEmpty && !w.isRemovingFiles {
+			return nil, fmt.Errorf("backup folder must be empty or set RemoveFiles = true")
+		}
 	}
 
-	if err != nil {
-		return nil, err
+	// If we want to remove files from the backup path.
+	if w.isRemovingFiles {
+		err := w.RemoveFiles(ctx)
+		if err != nil {
+			return nil, fmt.Errorf("failed to remove files: %w", err)
+		}
 	}
 
 	return w, nil
@@ -80,64 +86,80 @@
 // prepareBackupDirectory creates the backup directory if it does not exist.
-// It returns an error is the path already exits and it is not empty.
-func prepareBackupDirectory(dir string) error {
-	dirInfo, err := os.Stat(dir)
-	if err != nil {
-		if os.IsNotExist(err) {
-			return makeDir(dir)
-		}
-
-		return err
+// For a file path, it creates the parent directory instead.
+func prepareBackupDirectory(path string, isDir bool) error {
+	if !isDir {
+		path = filepath.Dir(path)
 	}
 
-	if !dirInfo.IsDir() {
-		return fmt.Errorf("%s is not a directory", dir)
+	if _, err := os.Stat(path); os.IsNotExist(err) {
+		if err = os.MkdirAll(path, os.ModePerm); err != nil {
+			return fmt.Errorf("failed to create directory: %w", err)
+		}
 	}
 
-	fileInfo, err := os.ReadDir(dir)
+	return nil
+}
+
+func isEmptyDirectory(path string) (bool, error) {
+	fileInfo, err := os.ReadDir(path)
 	if err != nil {
-		return fmt.Errorf("failed to read path %s: %w", dir, err)
+		return false, fmt.Errorf("failed to read path %s: %w", path, err)
 	}
 
 	if len(fileInfo) > 0 {
-		return fmt.Errorf("%s is not empty", dir)
+		return false, nil
 	}
 
-	return nil
+	return true, nil
 }
 
-// forcePrepareBackupDirectory removes any existing directory and its contents
-// and creates a new directory.
-func forcePrepareBackupDirectory(dir string) error {
-	err := os.RemoveAll(dir)
-	if err != nil {
-		return fmt.Errorf("failed to remove directory %s: %w", dir, err)
+// RemoveFiles removes a backup file or files from a directory.
+func (w *Writer) RemoveFiles(ctx context.Context) error {
+	if ctx.Err() != nil {
+		return ctx.Err()
 	}
 
-	return makeDir(dir)
-}
-
-func removeFileIfExists(path string) error {
-	_, err := os.Stat(path)
+	info, err := os.Stat(w.path)
 	switch {
 	case err == nil:
-		if err = os.Remove(path); err != nil {
-			return fmt.Errorf("failed to remove %s: %w", path, err)
-		}
+		// OK.
 	case os.IsNotExist(err):
+		// File doesn't exist, which is fine.
 		return nil
 	default:
-		return fmt.Errorf("failed to remove if exist %s: %w", path, err)
+		return fmt.Errorf("failed to stat path %s: %w", w.path, err)
 	}
+	// If it is a file.
+	if !info.IsDir() {
+		if err = os.Remove(w.path); err != nil {
+			return fmt.Errorf("failed to remove file %s: %w", w.path, err)
+		}
 
-	return nil
-}
-
-func makeDir(dir string) error {
-	err := os.MkdirAll(dir, 0o755)
+		return nil
+	}
+	// If it is a dir.
+	files, err := os.ReadDir(w.path)
 	if err != nil {
-		return fmt.Errorf("failed to create backup directory %s: %w", dir, err)
+		return fmt.Errorf("failed to read directory %s: %w", w.path, err)
+	}
+
+	for _, file := range files {
+		filePath := filepath.Join(w.path, file.Name())
+		// Skip folders.
+		if file.IsDir() && !w.withNestedDir {
+			continue
+		}
+		// If validator is set, remove only valid files.
+ if w.validator != nil { + if err = w.validator.Run(filePath); err != nil { + continue + } + } + + if err = os.Remove(filePath); err != nil { + return fmt.Errorf("failed to remove file %s: %w", filePath, err) + } } return nil diff --git a/io/local/writer_test.go b/io/local/writer_test.go index 78092841..a28beb1e 100644 --- a/io/local/writer_test.go +++ b/io/local/writer_test.go @@ -29,8 +29,9 @@ type writerTestSuite struct { func (suite *writerTestSuite) Test_openBackupFile() { tmpDir := suite.T().TempDir() + ctx := context.Background() - factory, err := NewWriter(WithRemoveFiles(), WithDir(tmpDir)) + factory, err := NewWriter(ctx, WithRemoveFiles(), WithDir(tmpDir)) suite.NoError(err) w, err := factory.NewWriter(context.Background(), "test") @@ -50,50 +51,22 @@ func (suite *writerTestSuite) Test_openBackupFile() { func (suite *writerTestSuite) TestPrepareBackupDirectory_Positive() { dir := suite.T().TempDir() - err := prepareBackupDirectory(dir) + err := prepareBackupDirectory(dir, true) suite.NoError(err) } func (suite *writerTestSuite) TestPrepareBackupDirectory_Positive_CreateDir() { dir := suite.T().TempDir() dir += "/test" - err := prepareBackupDirectory(dir) + err := prepareBackupDirectory(dir, true) suite.NoError(err) suite.DirExists(dir) } -func (suite *writerTestSuite) TestPrepareBackupDirectory_Negative_IsNotDir() { - dir := suite.T().TempDir() - - file := dir + "/test" - f, err := os.Create(file) - if err != nil { - suite.FailNow("Failed to create file: %v", err) - } - _ = f.Close() - - err = prepareBackupDirectory(file) - suite.Error(err) -} - -func (suite *writerTestSuite) TestPrepareBackupDirectory_Negative_DirNotEmpty() { - dir := suite.T().TempDir() - - file := dir + "/test" - f, err := os.Create(file) - if err != nil { - suite.FailNow("Failed to create file: %v", err) - } - _ = f.Close() - - err = prepareBackupDirectory(dir) - suite.Error(err) -} - func (suite *writerTestSuite) TestDirectoryWriter_GetType() { tmpDir := suite.T().TempDir() - - w, err := NewWriter(WithRemoveFiles(), WithDir(tmpDir)) + ctx := context.Background() + w, err := NewWriter(ctx, WithRemoveFiles(), WithDir(tmpDir)) suite.NoError(err) suite.Equal(localType, w.GetType()) diff --git a/tests/integration/integration_test.go b/tests/integration/integration_test.go index 822d53a1..d45c2f58 100644 --- a/tests/integration/integration_test.go +++ b/tests/integration/integration_test.go @@ -374,6 +374,7 @@ func runBackupRestoreDirectory(suite *backupRestoreTestSuite, backupDir := suite.T().TempDir() writers, err := local.NewWriter( + ctx, local.WithValidator(asb.NewValidator()), local.WithDir(backupDir), ) @@ -434,10 +435,11 @@ func runBackupRestoreDirectory(suite *backupRestoreTestSuite, suite.testClient.ValidateRecords(suite.T(), expectedRecs, suite.namespace, suite.set) _, err = local.NewWriter( + ctx, local.WithValidator(asb.NewValidator()), local.WithDir(backupDir), ) - suite.ErrorContains(err, "is not empty") + suite.ErrorContains(err, "must be empty") } func (suite *backupRestoreTestSuite) TestRestoreExpiredRecords() { @@ -544,6 +546,7 @@ func (suite *backupRestoreTestSuite) TestBackupRestoreIOWithPartitions() { backupDir := suite.T().TempDir() writers, err := local.NewWriter( + ctx, local.WithValidator(asb.NewValidator()), local.WithDir(backupDir), local.WithRemoveFiles(), @@ -1068,6 +1071,10 @@ func (b *byteReadWriterFactory) GetType() string { return "byte buffer" } +func (b *byteReadWriterFactory) RemoveFiles(_ context.Context) error { + return nil +} + type nopWriteCloser struct { *bytes.Buffer } 
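Note for implementers of custom writers: `backup.Writer` now carries `RemoveFiles(ctx context.Context) error` (see the handler_backup.go hunk above), so out-of-tree writers must add it; the test-only `byteReadWriterFactory` above satisfies it with a no-op. Below is a minimal sketch of a conforming in-memory writer. The `memWriter` type and package name are hypothetical, and only the three methods visible in this diff are shown; the full interface may declare more:

```go
package backuptest

import (
	"bytes"
	"context"
	"io"
	"sync"
)

// memWriter is a hypothetical in-memory writer that implements the three
// backup.Writer methods visible in this diff: NewWriter, GetType, and the
// newly added RemoveFiles.
type memWriter struct {
	mu    sync.Mutex
	files map[string]*bytes.Buffer
}

func newMemWriter() *memWriter {
	return &memWriter{files: make(map[string]*bytes.Buffer)}
}

// NewWriter returns an io.WriteCloser that accumulates data for filename.
func (m *memWriter) NewWriter(_ context.Context, filename string) (io.WriteCloser, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	buf := &bytes.Buffer{}
	m.files[filename] = buf

	return nopWriteCloser{buf}, nil
}

// GetType returns the storage type. Used in logging.
func (m *memWriter) GetType() string { return "memory" }

// RemoveFiles drops all accumulated buffers, mirroring the cloud writers
// that delete every object under their prefix.
func (m *memWriter) RemoveFiles(_ context.Context) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.files = make(map[string]*bytes.Buffer)

	return nil
}

type nopWriteCloser struct{ *bytes.Buffer }

func (nopWriteCloser) Close() error { return nil }
```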
diff --git a/tests/integration/s3_integration_test.go b/tests/integration/s3_integration_test.go index 48949263..271e1ded 100644 --- a/tests/integration/s3_integration_test.go +++ b/tests/integration/s3_integration_test.go @@ -157,7 +157,7 @@ func (s *writeReadTestSuite) write(filename string, bytes, times int, client *s3 "backup", s3Storasge.WithDir(backupDir), ) - s.Require().ErrorContains(err, "backup folder must be empty or set removeFiles = true") + s.Require().ErrorContains(err, "backup folder must be empty or set RemoveFiles = true") return allBytesWritten } @@ -230,7 +230,7 @@ func (s *writeReadTestSuite) writeSingleFile(filename string, bytes, times int, "backup", s3Storasge.WithFile(backupFile), ) - s.Require().ErrorContains(err, "backup folder must be empty or set removeFiles = true") + s.Require().ErrorContains(err, "backup folder must be empty or set RemoveFiles = true") return allBytesWritten }
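Taken together, the reworked construction flow is: the writer now takes a context, prepares and validates its target during initialization, and performs file removal itself when removal is enabled (which is what both `--remove-files` and the new `--remove-artifacts` map to via `ShouldClearTarget`). A hedged usage sketch follows; the `asb` validator import path is an assumption, since only `io/local`'s path appears verbatim in the file headers above:

```go
package main

import (
	"context"
	"log"

	// Import paths are assumptions based on identifiers used in this diff.
	"github.com/aerospike/backup-go/encoding/asb"
	"github.com/aerospike/backup-go/io/local"
)

func main() {
	ctx := context.Background()

	// NewWriter now takes a context. Without WithRemoveFiles, a non-empty
	// directory fails with "backup folder must be empty or set RemoveFiles = true".
	// With it, files accepted by the validator are deleted during initialization.
	writer, err := local.NewWriter(
		ctx,
		local.WithValidator(asb.NewValidator()),
		local.WithDir("backups_folder"),
		local.WithRemoveFiles(),
	)
	if err != nil {
		log.Fatal(err)
	}

	// RemoveFiles is also exported, which is what a --remove-artifacts style
	// "clean without backing up" run relies on.
	if err := writer.RemoveFiles(ctx); err != nil {
		log.Fatal(err)
	}
}
```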