Skip to content

Commit

Permalink
remove Policies struct
Browse files Browse the repository at this point in the history
  • Loading branch information
dwelch-spike committed Feb 27, 2024
1 parent 15e17bc commit 9b2dfe5
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 55 deletions.
12 changes: 6 additions & 6 deletions backup_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (bh *backupHandlerBase) run(ctx context.Context, writers []*writeWorker[*mo
recordReader := newAerospikeRecordReader(
bh.aerospikeClient,
ARRCFG,
bh.config.Policies.ScanPolicy,
bh.config.ScanPolicy,
)

readWorkers[i] = newReadWorker(recordReader)
Expand Down Expand Up @@ -95,12 +95,12 @@ func (bh *backupHandlerBase) run(ctx context.Context, writers []*writeWorker[*mo

// **** Backup To Writer Handler ****

// BackupStatus stores the status of a backup job
type BackupStatus struct{}
// BackupStats stores the status of a backup job
type BackupStats struct{}

// BackupHandler handles a backup job to a set of io.writers
type BackupHandler struct {
status *BackupStatus
stats *BackupStats
config *BackupConfig
writers []io.Writer
errors chan error
Expand Down Expand Up @@ -164,8 +164,8 @@ func (bwh *BackupHandler) run(ctx context.Context, writers []io.Writer) {
}

// GetStats returns the stats of the backup job
func (bwh *BackupHandler) GetStats() BackupStatus {
return *bwh.status
func (bwh *BackupHandler) GetStats() BackupStats {
return *bwh.stats
}

// Wait waits for the backup job to complete and returns an error if the job failed
Expand Down
53 changes: 31 additions & 22 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,16 @@ import (
// // handle error
// }
// // use the backup client to start backup and restore operations
// backupHandler, err := backupClient.BackupToWriter(writers, nil)
// ctx := context.Background()
// backupHandler, err := backupClient.Backup(ctx, writers, nil)
// if err != nil {
// // handle error
// }
// // optionally, check the stats of the backup operation
// stats := backupHandler.Stats()
// // use the backupHandler to wait for the backup operation to finish
// // err = backupHandler.Wait()
// ctx := context.Background()
// // err = backupHandler.Wait(ctx)

type Client struct {
aerospikeClient *a.Client
Expand All @@ -66,39 +70,43 @@ func NewClient(ac *a.Client, cc *Config) (*Client, error) {
}, nil
}

// getUsablePolicy returns the policies to be used for the backup and restore operations
// If the input policies are nil, the client's default policies will be used
// If the client's default policies are nil, the aerospike client's default policies will be used
func (c *Client) getUsablePolicy(p *Policies) *Policies {
policies := p
if policies == nil {
policies = c.config.Policies
func (c *Client) getUsableInfoPolicy(p *a.InfoPolicy) *a.InfoPolicy {
if p == nil {
p = c.config.InfoPolicy
}
if policies == nil {
policies = &Policies{}
if p == nil {
p = c.aerospikeClient.DefaultInfoPolicy
}
return p
}

if policies.InfoPolicy == nil {
policies.InfoPolicy = c.aerospikeClient.DefaultInfoPolicy
func (c *Client) getUsableWritePolicy(p *a.WritePolicy) *a.WritePolicy {
if p == nil {
p = c.config.WritePolicy
}

if policies.WritePolicy == nil {
policies.WritePolicy = c.aerospikeClient.DefaultWritePolicy
if p == nil {
p = c.aerospikeClient.DefaultWritePolicy
}
return p
}

if policies.ScanPolicy == nil {
policies.ScanPolicy = c.aerospikeClient.DefaultScanPolicy
func (c *Client) getUsableScanPolicy(p *a.ScanPolicy) *a.ScanPolicy {
if p == nil {
p = c.config.ScanPolicy
}

return policies
if p == nil {
p = c.aerospikeClient.DefaultScanPolicy
}
return p
}

// Backup starts a backup operation to a set of io.writers
func (c *Client) Backup(ctx context.Context, writers []io.Writer, config *BackupConfig) (*BackupHandler, error) {
if config == nil {
config = NewBackupConfig()
}
config.Policies = c.getUsablePolicy(config.Policies)
config.InfoPolicy = c.getUsableInfoPolicy(config.InfoPolicy)
config.ScanPolicy = c.getUsableScanPolicy(config.ScanPolicy)

if err := config.validate(); err != nil {
return nil, err
Expand All @@ -115,7 +123,8 @@ func (c *Client) Restore(ctx context.Context, readers []io.Reader, config *Resto
if config == nil {
config = NewRestoreConfig()
}
config.Policies = c.getUsablePolicy(config.Policies)
config.InfoPolicy = c.getUsableInfoPolicy(config.InfoPolicy)
config.WritePolicy = c.getUsableWritePolicy(config.WritePolicy)

if err := config.validate(); err != nil {
return nil, err
Expand Down
40 changes: 19 additions & 21 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ var (

// **** Policies ****

// Policies contains the Aerospike policies used during backup and restore operations
type Policies struct {
// InfoPolicy applies to Aerospike Info requests made
// **** Client ****

// Config contains configuration for the backup client
// Policies defined here will be used as defaults for any
// backup and restore operations started by the client
type Config struct {
// InfoPolicy applies to Aerospike Info requests made during backup and restore
// If nil, the Aerospike client's default policy will be used
InfoPolicy *a.InfoPolicy
// WritePolicy applies to Aerospike write operations made during backup and restore
Expand All @@ -46,16 +50,6 @@ type Policies struct {
ScanPolicy *a.ScanPolicy
}

// **** Client ****

// Config contains configuration for the backup client
type Config struct {
// Policies contains the Aerospike policies used during backup and restore operations
// If nil, the Aerospike client's default policies will be used
// These policies will be used as defaults for the backup and restore operations
Policies *Policies
}

// NewConfig returns a new client Config
func NewConfig() *Config {
return &Config{}
Expand All @@ -79,10 +73,12 @@ type BackupBaseConfig struct {
// EncoderBuilder is used to specify the encoder with which to encode the backup data
// If nil, the default encoder will be used
EncoderBuilder EncoderBuilder
// Policies contains the Aerospike policies used during backup operations
// These policies override the default policies from the backup client's configuration
// If nil, the backup client's policies will be used
Policies *Policies
// InfoPolicy applies to Aerospike Info requests made during backup
// If nil, the backup client's policy will be used, if that is nil, the aerospike client's default policy will be used
InfoPolicy *a.InfoPolicy
// ScanPolicy applies to Aerospike scan operations made during backup
// If nil, the backup client's policy will be used, if that is nil, the aerospike client's default policy will be used
ScanPolicy *a.ScanPolicy
}

func (c *BackupBaseConfig) validate() error {
Expand Down Expand Up @@ -137,10 +133,12 @@ type RestoreBaseConfig struct {
// DecoderBuilder is used to specify the decoder with which to decode backup data during restores
// If nil, the default decoder will be used
DecoderBuilder DecoderBuilder
// Policies contains the Aerospike policies used during restore operations
// These policies override the default policies from the backup client's configuration
// If nil, the backup client's policies will be used
Policies *Policies
// InfoPolicy applies to Aerospike Info requests made during restore
// If nil, the Aerospike client's default policy will be used, if that is nil, the aerospike client's default policy will be used
InfoPolicy *a.InfoPolicy
// WritePolicy applies to Aerospike write operations made during restore
// If nil, the Aerospike client's default policy will be used, if that is nil, the aerospike client's default policy will be used
WritePolicy *a.WritePolicy
}

func (c *RestoreBaseConfig) validate() error {
Expand Down
12 changes: 6 additions & 6 deletions restore_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (rh *restoreHandlerBase) run(ctx context.Context, readers []*readWorker[*mo
for i := 0; i < rh.config.Parallel; i++ {
writer := newRestoreWriter(
rh.dbClient,
rh.config.Policies.WritePolicy,
rh.config.WritePolicy,
)
writeWorkers[i] = newWriteWorker(writer)
}
Expand All @@ -87,12 +87,12 @@ func (rh *restoreHandlerBase) run(ctx context.Context, readers []*readWorker[*mo

// **** Restore From Reader Handler ****

// RestoreStatus stores the status of a restore from reader job
type RestoreStatus struct{}
// RestoreStats stores the status of a restore from reader job
type RestoreStats struct{}

// RestoreHandler handles a restore job from a set of io.readers
type RestoreHandler struct {
status *RestoreStatus
stats *RestoreStats
config *RestoreConfig
readers []io.Reader
errors chan error
Expand Down Expand Up @@ -160,8 +160,8 @@ func (rrh *RestoreHandler) run(ctx context.Context, readers []io.Reader) {
}

// GetStats returns the stats of the restore job
func (rrh *RestoreHandler) GetStats() RestoreStatus {
return *rrh.status
func (rrh *RestoreHandler) GetStats() RestoreStats {
return *rrh.stats
}

// Wait waits for the restore job to complete and returns an error if the job failed
Expand Down

0 comments on commit 9b2dfe5

Please sign in to comment.