Skip to content

Commit

Permalink
move asb format to singel package, share constants
Browse files Browse the repository at this point in the history
  • Loading branch information
dwelch-spike committed Feb 22, 2024
1 parent eb6933e commit f3ab9a9
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 120 deletions.
4 changes: 2 additions & 2 deletions backup_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"io"

datahandlers "github.com/aerospike/aerospike-tools-backup-lib/data_handlers"
"github.com/aerospike/aerospike-tools-backup-lib/encoder"
"github.com/aerospike/aerospike-tools-backup-lib/encoding/asb"

a "github.com/aerospike/aerospike-client-go/v7"
)
Expand Down Expand Up @@ -177,7 +177,7 @@ func getDataWriter(eb EncoderBuilder, w io.Writer, namespace string, first bool)
}

switch encT := enc.(type) {
case *encoder.ASBEncoder:
case *asb.ASBEncoder:
asbw := datahandlers.NewASBWriter(encT, w)
err := asbw.Init(namespace, first)
return asbw, err
Expand Down
4 changes: 2 additions & 2 deletions decoders.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package backuplib
import (
"io"

"github.com/aerospike/aerospike-tools-backup-lib/decoder"
"github.com/aerospike/aerospike-tools-backup-lib/encoding/asb"
)

// ASBDecoderBuilder satisfies the DecoderBuilder interface
Expand All @@ -41,5 +41,5 @@ func (f *ASBDecoderBuilder) SetSource(src io.Reader) {
// CreateDecoder creates a new ASBDecoder
// This method is called by the backup client to create a new decoder
func (f *ASBDecoderBuilder) CreateDecoder() (Decoder, error) {
return decoder.NewASBDecoder(f.src)
return asb.NewASBDecoder(f.src)
}
4 changes: 2 additions & 2 deletions encoders.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package backuplib
import (
"io"

"github.com/aerospike/aerospike-tools-backup-lib/encoder"
"github.com/aerospike/aerospike-tools-backup-lib/encoding/asb"
)

// ASBEncoderBuilder satisfies the EncoderBuilder interface
Expand All @@ -34,5 +34,5 @@ func NewASBEncoderBuilder() *ASBEncoderBuilder {
// CreateEncoder creates a new ASBEncoder
// This method is called by the backup client to create a new encoder
func (f *ASBEncoderBuilder) CreateEncoder() (Encoder, error) {
return encoder.NewASBEncoder()
return asb.NewASBEncoder()
}
64 changes: 1 addition & 63 deletions decoder/asb.go → encoding/asb/asb_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package decoder
package asb

import (
"bufio"
Expand All @@ -29,68 +29,6 @@ import (
a "github.com/aerospike/aerospike-client-go/v7"
)

// section names
const (
sectionUndefined = ""
sectionHeader = "header"
sectionMetadata = "meta-data"
sectionGlobal = "global"
sectionRecord = "records"
)

// section markers
const (
markerGlobalSection byte = '*'
markerMetadataSection byte = '#'
markerRecordHeader byte = '+'
markerRecordBins byte = '-'
)

// line names
const (
lineTypeUndefined = ""
lineTypeVersion = "version"
lineTypeNamespace = "namespace"
lineTypeUDF = "UDF"
lineTypeSindex = "sindex"
lineTypeRecordHeader = "record header"
lineTypeRecordBins = "record bins"
lineTypeBin = "bin"
lineTypeKey = "key"
lineTypeDigest = "digest"
lineTypesSet = "set"
lineTypeGen = "generation"
lineTypeExpiration = "expiration"
lineTypeBinCount = "bin count"
lineTypeFirst = "first"
)

// literal asb tokens
const (
tokenNamespace = "namespace"
tokenFirstFile = "first-file"
toeknASBVersion = "Version"
)

// value bounds
const (
maxNamespaceLength = 31
maxTokenSize = 1000
maxGeneration = math.MaxUint16
maxBinCount = math.MaxUint16
)

// asb boolean encoding
const (
boolTrueByte = 'T'
boolFalseByte = 'F'
)

// escape character
const (
asbEscape = '\\'
)

// TODO maybe use error functions for each asb level, reader, section, line type
// that way you don't have to track section and line type in the reader
// this would allow passing an interface for the reader to most functions
Expand Down
2 changes: 1 addition & 1 deletion decoder/asb_test.go → encoding/asb/asb_decode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package decoder
package asb

import (
"bufio"
Expand Down
81 changes: 34 additions & 47 deletions encoder/asb.go → encoding/asb/asb_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package encoder
package asb

import (
"bytes"
"encoding/base64"
"errors"
"fmt"
Expand All @@ -26,10 +27,6 @@ import (
a "github.com/aerospike/aerospike-client-go/v7"
)

const (
ASBFormatVersion = "3.1"
)

type ASBEncoder struct{}

func NewASBEncoder() (*ASBEncoder, error) {
Expand All @@ -53,21 +50,15 @@ func (o *ASBEncoder) GetVersionText() []byte {
}

func (o *ASBEncoder) GetNamespaceMetaText(namespace string) []byte {
return []byte(fmt.Sprintf("# namespace %s\n", escapeASBS(namespace)))
return []byte(fmt.Sprintf("%c namespace %s\n", markerMetadataSection, escapeASBS(namespace)))
}

func (o *ASBEncoder) GetFirstMetaText() []byte {
return []byte("# first-file\n")
return []byte(fmt.Sprintf("%c first-file\n", markerMetadataSection))
}

// **** RECORD ****

const (
citrusLeafEpoch = 1262304000 // pulled from C client cf_clock.h
asbTrue = 'T'
asbFalse = 'F'
)

// function pointer for time.Now to allow for testing
var getTimeNow = time.Now

Expand All @@ -84,14 +75,14 @@ func recordToASB(r *models.Record) ([]byte, error) {
}
data = append(data, keyText...)

generationText := fmt.Sprintf("+ g %d\n", r.Generation)
generationText := fmt.Sprintf("%c g %d\n", markerRecordHeader, r.Generation)
data = append(data, generationText...)

exprTime := getExpirationTime(r.Expiration, getTimeNow().Unix())
expirationText := fmt.Sprintf("+ t %d\n", exprTime)
expirationText := fmt.Sprintf("%c t %d\n", markerRecordHeader, exprTime)
data = append(data, expirationText...)

binCountText := fmt.Sprintf("+ b %d\n", len(r.Bins))
binCountText := fmt.Sprintf("%c b %d\n", markerRecordHeader, len(r.Bins))
data = append(data, binCountText...)

binsText, err := binsToASB(r.Bins)
Expand Down Expand Up @@ -127,41 +118,45 @@ func binsToASB(bins a.BinMap) ([]byte, error) {
}

func binToASB(k string, v any) ([]byte, error) {
var res []byte
var res bytes.Buffer

res.Write([]byte{markerRecordBins, ' '})

binName := escapeASBS(k)

switch v := v.(type) {
case bool:
return []byte(fmt.Sprintf("- Z %s %c\n", binName, boolToASB(v))), nil
res.Write([]byte(fmt.Sprintf("Z %s %c\n", binName, boolToASB(v))))
case int64, int32, int16, int8, int:
return []byte(fmt.Sprintf("- I %s %d\n", binName, v)), nil
res.Write([]byte(fmt.Sprintf("I %s %d\n", binName, v)))
case float64:
return []byte(fmt.Sprintf("- D %s %f\n", binName, v)), nil
res.Write([]byte(fmt.Sprintf("D %s %f\n", binName, v)))
case string:
return []byte(fmt.Sprintf("- S %s %d %s\n", binName, len(v), v)), nil
res.Write([]byte(fmt.Sprintf("S %s %d %s\n", binName, len(v), v)))
case a.HLLValue:
encoded := base64Encode(v)
return []byte(fmt.Sprintf("- Y %s %d %s\n", binName, len(encoded), encoded)), nil
res.Write([]byte(fmt.Sprintf("Y %s %d %s\n", binName, len(encoded), encoded)))
case []byte:
encoded := base64Encode(v)
return []byte(fmt.Sprintf("- B %s %d %s\n", binName, len(encoded), encoded)), nil
res.Write([]byte(fmt.Sprintf("B %s %d %s\n", binName, len(encoded), encoded)))
case map[any]any:
return nil, errors.New("map bin not supported")
case []any:
return nil, errors.New("list bin not supported")
case nil:
return []byte(fmt.Sprintf("- N %s\n", binName)), nil
res.Write([]byte(fmt.Sprintf("N %s\n", binName)))
default:
return res, fmt.Errorf("unknown user key type: %T, key: %s", v, k)
return nil, fmt.Errorf("unknown user key type: %T, key: %s", v, k)
}

return res.Bytes(), nil
}

func boolToASB(b bool) byte {
if b {
return asbTrue
return boolTrueByte
}
return asbFalse
return boolFalseByte
}

func keyToASB(k *a.Key) ([]byte, error) {
Expand All @@ -177,15 +172,15 @@ func keyToASB(k *a.Key) ([]byte, error) {
}
data = append(data, userKeyText...)

namespaceText := fmt.Sprintf("+ n %s\n", escapeASBS(k.Namespace()))
namespaceText := fmt.Sprintf("%c n %s\n", markerRecordHeader, escapeASBS(k.Namespace()))
data = append(data, namespaceText...)

b64Digest := base64Encode(k.Digest())
digestText := fmt.Sprintf("+ d %s\n", b64Digest)
digestText := fmt.Sprintf("%c d %s\n", markerRecordHeader, b64Digest)
data = append(data, digestText...)

if k.SetName() != "" {
setnameText := fmt.Sprintf("+ s %s\n", escapeASBS(k.SetName()))
setnameText := fmt.Sprintf("%c s %s\n", markerRecordHeader, escapeASBS(k.SetName()))
data = append(data, setnameText...)
}

Expand All @@ -197,7 +192,7 @@ func base64Encode(b []byte) string {
}

func userKeyToASB(userKey a.Value) ([]byte, error) {
var data []byte
var data bytes.Buffer

// user key is optional
if userKey == nil {
Expand All @@ -208,19 +203,19 @@ func userKeyToASB(userKey a.Value) ([]byte, error) {

switch v := val.(type) {
case int64, int32, int16, int8, int:
data = []byte(fmt.Sprintf("+ k I %d\n", v))
data.Write([]byte(fmt.Sprintf("%c k I %d\n", markerRecordHeader, v)))
case float64:
data = []byte(fmt.Sprintf("+ k D %f\n", v))
data.Write([]byte(fmt.Sprintf("%c k D %f\n", markerRecordHeader, v)))
case string:
data = []byte(fmt.Sprintf("+ k S %d %s\n", len(v), v))
data.Write([]byte(fmt.Sprintf("%c k S %d %s\n", markerRecordHeader, len(v), v)))
case []byte:
encoded := base64Encode(v)
data = []byte(fmt.Sprintf("+ k B %d %s\n", len(encoded), encoded))
data.Write([]byte(fmt.Sprintf("%c k B %d %s\n", markerRecordHeader, len(encoded), encoded)))
default:
return nil, fmt.Errorf("invalid user key type: %T", v)
}

return data, nil
return data.Bytes(), nil
}

func getExpirationTime(ttl uint32, unix_now int64) uint32 {
Expand All @@ -235,12 +230,6 @@ func getExpirationTime(ttl uint32, unix_now int64) uint32 {

// **** SINDEX ****

// line markers
const (
globalChar byte = '*'
sindexChar byte = 'i'
)

// control characters
var asbEscapedChars = map[byte]struct{}{
'\\': {},
Expand All @@ -253,15 +242,14 @@ func escapeASBS(s string) string {
v := []byte{}
for _, c := range in {
if _, ok := asbEscapedChars[c]; ok {
v = append(v, '\\')
v = append(v, asbEscape)
}
v = append(v, c)
}

return string(v)
}

// TODO support escaped tokens
func _SIndexToASB(sindex *models.SIndex) ([]byte, error) {
if sindex == nil {
return nil, errors.New("sindex is nil")
Expand All @@ -272,8 +260,8 @@ func _SIndexToASB(sindex *models.SIndex) ([]byte, error) {

v := fmt.Sprintf(
"%c %c %s %s %s %c %d %s %c",
globalChar,
sindexChar,
markerGlobalSection,
'i',
escapeASBS(sindex.Namespace),
escapeASBS(sindex.Set),
escapeASBS(sindex.Name),
Expand All @@ -283,7 +271,6 @@ func _SIndexToASB(sindex *models.SIndex) ([]byte, error) {
byte(sindex.Path.BinType),
)

// TODO does this need to be base64 encoded?
if sindex.Path.B64Context != "" {
v = fmt.Sprintf("%s %s", v, sindex.Path.B64Context)
}
Expand Down
6 changes: 3 additions & 3 deletions encoder/asb_test.go → encoding/asb/asb_encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package encoder
package asb

import (
"fmt"
Expand Down Expand Up @@ -356,14 +356,14 @@ func Test_boolToASB(t *testing.T) {
args: args{
b: true,
},
want: asbTrue,
want: boolTrueByte,
},
{
name: "positive false",
args: args{
b: false,
},
want: asbFalse,
want: boolFalseByte,
},
}
for _, tt := range tests {
Expand Down
Loading

0 comments on commit f3ab9a9

Please sign in to comment.