Skip to content

Commit

Permalink
adjust parameter for pipelined memdb by failpoint
Browse files Browse the repository at this point in the history
Signed-off-by: you06 <[email protected]>
  • Loading branch information
you06 committed Mar 6, 2024
1 parent 3625fc0 commit ff0fbed
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 2 deletions.
31 changes: 29 additions & 2 deletions internal/unionstore/pipelined_memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type PipelinedMemDB struct {
len, size int // len and size records the total flushed and onflushing memdb.
generation uint64
entryLimit, bufferLimit uint64
flushOption flushOption
}

const (
Expand All @@ -57,6 +58,30 @@ const (
ForceFlushSizeThreshold = 128 * 1024 * 1024 // 128MB
)

type flushOption struct {
MinFlushKeys int
MinFlushSize int
ForceFlushSizeThreshold int
}

func newFlushOption() flushOption {
opt := flushOption{
MinFlushKeys: MinFlushKeys,
MinFlushSize: MinFlushSize,
ForceFlushSizeThreshold: ForceFlushSizeThreshold,
}
if val, err := util.EvalFailpoint("pipelinedMemDBMinFlushKeys"); err == nil && val != nil {
opt.MinFlushKeys = val.(int)
}
if val, err := util.EvalFailpoint("pipelinedMemDBMinFlushSize"); err == nil && val != nil {
opt.MinFlushSize = val.(int)
}
if val, err := util.EvalFailpoint("pipelinedMemDBForceFlushSizeThreshold"); err == nil && val != nil {
opt.ForceFlushSizeThreshold = val.(int)
}
return opt
}

type pipelinedMemDBSkipRemoteBuffer struct{}

// TODO: skip remote buffer by context is too obscure, add a new method to read local buffer.
Expand All @@ -73,6 +98,7 @@ type BufferBatchGetter func(ctx context.Context, keys [][]byte) (map[string][]by
func NewPipelinedMemDB(bufferBatchGetter BufferBatchGetter, flushFunc FlushFunc) *PipelinedMemDB {
memdb := newMemDB()
memdb.setSkipMutex(true)
flushOptoin := newFlushOption()
return &PipelinedMemDB{
memDB: memdb,
errCh: make(chan error, 1),
Expand All @@ -82,6 +108,7 @@ func NewPipelinedMemDB(bufferBatchGetter BufferBatchGetter, flushFunc FlushFunc)
// keep entryLimit and bufferLimit same with the memdb's default values.
entryLimit: memdb.entrySizeLimit,
bufferLimit: memdb.bufferSizeLimit,
flushOption: flushOptoin,
}
}

Expand Down Expand Up @@ -221,10 +248,10 @@ func (p *PipelinedMemDB) needFlush() bool {
// MinFlushSize <= size < ForceFlushSizeThreshold && keys < MinFlushKeys, do not flush.
// MinFlushSize <= size < ForceFlushSizeThreshold && keys >= MinFlushKeys, flush.
// size >= ForceFlushSizeThreshold, flush.
if size < MinFlushSize || (p.memDB.Len() < MinFlushKeys && size < ForceFlushSizeThreshold) {
if size < p.flushOption.MinFlushSize || (p.memDB.Len() < p.flushOption.MinFlushKeys && size < p.flushOption.ForceFlushSizeThreshold) {
return false
}
if p.onFlushing.Load() && size < ForceFlushSizeThreshold {
if p.onFlushing.Load() && size < p.flushOption.ForceFlushSizeThreshold {
return false
}
return true
Expand Down
43 changes: 43 additions & 0 deletions internal/unionstore/pipelined_memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/util"
)

func emptyBufferBatchGetter(ctx context.Context, keys [][]byte) (map[string][]byte, error) {
Expand Down Expand Up @@ -297,3 +299,44 @@ func TestErrorIterator(t *testing.T) {
iteratorToErr(memdb.SnapshotIter(nil, nil))
iteratorToErr(memdb.SnapshotIterReverse(nil, nil))
}

func TestPipelinedAdjustFlushCondition(t *testing.T) {
util.EnableFailpoints()
memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { return nil })
memdb.Set([]byte("key"), []byte("value"))
flushed, err := memdb.Flush(false)
require.Nil(t, err)
require.False(t, flushed)

// can flush even only 1 key
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushKeys", `return(1)`))
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushSize", `return(1)`))
memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { return nil })
memdb.Set([]byte("key"), []byte("value"))
flushed, err = memdb.Flush(false)
require.Nil(t, err)
require.True(t, flushed)

// need 2 keys to flush
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushKeys", `return(2)`))
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushSize", `return(1)`))
memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { return nil })
memdb.Set([]byte("key"), []byte("value"))
flushed, err = memdb.Flush(false)
require.Nil(t, err)
require.False(t, flushed)

// need 2 keys to flush, but force threshold reached
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushKeys", `return(2)`))
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushSize", `return(1)`))
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBForceFlushSizeThreshold", `return(2)`))
memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { return nil })
memdb.Set([]byte("key"), []byte("value"))
flushed, err = memdb.Flush(false)
require.Nil(t, err)
require.True(t, flushed)

require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBMinFlushKeys"))
require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBMinFlushSize"))
require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBForceFlushSizeThreshold"))
}
4 changes: 4 additions & 0 deletions txnkv/txnsnapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,12 @@ func (s *KVSnapshot) BatchGet(ctx context.Context, keys [][]byte) (map[string][]
return s.BatchGetWithTier(ctx, keys, BatchGetSnapshotTier)
}

// BatchGet tiers indicate the read tier of the batch get request.
// BatchGet read keys in regions. The keys location and region error retry mechanism are shared.
const (
// BatchGetSnapshotTier indicates the batch get reads from a snapshot.
BatchGetSnapshotTier = 1 << iota
// BatchGetBufferTier indicates the batch get reads from the pipelined flushed buffer, only read locks in the current txn.
BatchGetBufferTier
)

Expand Down

0 comments on commit ff0fbed

Please sign in to comment.