Commit

move token type to data models

dwelch-spike committed Feb 27, 2024
1 parent c1c91db commit e1067e3
Showing 18 changed files with 215 additions and 142 deletions.
15 changes: 8 additions & 7 deletions backup_handlers.go
@@ -18,6 +18,7 @@ import (
"io"

"github.com/aerospike/aerospike-tools-backup-lib/encoding/asb"
"github.com/aerospike/aerospike-tools-backup-lib/models"
"github.com/aerospike/aerospike-tools-backup-lib/pipeline"

a "github.com/aerospike/aerospike-client-go/v7"
@@ -53,8 +54,8 @@ func newBackupHandler(config *BackupBaseConfig, ac *a.Client, namespace string)
return handler
}

func (bh *backupHandler) run(writers []*WriteWorker[*token]) error {
readWorkers := make([]pipeline.Worker[*token], bh.config.Parallel)
func (bh *backupHandler) run(writers []*WriteWorker[*models.Token]) error {
readWorkers := make([]pipeline.Worker[*models.Token], bh.config.Parallel)
for i := 0; i < bh.config.Parallel; i++ {
begin := (i * PARTITIONS) / bh.config.Parallel
count := PARTITIONS / bh.config.Parallel // TODO verify no off by 1 error
@@ -74,18 +75,18 @@ func (bh *backupHandler) run(writers []*WriteWorker[*token]) error {
readWorkers[i] = NewReadWorker(recordReader)
}

processorWorkers := make([]pipeline.Worker[*token], bh.config.Parallel)
processorWorkers := make([]pipeline.Worker[*models.Token], bh.config.Parallel)
for i := 0; i < bh.config.Parallel; i++ {
processor := NewNOOPProcessor()
processorWorkers[i] = NewProcessorWorker(processor)
}

writeWorkers := make([]pipeline.Worker[*token], len(writers))
writeWorkers := make([]pipeline.Worker[*models.Token], len(writers))
for i, w := range writers {
writeWorkers[i] = w
}

job := pipeline.NewPipeline[*token](
job := pipeline.NewPipeline[*models.Token](
readWorkers,
processorWorkers,
writeWorkers,
@@ -136,7 +137,7 @@ func (bwh *BackupToWriterHandler) run(writers []io.Writer) {

batchSize := bwh.config.Parallel
// TODO change the any typed pipeline to a message or token type
dataWriters := []*WriteWorker[*token]{}
dataWriters := []*WriteWorker[*models.Token]{}

for i, writer := range writers {

@@ -176,7 +177,7 @@ func (bwh *BackupToWriterHandler) Wait() error {
return <-bwh.errors
}

func getDataWriter(eb EncoderBuilder, w io.Writer, namespace string, first bool) (*WriteWorker[*token], error) {
func getDataWriter(eb EncoderBuilder, w io.Writer, namespace string, first bool) (*WriteWorker[*models.Token], error) {
enc, err := eb.CreateEncoder()
if err != nil {
return nil, err
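A note on the writeWorkers loop in run above: Go will not convert a []*WriteWorker[*models.Token] to a []pipeline.Worker[*models.Token] implicitly, even though every element satisfies the interface, so the element-by-element copy is required. A small helper could factor that out; the sketch below is illustrative only and not part of this commit.

// toTokenWorkers converts concrete write workers into pipeline workers.
// Illustrative sketch; it relies only on *WriteWorker[*models.Token]
// implementing pipeline.Worker[*models.Token], which the loop in run
// already assumes.
func toTokenWorkers(ws []*WriteWorker[*models.Token]) []pipeline.Worker[*models.Token] {
	out := make([]pipeline.Worker[*models.Token], len(ws))
	for i, w := range ws {
		out[i] = w
	}
	return out
}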
12 changes: 12 additions & 0 deletions decoders.go
@@ -18,8 +18,20 @@ import (
"io"

"github.com/aerospike/aerospike-tools-backup-lib/encoding/asb"
"github.com/aerospike/aerospike-tools-backup-lib/models"
)

// Decoder is an interface for reading backup data as tokens.
// It is used to support different data formats.
// NextToken returns a *models.Token wrapping one of the types
// exposed by the models package,
// e.g. *models.Record, *models.UDF and *models.SIndex.
//
//go:generate mockery --name Decoder
type Decoder interface {
NextToken() (*models.Token, error)
}

// ASBDecoderBuilder satisfies the DecoderBuilder interface
// It creates a new ASB format decoder
type ASBDecoderBuilder struct {
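For orientation, a consumer of this interface would typically pull tokens in a loop until the source is exhausted. The sketch below is a minimal illustration; the io.EOF termination condition and the readAll helper name are assumptions, not part of this commit.

// readAll drains a Decoder into a slice of tokens.
// Minimal sketch; assumes NextToken returns io.EOF at end of input and
// that "errors", "io", and the models package are imported.
func readAll(d Decoder) ([]*models.Token, error) {
	var tokens []*models.Token
	for {
		t, err := d.NextToken()
		if errors.Is(err, io.EOF) {
			return tokens, nil
		}
		if err != nil {
			return nil, err
		}
		tokens = append(tokens, t)
	}
}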
17 changes: 12 additions & 5 deletions encoders.go
@@ -15,16 +15,23 @@
package backuplib

import (
"io"

"github.com/aerospike/aerospike-tools-backup-lib/encoding/asb"
"github.com/aerospike/aerospike-tools-backup-lib/models"
)

// Encoder is an interface for encoding the types from the models package.
// It is used to support different data formats.
//
//go:generate mockery --name Encoder
type Encoder interface {
EncodeRecord(v *models.Record) ([]byte, error)
EncodeUDF(v *models.UDF) ([]byte, error)
EncodeSIndex(v *models.SIndex) ([]byte, error)
}

// ASBEncoderBuilder satisfies the EncoderBuilder interface
// It creates a new ASB format encoder
type ASBEncoderBuilder struct {
dst io.Writer
}
type ASBEncoderBuilder struct{}

// NewASBEncoderBuilder returns a new ASBEncoderBuilder
func NewASBEncoderBuilder() *ASBEncoderBuilder {
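Since the Encoder interface keeps one method per model type, a caller holding a *models.Token has to dispatch on Token.Type itself; the ASB encoder's EncodeToken, added later in this diff, does exactly that for the ASB format. A format-agnostic version of the same dispatch might look like the following sketch; encodeAny is a hypothetical helper, not part of this commit.

// encodeAny dispatches a token to the matching Encoder method.
// Hypothetical helper for illustration; it mirrors the type switch in
// asb.Encoder.EncodeToken and assumes "fmt" and the models package are
// imported.
func encodeAny(e Encoder, t *models.Token) ([]byte, error) {
	switch t.Type {
	case models.TokenTypeRecord:
		return e.EncodeRecord(t.Record)
	case models.TokenTypeUDF:
		return e.EncodeUDF(t.UDF)
	case models.TokenTypeSIndex:
		return e.EncodeSIndex(t.SIndex)
	default:
		return nil, fmt.Errorf("unsupported token type: %v", t.Type)
	}
}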
14 changes: 12 additions & 2 deletions encoding/asb/asb_decode.go
@@ -96,6 +96,7 @@ type metaData struct {
First bool
}

// Decoder is used to decode an asb file
type Decoder struct {
countingByteScanner
header *header
@@ -130,7 +131,7 @@ func NewDecoder(src io.Reader) (*Decoder, error) {

}

func (r *Decoder) NextToken() (any, error) {
func (r *Decoder) NextToken() (*models.Token, error) {
v, err := func() (any, error) {
b, err := _peek(r)
if err != nil {
@@ -157,7 +158,16 @@
return nil, newDecoderError(r.count, err)
}

return v, nil
switch v := v.(type) {
case *models.SIndex:
return models.NewSIndexToken(v), nil
case *models.UDF:
return models.NewUDFToken(v), nil
case *models.Record:
return models.NewRecordToken(v), nil
default:
return nil, fmt.Errorf("unsupported token type %T", v)
}
}

type header struct {
10 changes: 5 additions & 5 deletions encoding/asb/asb_decode_test.go
@@ -3559,7 +3559,7 @@ func TestASBReader_NextToken(t *testing.T) {
tests := []struct {
name string
fields fields
want any
want *models.Token
wantErr bool
}{
{
@@ -3569,7 +3569,7 @@
ByteScanner: strings.NewReader("* i userdata1 testSet1 sindex1 N 1 bin1 N\n"),
},
},
want: &models.SIndex{
want: models.NewSIndexToken(&models.SIndex{
Namespace: "userdata1",
Set: "testSet1",
Name: "sindex1",
@@ -3578,7 +3578,7 @@
BinName: "bin1",
BinType: models.NumericSIDataType,
},
},
}),
wantErr: false,
},
{
@@ -3598,15 +3598,15 @@
),
},
},
want: &models.Record{
want: models.NewRecordToken(&models.Record{
Key: intKey,
Bins: map[string]any{
"bin1": nil,
"bin2": int64(2),
},
Generation: 10,
Expiration: 10,
},
}),
},
{
name: "negative EOF",
13 changes: 13 additions & 0 deletions encoding/asb/asb_encode.go
@@ -33,6 +33,19 @@ func NewEncoder() (*Encoder, error) {
return &Encoder{}, nil
}

func (o *Encoder) EncodeToken(token *models.Token) ([]byte, error) {
switch token.Type {
case models.TokenTypeRecord:
return o.EncodeRecord(token.Record)
case models.TokenTypeUDF:
return o.EncodeUDF(token.UDF)
case models.TokenTypeSIndex:
return o.EncodeSIndex(token.SIndex)
default:
return nil, fmt.Errorf("invalid token type: %v", token.Type)
}
}

func (o *Encoder) EncodeRecord(rec *models.Record) ([]byte, error) {
return recordToASB(rec)
}
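As a quick usage illustration, EncodeToken lets a caller hand the ASB encoder a *models.Token without switching on the concrete model itself. The sketch below is written in-package, like the test that follows, and uses only constructors visible in this diff; note that the test shows UDF and SIndex tokens still produce errors at this point.

// Sketch: encode a record token to ASB bytes.
// Assumes the aerospike client is imported as a and the models package
// is imported, as in the test below.
enc, _ := NewEncoder()
key, _ := a.NewKey("test", "demo", "1234")
tok := models.NewRecordToken(&models.Record{
	Key:  key,
	Bins: a.BinMap{"bin1": 0},
})
data, err := enc.EncodeToken(tok)
if err != nil {
	// handle the error
}
_ = data // e.g. write the bytes to the backup destination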
45 changes: 45 additions & 0 deletions encoding/asb/asb_encode_test.go
@@ -33,6 +33,51 @@ type asbEncoderTestSuite struct {
suite.Suite
}

func (suite *asbEncoderTestSuite) TestEncodeToken() {
encoder, err := NewEncoder()
if err != nil {
suite.FailNow("unexpected error: %v", err)
}

key, err := a.NewKey("test", "demo", "1234")
if err != nil {
suite.FailNow("unexpected error: %v", err)
}

token := &models.Token{
Type: models.TokenTypeRecord,
Record: &models.Record{
Key: key,
Bins: a.BinMap{
"bin1": 0,
},
},
}

expected, err := encoder.EncodeRecord(token.Record)
suite.Assert().NoError(err)

actual, err := encoder.EncodeToken(token)
suite.Assert().NoError(err)
suite.Assert().Equal(expected, actual)

token.Type = models.TokenTypeUDF
actual, err = encoder.EncodeToken(token)
suite.Assert().Error(err)
suite.Assert().Nil(actual)

token.Type = models.TokenTypeSIndex
actual, err = encoder.EncodeToken(token)
suite.Assert().Error(err)
suite.Assert().Nil(actual)

token.Type = models.TokenTypeInvalid
actual, err = encoder.EncodeToken(token)
suite.Assert().Error(err)
suite.Assert().Nil(actual)

}

func (suite *asbEncoderTestSuite) TestEncodeRecord() {
encoder, err := NewEncoder()
if err != nil {
19 changes: 11 additions & 8 deletions mocks/Decoder.go

Some generated files are not rendered by default.

44 changes: 44 additions & 0 deletions models/data_models.go
@@ -71,3 +71,47 @@ type UDF struct {
Name string
Content []byte
}

// **** Token ****

type TokenType uint8

const (
TokenTypeInvalid TokenType = iota
TokenTypeRecord
TokenTypeSIndex
TokenTypeUDF
)

// Token encompasses the other data models.
// Fields should be accessed based on the Type field.
type Token struct {
Record *Record
SIndex *SIndex
UDF *UDF
Type TokenType
}

// NewRecordToken creates a new token with the given record
func NewRecordToken(r *Record) *Token {
return &Token{
Record: r,
Type: TokenTypeRecord,
}
}

// NewSIndexToken creates a new token with the given secondary index
func NewSIndexToken(s *SIndex) *Token {
return &Token{
SIndex: s,
Type: TokenTypeSIndex,
}
}

// NewUDFToken creates a new token with the given UDF
func NewUDFToken(u *UDF) *Token {
return &Token{
UDF: u,
Type: TokenTypeUDF,
}
}
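Taken together, the constructors above are the intended way to build tokens, and consumers branch on Type to reach the wrapped model. A brief in-package sketch of the consuming side, for illustration only (describe is a hypothetical function and assumes "fmt" is imported):

// describe returns a short human-readable label for a token.
// Hypothetical example; not part of this commit.
func describe(t *Token) string {
	switch t.Type {
	case TokenTypeRecord:
		return "record"
	case TokenTypeSIndex:
		return fmt.Sprintf("sindex %s on %s.%s", t.SIndex.Name, t.SIndex.Namespace, t.SIndex.Set)
	case TokenTypeUDF:
		return fmt.Sprintf("udf %s", t.UDF.Name)
	default:
		return "invalid token"
	}
}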
8 changes: 6 additions & 2 deletions processors.go
@@ -14,7 +14,11 @@

package backuplib

import "context"
import (
"context"

"github.com/aerospike/aerospike-tools-backup-lib/models"
)

// processors.go contains the implementations of the DataProcessor interface
// used by dataPipelines in the backuplib package
@@ -90,7 +94,7 @@ func NewNOOPProcessor() *NOOPProcessor {
}

// Process processes the data
func (p *NOOPProcessor) Process(data *token) (*token, error) {
func (p *NOOPProcessor) Process(data *models.Token) (*models.Token, error) {
return data, nil
}

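The NOOP processor simply forwards tokens, but the same Process signature makes it easy to add token-aware stages. The sketch below shows a hypothetical processor that only accepts record tokens; it is not part of this commit and assumes the DataProcessor interface requires nothing beyond the Process method shown above.

// recordOnlyProcessor passes record tokens through unchanged and
// returns an error for any other token type. Hypothetical example;
// assumes "fmt" and the models package are imported.
type recordOnlyProcessor struct{}

func (p *recordOnlyProcessor) Process(data *models.Token) (*models.Token, error) {
	if data.Type != models.TokenTypeRecord {
		return nil, fmt.Errorf("unexpected token type: %v", data.Type)
	}
	return data, nil
}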
4 changes: 2 additions & 2 deletions processors_test.go
@@ -173,8 +173,8 @@ func (suite *proccessorTestSuite) TestNOOPProcessor() {
noop := NewNOOPProcessor()
suite.NotNil(noop)

data := &token{
Type: tokenTypeRecord,
data := &models.Token{
Type: models.TokenTypeRecord,
Record: &models.Record{},
}
processed, err := noop.Process(data)