Skip to content

Commit

Permalink
feat:support compression (Snappy + Zstd)
Browse files Browse the repository at this point in the history
  • Loading branch information
akiozihao committed Jan 15, 2024
1 parent 69cc8b9 commit 1e6abe9
Show file tree
Hide file tree
Showing 5 changed files with 425 additions and 195 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ require (
)

require (
github.com/DataDog/zstd v1.5.5
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/snappy v0.0.4
github.com/hashicorp/golang-lru/v2 v2.0.2
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ=
github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hashicorp/golang-lru/v2 v2.0.2 h1:Dwmkdr5Nc/oBiXgJS3CDHNhJtIHkuZ3DZF5twqnfBdU=
github.com/hashicorp/golang-lru/v2 v2.0.2/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
12 changes: 12 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type Options struct {

// BytesPerSync specifies the number of bytes to write before calling fsync.
BytesPerSync uint32

// Compressor specifies the compressor type.
Compressor CompressorType
}

const (
Expand All @@ -43,11 +46,20 @@ const (
GB = 1024 * MB
)

// CompressorType identifies the compression algorithm applied to chunk data
// before it is written to a segment and after it is read back.
type CompressorType uint8

// Supported compressors. The zero value, None, leaves data untouched.
const (
	// None writes and reads data as-is, without compression.
	None CompressorType = 0
	// Snappy encodes data with github.com/golang/snappy.
	Snappy CompressorType = 1
	// Zstd encodes data with github.com/DataDog/zstd.
	Zstd CompressorType = 2
)

// DefaultOptions is the baseline WAL configuration: segment files of up to
// GB in size with the ".SEG" extension under the OS temp directory, a block
// cache of 320 KB, Sync disabled, BytesPerSync of zero, and Snappy
// compression.
//
// NOTE(review): Read and Reader.Next decode every chunk with the configured
// compressor, so segments written under a different Compressor setting
// (including data written without compression) presumably cannot be read
// back with these defaults. Confirm before using them on an existing WAL
// directory.
var DefaultOptions = Options{
	// Location and layout of the segment files.
	DirPath:        os.TempDir(),
	SegmentFileExt: ".SEG",
	SegmentSize:    GB,

	// Block cache budget: ten times 32 KB.
	BlockCache: 10 * 32 * KB,

	// Syncing is left off; see the Sync and BytesPerSync option docs.
	BytesPerSync: 0,
	Sync:         false,

	// Chunk data is Snappy-compressed unless the caller overrides this.
	Compressor: Snappy,
}
55 changes: 52 additions & 3 deletions wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"strings"
"sync"

"github.com/DataDog/zstd"
"github.com/golang/snappy"
lru "github.com/hashicorp/golang-lru/v2"
)

Expand Down Expand Up @@ -58,6 +60,7 @@ type WAL struct {
type Reader struct {
// segmentReaders holds the per-segment readers this Reader draws from.
segmentReaders []*segmentReader
// currentReader starts at 0 and is incremented by Next before it retries;
// presumably the index into segmentReaders of the reader being consumed.
currentReader int
// Compressor selects how Next decompresses chunk data (Snappy, Zstd, or
// no decompression for any other value). NewReaderWithMax copies it from
// the WAL's options.
Compressor CompressorType
}

// Open opens a WAL with the given options.
Expand Down Expand Up @@ -219,6 +222,7 @@ func (wal *WAL) NewReaderWithMax(segId SegmentID) *Reader {
return &Reader{
segmentReaders: segmentReaders,
currentReader: 0,
Compressor: wal.options.Compressor,
}
}

Expand Down Expand Up @@ -276,6 +280,20 @@ func (r *Reader) Next() ([]byte, *ChunkPosition, error) {
r.currentReader++
return r.Next()
}
switch r.Compressor {
case Snappy:
data, err = snappy.Decode(nil, data)
if err != nil {
return nil, nil, err
}
case Zstd:
deCompressedData, err := zstd.Decompress(nil, data)
if err != nil {
return nil, nil, err
}
data = deCompressedData
}

return data, position, err
}

Expand Down Expand Up @@ -313,8 +331,6 @@ func (wal *WAL) ClearPendingWrites() {
}

// PendingWrites adds data to wal.pendingWrites, where it waits to be written in a batch.
// If the data in pendingWrites exceeds the size of one segment,
// it will return a 'ErrPendingSizeTooLarge' error and clear the pendingWrites.
func (wal *WAL) PendingWrites(data []byte) {
wal.pendingWritesLock.Lock()
defer wal.pendingWritesLock.Unlock()
Expand Down Expand Up @@ -342,6 +358,8 @@ func (wal *WAL) rotateActiveSegment() error {

// WriteAll writes wal.pendingWrites to the WAL and then clears pendingWrites.
// It does not sync the segment file based on wal.options; you should call Sync() manually.
// If the data in pendingWrites exceeds the size of one segment,
// it returns an 'ErrPendingSizeTooLarge' error and clears the pendingWrites.
func (wal *WAL) WriteAll() ([]*ChunkPosition, error) {
if len(wal.pendingWrites) == 0 {
return make([]*ChunkPosition, 0), nil
Expand Down Expand Up @@ -380,9 +398,22 @@ func (wal *WAL) WriteAll() ([]*ChunkPosition, error) {
func (wal *WAL) Write(data []byte) (*ChunkPosition, error) {
wal.mu.Lock()
defer wal.mu.Unlock()

switch wal.options.Compressor {
case Snappy:
data = snappy.Encode(nil, data)
case Zstd:
CompressedData, err := zstd.Compress(nil, data)
if err != nil {
return nil, err
}
data = CompressedData
}

if int64(len(data))+chunkHeaderSize > wal.options.SegmentSize {
return nil, ErrValueTooLarge
}

// if the active segment file is full, sync it and create a new one.
if wal.isFull(int64(len(data))) {
if err := wal.rotateActiveSegment(); err != nil {
Expand Down Expand Up @@ -432,7 +463,25 @@ func (wal *WAL) Read(pos *ChunkPosition) ([]byte, error) {
}

// read the data from the segment file.
return segment.Read(pos.BlockNumber, pos.ChunkOffset)
data, err := segment.Read(pos.BlockNumber, pos.ChunkOffset)
if err != nil {
return nil, err
}
switch wal.options.Compressor {
case Snappy:
data, err = snappy.Decode(nil, data)
if err != nil {
return nil, err
}
case Zstd:
deCompressedData, err := zstd.Decompress(nil, data)
if err != nil {
return nil, err
}
data = deCompressedData
}

return data, nil
}

// Close closes the WAL.
Expand Down
Loading

0 comments on commit 1e6abe9

Please sign in to comment.