Skip to content

Commit

Permalink
feat: config to adjust unchecked flush interval (#1297)
Browse files Browse the repository at this point in the history
* feat: add flush interval config

* feat: update interval

* feat: unchecked cache configurable

* feat: add test case
  • Loading branch information
zengchen221 authored Dec 24, 2020
1 parent f87d47d commit 27a2354
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 50 deletions.
2 changes: 2 additions & 0 deletions config/config_v10.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type DBOptimize struct {
HeightInterval uint64 `json:"heightInterval"` // pov height interval to delete data
SyncWriteHeight uint64 `json:"syncWriteHeight"` // min pov height to write trie data when sync
MaxUsage int `json:"maxUsage"`
FlushInterval int `json:"flushInterval"`
}

func DefaultConfigV10(dir string) (*ConfigV10, error) {
Expand All @@ -31,5 +32,6 @@ func defaultOptimize() *DBOptimize {
HeightInterval: 1000,
SyncWriteHeight: 400000,
MaxUsage: 95,
FlushInterval: 10,
}
}
2 changes: 2 additions & 0 deletions config/config_v10_testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type DBOptimize struct {
HeightInterval uint64 `json:"heightInterval"` // pov height interval to delete data
SyncWriteHeight uint64 `json:"syncWriteHeight"` // min pov height to write trie data when sync
MaxUsage int `json:"maxUsage"`
FlushInterval int `json:"flushInterval"`
}

func DefaultConfigV10(dir string) (*ConfigV10, error) {
Expand All @@ -31,5 +32,6 @@ func defaultOptimize() *DBOptimize {
HeightInterval: 1000,
SyncWriteHeight: 0,
MaxUsage: 95,
FlushInterval: 10,
}
}
16 changes: 9 additions & 7 deletions ledger/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,13 +613,15 @@ func (l *Ledger) GetUCacheStat() []*CacheStat {

func (l *Ledger) GetUCacheStatue() map[string]string {
r := make(map[string]string)
for i, c := range l.unCheckCache.caches {
r["c"+strconv.Itoa(i)] = strconv.Itoa(c.capacity())
if l.hasUncheckedCache {
for i, c := range l.unCheckCache.caches {
r["c"+strconv.Itoa(i)] = strconv.Itoa(c.capacity())
}
r["read"] = strconv.Itoa(l.unCheckCache.readIndex)
r["write"] = strconv.Itoa(l.unCheckCache.writeIndex)
r["lastflush"] = l.unCheckCache.lastFlush.Format("2006-01-02 15:04:05")
r["flushStatue"] = strconv.FormatBool(l.unCheckCache.flushStatue)
r["flushChan"] = strconv.Itoa(len(l.unCheckCache.flushChan))
}
r["read"] = strconv.Itoa(l.unCheckCache.readIndex)
r["write"] = strconv.Itoa(l.unCheckCache.writeIndex)
r["lastflush"] = l.unCheckCache.lastFlush.Format("2006-01-02 15:04:05")
r["flushStatue"] = strconv.FormatBool(l.unCheckCache.flushStatue)
r["flushChan"] = strconv.Itoa(len(l.unCheckCache.flushChan))
return r
}
61 changes: 38 additions & 23 deletions ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,24 @@ type LedgerStore interface {

type Ledger struct {
io.Closer
dir string
cfg *config.Config
store storage.Store
cache *MemoryCache
unCheckCache *MemoryCache
rcache *rCache
cacheStats []*CacheStat
uCacheStats []*CacheStat
relation *relation.Relation
EB event.EventBus
blockConfirmed chan *types.StateBlock
ctx context.Context
cancel context.CancelFunc
verifiedData map[types.Hash]int
deletedSchema []types.Schema
logger *zap.SugaredLogger
tokenCache sync.Map
dir string
cfg *config.Config
store storage.Store
cache *MemoryCache
hasUncheckedCache bool
unCheckCache *MemoryCache
rcache *rCache
cacheStats []*CacheStat
uCacheStats []*CacheStat
relation *relation.Relation
EB event.EventBus
blockConfirmed chan *types.StateBlock
ctx context.Context
cancel context.CancelFunc
verifiedData map[types.Hash]int
deletedSchema []types.Schema
logger *zap.SugaredLogger
tokenCache sync.Map
}

var (
Expand Down Expand Up @@ -129,6 +130,11 @@ func NewLedger(cfgFile string) *Ledger {
tokenCache: sync.Map{},
cfg: cfg,
}
if cfg.DBOptimize.FlushInterval == 0 {
l.hasUncheckedCache = false
} else {
l.hasUncheckedCache = true
}
store, err := db.NewBadgerStore(dir)
if err != nil {
l.logger.Fatal(err.Error())
Expand All @@ -140,7 +146,9 @@ func NewLedger(cfgFile string) *Ledger {
}
l.relation = r
l.cache = NewMemoryCache(l, defaultBlockFlushSecs, 2000, block)
l.unCheckCache = NewMemoryCache(l, defaultUncheckFlushSecs, 50, unchecked)
if l.hasUncheckedCache {
l.unCheckCache = NewMemoryCache(l, time.Duration(cfg.DBOptimize.FlushInterval)*time.Second, 50, unchecked)
}
l.rcache = NewrCache()
l.cacheStats = make([]*CacheStat, 0)
l.uCacheStats = make([]*CacheStat, 0)
Expand Down Expand Up @@ -194,7 +202,9 @@ func (l *Ledger) SetCacheCapacity() error {
return err
}
l.cache.ResetCapacity(defaultBlockCapacity)
l.unCheckCache.ResetCapacity(defaultUncheckCapacity)
if l.hasUncheckedCache {
l.unCheckCache.ResetCapacity(defaultUncheckCapacity)
}
return nil
}

Expand Down Expand Up @@ -338,7 +348,9 @@ func (l *Ledger) Close() error {
if _, ok := lcache[l.dir]; ok {
l.cancel()
l.cache.closed()
l.unCheckCache.closed()
if l.hasUncheckedCache {
l.unCheckCache.closed()
}
if err := l.relation.Close(); err != nil {
l.logger.Error(err)
}
Expand Down Expand Up @@ -799,9 +811,12 @@ func (l *Ledger) Flush() error {
}

func (l *Ledger) FlushU() error {
lock.Lock()
defer lock.Unlock()
return l.unCheckCache.rebuild()
if l.hasUncheckedCache {
lock.Lock()
defer lock.Unlock()
return l.unCheckCache.rebuild()
}
return nil
}

func (l *Ledger) BlockConfirmed(blk *types.StateBlock) {
Expand Down
57 changes: 41 additions & 16 deletions ledger/ledger_uncheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,22 +434,35 @@ func (l *Ledger) DeleteGapDoDSettleStateBlock(key, blkHash types.Hash) error {
}

func (l *Ledger) setUnchecked(key []byte, unchecked *types.Unchecked) error {
return l.unCheckCache.Put(key, unchecked)
if l.hasUncheckedCache {
return l.unCheckCache.Put(key, unchecked)
} else {
v, err := unchecked.Serialize()
if err != nil {
return err
}
return l.store.Put(key, v)
}
}

func (l *Ledger) deleteUnchecked(key []byte) error {
return l.unCheckCache.Delete(key)
if l.hasUncheckedCache {
return l.unCheckCache.Delete(key)
} else {
return l.store.Delete(key)
}
}

func (l *Ledger) getUnchecked(key []byte) (*types.Unchecked, error) {
if r, err := l.unCheckCache.Get(key); r != nil {
return r.(*types.Unchecked), nil
} else {
if err == ErrKeyDeleted {
return nil, storage.KeyNotFound
if l.hasUncheckedCache {
if r, err := l.unCheckCache.Get(key); r != nil {
return r.(*types.Unchecked), nil
} else {
if err == ErrKeyDeleted {
return nil, storage.KeyNotFound
}
}
}

v, err := l.store.Get(key)
if err != nil {
return nil, err
Expand All @@ -473,9 +486,13 @@ func (l *Ledger) hasUnchecked(key []byte) (bool, error) {
}

func (l *Ledger) iteratorUnchecked(prefix []byte, end []byte, fn func(k []byte, v interface{}) error) error {
keys, err := l.unCheckCache.prefixIteratorObject(prefix, fn)
if err != nil {
return fmt.Errorf("cache iterator : %s", err)
keys := make([][]byte, 0)
if l.hasUncheckedCache {
var err error
keys, err = l.unCheckCache.prefixIteratorObject(prefix, fn)
if err != nil {
return fmt.Errorf("cache iterator : %s", err)
}
}
if err := l.DBStore().Iterator(prefix, end, func(k, v []byte) error {
if !contain(keys, k) {
Expand All @@ -496,11 +513,19 @@ func (l *Ledger) iteratorUnchecked(prefix []byte, end []byte, fn func(k []byte,

func (l *Ledger) countUnchecked(prefix []byte) (uint64, error) {
var count uint64
if err := l.iteratorUnchecked(prefix, nil, func(k []byte, v interface{}) error {
count++
return nil
}); err != nil {
return 0, err
if l.hasUncheckedCache {
if err := l.iteratorUnchecked(prefix, nil, func(k []byte, v interface{}) error {
count++
return nil
}); err != nil {
return 0, err
}
} else {
var err error
count, err = l.store.Count(prefix)
if err != nil {
return 0, err
}
}
return count, nil
}
55 changes: 54 additions & 1 deletion ledger/ledger_uncheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,44 @@ package ledger

import (
"fmt"
"os"
"path/filepath"
"testing"

"github.com/google/uuid"

"github.com/qlcchain/go-qlc/common/storage"
"github.com/qlcchain/go-qlc/common/types"
"github.com/qlcchain/go-qlc/config"
"github.com/qlcchain/go-qlc/crypto/random"
"github.com/qlcchain/go-qlc/mock"
)

func setupTestCaseNonUncheckedCache(t *testing.T) (func(t *testing.T), *Ledger) {
//t.Parallel()

dir := filepath.Join(config.QlcTestDataDir(), "ledger", uuid.New().String())
_ = os.RemoveAll(dir)
cm := config.NewCfgManager(dir)
cf, _ := cm.Load()
cf.DBOptimize.FlushInterval = 0
cm.Save()
l := NewLedger(cm.ConfigFile)
fmt.Println("case: ", t.Name())
return func(t *testing.T) {
//err := l.DBStore.Erase()
err := l.Close()
if err != nil {
t.Fatal(err)
}
//CloseLedger()
err = os.RemoveAll(dir)
if err != nil {
t.Fatal(err)
}
}, l
}

func addUncheckedBlock(t *testing.T, l *Ledger) (hash types.Hash, block *types.StateBlock, kind types.UncheckedKind) {
block = mock.StateBlockWithoutWork()
hash = block.GetLink()
Expand All @@ -27,6 +57,29 @@ func TestLedger_AddUncheckedBlock(t *testing.T) {
addUncheckedBlock(t, l)
}

func TestLedger_AddUncheckedBlock_NonCache(t *testing.T) {
teardownTestCase, l := setupTestCaseNonUncheckedCache(t)
defer teardownTestCase(t)
addUncheckedBlock(t, l)
}

func TestLedger_HasUncheckedBlock_NonCache(t *testing.T) {
teardownTestCase, l := setupTestCaseNonUncheckedCache(t)
defer teardownTestCase(t)

parentHash, _, kind := addUncheckedBlock(t, l)
r, _ := l.HasUncheckedBlock(parentHash, kind)
if !r {
t.Fatal()
}
t.Log("has unchecked,", r)
c, err := l.CountUncheckedBlocksStore()
if err != nil || c != 1 {
t.Fatal(err, c)
}
t.Log("unchecked count,", c)
}

func TestLedger_GetUncheckedBlock(t *testing.T) {
teardownTestCase, l := setupTestCase(t)
defer teardownTestCase(t)
Expand Down Expand Up @@ -92,7 +145,7 @@ func TestLedger_CountUncheckedBlocks(t *testing.T) {
t.Log("unchecked count,", c)
c, err = l.CountUncheckedBlocksStore()
if err != nil || c != 0 {
t.Fatal(err)
t.Fatal(err, c)
}
t.Log("unchecked count,", c)
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/api/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (l *LedgerAPI) BlocksCount2() (map[string]uint64, error) {
return nil, err
}

unCount, err := l.ledger.CountUncheckedBlocks()
unCount, err := l.ledger.CountUncheckedBlocksStore()
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/api/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func TestLedgerAPI_BlocksCount2(t *testing.T) {

l.On("CountStateBlocks").Return(uint64(10), nil)
l.On("CountSmartContractBlocks").Return(uint64(5), nil)
l.On("CountUncheckedBlocks").Return(uint64(1), nil)
l.On("CountUncheckedBlocksStore").Return(uint64(1), nil)

c, err := ledgerApi.BlocksCount2()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion rpc/grpc/apis/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func TestLedgerAPI_BlocksCount2(t *testing.T) {

l.On("CountStateBlocks").Return(uint64(10), nil)
l.On("CountSmartContractBlocks").Return(uint64(5), nil)
l.On("CountUncheckedBlocks").Return(uint64(1), nil)
l.On("CountUncheckedBlocksStore").Return(uint64(1), nil)

c, err := ledgerApi.BlocksCount2(context.Background(), nil)
if err != nil {
Expand Down

0 comments on commit 27a2354

Please sign in to comment.