From ff0fbed80549630ae2766d247fc32688ed69febd Mon Sep 17 00:00:00 2001 From: you06 Date: Wed, 6 Mar 2024 12:23:57 +0900 Subject: [PATCH] adjust parameter for pipelined memdb by failpoint Signed-off-by: you06 --- internal/unionstore/pipelined_memdb.go | 31 ++++++++++++++- internal/unionstore/pipelined_memdb_test.go | 43 +++++++++++++++++++++ txnkv/txnsnapshot/snapshot.go | 4 ++ 3 files changed, 76 insertions(+), 2 deletions(-) diff --git a/internal/unionstore/pipelined_memdb.go b/internal/unionstore/pipelined_memdb.go index 696b480ac..0ac39a8bc 100644 --- a/internal/unionstore/pipelined_memdb.go +++ b/internal/unionstore/pipelined_memdb.go @@ -44,6 +44,7 @@ type PipelinedMemDB struct { len, size int // len and size records the total flushed and onflushing memdb. generation uint64 entryLimit, bufferLimit uint64 + flushOption flushOption } const ( @@ -57,6 +58,30 @@ const ( ForceFlushSizeThreshold = 128 * 1024 * 1024 // 128MB ) +type flushOption struct { + MinFlushKeys int + MinFlushSize int + ForceFlushSizeThreshold int +} + +func newFlushOption() flushOption { + opt := flushOption{ + MinFlushKeys: MinFlushKeys, + MinFlushSize: MinFlushSize, + ForceFlushSizeThreshold: ForceFlushSizeThreshold, + } + if val, err := util.EvalFailpoint("pipelinedMemDBMinFlushKeys"); err == nil && val != nil { + opt.MinFlushKeys = val.(int) + } + if val, err := util.EvalFailpoint("pipelinedMemDBMinFlushSize"); err == nil && val != nil { + opt.MinFlushSize = val.(int) + } + if val, err := util.EvalFailpoint("pipelinedMemDBForceFlushSizeThreshold"); err == nil && val != nil { + opt.ForceFlushSizeThreshold = val.(int) + } + return opt +} + type pipelinedMemDBSkipRemoteBuffer struct{} // TODO: skip remote buffer by context is too obscure, add a new method to read local buffer. 
@@ -73,6 +98,7 @@ type BufferBatchGetter func(ctx context.Context, keys [][]byte) (map[string][]by func NewPipelinedMemDB(bufferBatchGetter BufferBatchGetter, flushFunc FlushFunc) *PipelinedMemDB { memdb := newMemDB() memdb.setSkipMutex(true) + flushOption := newFlushOption() return &PipelinedMemDB{ memDB: memdb, errCh: make(chan error, 1), @@ -82,6 +108,7 @@ func NewPipelinedMemDB(bufferBatchGetter BufferBatchGetter, flushFunc FlushFunc) // keep entryLimit and bufferLimit same with the memdb's default values. entryLimit: memdb.entrySizeLimit, bufferLimit: memdb.bufferSizeLimit, + flushOption: flushOption, } } @@ -221,10 +248,10 @@ func (p *PipelinedMemDB) needFlush() bool { // MinFlushSize <= size < ForceFlushSizeThreshold && keys < MinFlushKeys, do not flush. // MinFlushSize <= size < ForceFlushSizeThreshold && keys >= MinFlushKeys, flush. // size >= ForceFlushSizeThreshold, flush. - if size < MinFlushSize || (p.memDB.Len() < MinFlushKeys && size < ForceFlushSizeThreshold) { + if size < p.flushOption.MinFlushSize || (p.memDB.Len() < p.flushOption.MinFlushKeys && size < p.flushOption.ForceFlushSizeThreshold) { return false } - if p.onFlushing.Load() && size < ForceFlushSizeThreshold { + if p.onFlushing.Load() && size < p.flushOption.ForceFlushSizeThreshold { return false } return true diff --git a/internal/unionstore/pipelined_memdb_test.go b/internal/unionstore/pipelined_memdb_test.go index 3dcd2d410..3f0b956e1 100644 --- a/internal/unionstore/pipelined_memdb_test.go +++ b/internal/unionstore/pipelined_memdb_test.go @@ -20,8 +20,10 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/stretchr/testify/require" tikverr "github.com/tikv/client-go/v2/error" + "github.com/tikv/client-go/v2/util" ) func emptyBufferBatchGetter(ctx context.Context, keys [][]byte) (map[string][]byte, error) { @@ -297,3 +299,44 @@ func TestErrorIterator(t *testing.T) { iteratorToErr(memdb.SnapshotIter(nil, nil)) iteratorToErr(memdb.SnapshotIterReverse(nil, nil)) } + 
+func TestPipelinedAdjustFlushCondition(t *testing.T) { + util.EnableFailpoints() + memdb := NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { return nil }) + memdb.Set([]byte("key"), []byte("value")) + flushed, err := memdb.Flush(false) + require.Nil(t, err) + require.False(t, flushed) + + // can flush even only 1 key + require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushKeys", `return(1)`)) + require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushSize", `return(1)`)) + memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { return nil }) + memdb.Set([]byte("key"), []byte("value")) + flushed, err = memdb.Flush(false) + require.Nil(t, err) + require.True(t, flushed) + + // need 2 keys to flush + require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushKeys", `return(2)`)) + require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushSize", `return(1)`)) + memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { return nil }) + memdb.Set([]byte("key"), []byte("value")) + flushed, err = memdb.Flush(false) + require.Nil(t, err) + require.False(t, flushed) + + // need 2 keys to flush, but force threshold reached + require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushKeys", `return(2)`)) + require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushSize", `return(1)`)) + require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBForceFlushSizeThreshold", `return(2)`)) + memdb = NewPipelinedMemDB(emptyBufferBatchGetter, func(_ uint64, db *MemDB) error { return nil }) + memdb.Set([]byte("key"), []byte("value")) + flushed, err = memdb.Flush(false) + require.Nil(t, err) + require.True(t, flushed) + + require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBMinFlushKeys")) + require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBMinFlushSize")) + require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBForceFlushSizeThreshold")) 
+} diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index efa5974f3..9384cb949 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -204,8 +204,12 @@ func (s *KVSnapshot) BatchGet(ctx context.Context, keys [][]byte) (map[string][] return s.BatchGetWithTier(ctx, keys, BatchGetSnapshotTier) } +// BatchGet tiers indicate the read tier of the batch get request. +// BatchGet reads keys in regions. The key locations and the region error retry mechanism are shared. const ( + // BatchGetSnapshotTier indicates the batch get reads from a snapshot. BatchGetSnapshotTier = 1 << iota + // BatchGetBufferTier indicates the batch get reads from the pipelined flushed buffer, only reading locks in the current txn. BatchGetBufferTier )