From b1491c48ec806690dd450e5c026e1e7c581a598b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20Faruk=20Irmak?= Date: Wed, 10 Jul 2024 11:50:36 +0300 Subject: [PATCH 1/5] feat: pipelined state processor for follower nodes --- core/block_validator.go | 99 ------------------- core/blockchain.go | 5 + core/types.go | 4 - eth/backend.go | 5 +- .../pipelined_state_processor.go | 68 +++++++++++++ 5 files changed, 75 insertions(+), 106 deletions(-) create mode 100644 rollup/state_processor/pipelined_state_processor.go diff --git a/core/block_validator.go b/core/block_validator.go index 6773649c6d13..4f0b9149b7c2 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -17,20 +17,16 @@ package core import ( - "errors" "fmt" - "sync" "time" "github.com/scroll-tech/go-ethereum/consensus" "github.com/scroll-tech/go-ethereum/core/rawdb" "github.com/scroll-tech/go-ethereum/core/state" "github.com/scroll-tech/go-ethereum/core/types" - "github.com/scroll-tech/go-ethereum/ethdb" "github.com/scroll-tech/go-ethereum/log" "github.com/scroll-tech/go-ethereum/metrics" "github.com/scroll-tech/go-ethereum/params" - "github.com/scroll-tech/go-ethereum/rollup/circuitcapacitychecker" "github.com/scroll-tech/go-ethereum/trie" ) @@ -50,12 +46,6 @@ type BlockValidator struct { config *params.ChainConfig // Chain configuration options bc *BlockChain // Canonical block chain engine consensus.Engine // Consensus engine used for validating - - // circuit capacity checker related fields - checkCircuitCapacity bool // whether enable circuit capacity check - cMu sync.Mutex // mutex for circuit capacity checker - tracer tracerWrapper // scroll tracer wrapper - circuitCapacityChecker *circuitcapacitychecker.CircuitCapacityChecker // circuit capacity checker instance } // NewBlockValidator returns a new block validator which is safe for re-use @@ -68,17 +58,6 @@ func NewBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engin return validator } -type tracerWrapper interface { - CreateTraceEnvAndGetBlockTrace(*params.ChainConfig, ChainContext, consensus.Engine, ethdb.Database, *state.StateDB, *types.Block, *types.Block, bool) (*types.BlockTrace, error) -} - -func (v *BlockValidator) SetupTracerAndCircuitCapacityChecker(tracer tracerWrapper) { - v.checkCircuitCapacity = true - v.tracer = tracer - v.circuitCapacityChecker = circuitcapacitychecker.NewCircuitCapacityChecker(true) - log.Info("new CircuitCapacityChecker in BlockValidator", "ID", v.circuitCapacityChecker.ID) -} - // ValidateBody validates the given block's uncles and verifies the block // header's transaction and uncle roots. The headers are assumed to be already // validated at this point. @@ -114,26 +93,6 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { if err := v.ValidateL1Messages(block); err != nil { return err } - if v.checkCircuitCapacity { - // if a block's RowConsumption has been stored, which means it has been processed before, - // (e.g., in miner/worker.go or in insertChain), - // we simply skip its calculation and validation - if rawdb.ReadBlockRowConsumption(v.bc.db, block.Hash()) != nil { - return nil - } - rowConsumption, err := v.validateCircuitRowConsumption(block) - if err != nil { - return err - } - log.Trace( - "Validator write block row consumption", - "id", v.circuitCapacityChecker.ID, - "number", block.NumberU64(), - "hash", block.Hash().String(), - "rowConsumption", rowConsumption, - ) - rawdb.WriteBlockRowConsumption(v.bc.db, block.Hash(), rowConsumption) - } return nil } @@ -286,61 +245,3 @@ func CalcGasLimit(parentGasLimit, desiredLimit uint64) uint64 { } return limit } - -func (v *BlockValidator) createTraceEnvAndGetBlockTrace(block *types.Block) (*types.BlockTrace, error) { - parent := v.bc.GetBlock(block.ParentHash(), block.NumberU64()-1) - if parent == nil { - return nil, errors.New("validateCircuitRowConsumption: no parent block found") - } - - statedb, err := v.bc.StateAt(parent.Root()) - if err != nil { - return nil, err - } - - return v.tracer.CreateTraceEnvAndGetBlockTrace(v.config, v.bc, v.engine, v.bc.db, statedb, parent, block, true) -} - -func (v *BlockValidator) validateCircuitRowConsumption(block *types.Block) (*types.RowConsumption, error) { - defer func(t0 time.Time) { - validateRowConsumptionTimer.Update(time.Since(t0)) - }(time.Now()) - - log.Trace( - "Validator apply ccc for block", - "id", v.circuitCapacityChecker.ID, - "number", block.NumberU64(), - "hash", block.Hash().String(), - "len(txs)", block.Transactions().Len(), - ) - - traceStartTime := time.Now() - traces, err := v.createTraceEnvAndGetBlockTrace(block) - if err != nil { - return nil, err - } - validateTraceTimer.Update(time.Since(traceStartTime)) - - lockStartTime := time.Now() - v.cMu.Lock() - defer v.cMu.Unlock() - validateLockTimer.Update(time.Since(lockStartTime)) - - cccStartTime := time.Now() - v.circuitCapacityChecker.Reset() - log.Trace("Validator reset ccc", "id", v.circuitCapacityChecker.ID) - rc, err := v.circuitCapacityChecker.ApplyBlock(traces) - validateCccTimer.Update(time.Since(cccStartTime)) - - log.Trace( - "Validator apply ccc for block result", - "id", v.circuitCapacityChecker.ID, - "number", block.NumberU64(), - "hash", block.Hash().String(), - "len(txs)", block.Transactions().Len(), - "rc", rc, - "err", err, - ) - - return rc, err -} diff --git a/core/blockchain.go b/core/blockchain.go index 63b244cc06c7..f187b2dca832 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2288,3 +2288,8 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i func (bc *BlockChain) Database() ethdb.Database { return bc.db } + +func (bc *BlockChain) WithStateProcessor(processor Processor) *BlockChain { + bc.processor = processor + return bc +} diff --git a/core/types.go b/core/types.go index da8e31cbdce3..75c01e61c4b3 100644 --- a/core/types.go +++ b/core/types.go @@ -32,10 +32,6 @@ type Validator interface { // ValidateState validates the given statedb and optionally the receipts and // gas used. ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error - - // SetupTracerAndCircuitCapacityChecker sets up ScrollTracerWrapper and CircuitCapacityChecker for validator, - // to get scroll-related traces and to validate the circuit row consumption - SetupTracerAndCircuitCapacityChecker(tracer tracerWrapper) } // Prefetcher is an interface for pre-caching transaction signatures and state. diff --git a/eth/backend.go b/eth/backend.go index 280e956b619e..d0af39ee7dd0 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -56,8 +56,8 @@ import ( "github.com/scroll-tech/go-ethereum/params" "github.com/scroll-tech/go-ethereum/rlp" "github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service" + stateprocessor "github.com/scroll-tech/go-ethereum/rollup/state_processor" "github.com/scroll-tech/go-ethereum/rollup/sync_service" - "github.com/scroll-tech/go-ethereum/rollup/tracing" "github.com/scroll-tech/go-ethereum/rpc" ) @@ -199,8 +199,7 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client sync_service.EthCl return nil, err } if config.CheckCircuitCapacity { - tracer := tracing.NewTracerWrapper() - eth.blockchain.Validator().SetupTracerAndCircuitCapacityChecker(tracer) + eth.blockchain.WithStateProcessor(stateprocessor.NewProcessor(eth.blockchain)) } // Rewind the chain in case of an incompatible config upgrade. diff --git a/rollup/state_processor/pipelined_state_processor.go b/rollup/state_processor/pipelined_state_processor.go new file mode 100644 index 000000000000..dd6ff062eb99 --- /dev/null +++ b/rollup/state_processor/pipelined_state_processor.go @@ -0,0 +1,68 @@ +package stateprocessor + +import ( + "errors" + "fmt" + "time" + + "github.com/scroll-tech/go-ethereum/core" + "github.com/scroll-tech/go-ethereum/core/state" + "github.com/scroll-tech/go-ethereum/core/types" + "github.com/scroll-tech/go-ethereum/core/vm" + "github.com/scroll-tech/go-ethereum/rollup/circuitcapacitychecker" + "github.com/scroll-tech/go-ethereum/rollup/pipeline" +) + +var _ core.Processor = (*Processor)(nil) + +type Processor struct { + chain *core.BlockChain + ccc *circuitcapacitychecker.CircuitCapacityChecker +} + +func NewProcessor(bc *core.BlockChain) *Processor { + return &Processor{ + chain: bc, + ccc: circuitcapacitychecker.NewCircuitCapacityChecker(true), + } +} + +func (p *Processor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) { + if block.Transactions().Len() == 0 { + return types.Receipts{}, []*types.Log{}, 0, nil + } + + header := block.Header() + header.GasUsed = 0 + + nextL1MsgIndex := uint64(0) + // assume L1 message indexes were validated by block validator + if block.Transactions().Len() > 0 { + if l1Msg := block.Transactions()[0].AsL1MessageTx(); l1Msg != nil { + nextL1MsgIndex = l1Msg.QueueIndex + } + } + + pl := pipeline.NewPipeline(p.chain, cfg, statedb, header, nextL1MsgIndex, p.ccc) + pl.Start(time.Now().Add(time.Minute)) + defer pl.Release() + + for _, tx := range block.Transactions() { + res, err := pl.TryPushTxn(tx) + if err != nil && !errors.Is(err, pipeline.ErrUnexpectedL1MessageIndex) { + return nil, nil, 0, err + } + + if res != nil { + return nil, nil, 0, fmt.Errorf("pipeline ended prematurely %v", res.CCCErr) + } + } + + pl.Stop() + res := <-pl.ResultCh + if res.CCCErr != nil { + return nil, nil, 0, res.CCCErr + } + + return res.FinalBlock.Receipts, res.FinalBlock.CoalescedLogs, res.FinalBlock.Header.GasUsed, nil +} From 4f2963a74d52e44bf34cc6f51b153ef2901eee36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20Faruk=20Irmak?= Date: Wed, 10 Jul 2024 12:31:16 +0300 Subject: [PATCH 2/5] pipeline replay mode --- rollup/pipeline/pipeline.go | 23 ++++++++++++------- .../pipelined_state_processor.go | 2 +- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/rollup/pipeline/pipeline.go b/rollup/pipeline/pipeline.go index e4467862def8..e9186ae42296 100644 --- a/rollup/pipeline/pipeline.go +++ b/rollup/pipeline/pipeline.go @@ -53,13 +53,14 @@ var ( ) type Pipeline struct { - chain *core.BlockChain - vmConfig vm.Config - parent *types.Block - start time.Time - wg sync.WaitGroup - ctx context.Context - cancelCtx context.CancelFunc + chain *core.BlockChain + vmConfig vm.Config + parent *types.Block + start time.Time + wg sync.WaitGroup + ctx context.Context + cancelCtx context.CancelFunc + replayMode bool // accumulators ccc *circuitcapacitychecker.CircuitCapacityChecker @@ -114,6 +115,12 @@ func (p *Pipeline) WithBeforeTxHook(beforeTxHook func()) *Pipeline { return p } +// WithReplayMode enables the replay mode that allows L1 messages to be skipped +func (p *Pipeline) WithReplayMode() *Pipeline { + p.replayMode = true + return p +} + func (p *Pipeline) Start(deadline time.Time) error { p.start = time.Now() p.txnQueue = make(chan *types.Transaction) @@ -266,7 +273,7 @@ func (p *Pipeline) traceAndApplyStage(txsIn <-chan *types.Transaction) (<-chan e return } - if tx.IsL1MessageTx() && tx.AsL1MessageTx().QueueIndex != p.nextL1MsgIndex { + if !p.replayMode && tx.IsL1MessageTx() && tx.AsL1MessageTx().QueueIndex != p.nextL1MsgIndex { // Continue, we might still be able to include some L2 messages sendCancellable(resCh, ErrUnexpectedL1MessageIndex, p.ctx.Done()) continue diff --git a/rollup/state_processor/pipelined_state_processor.go b/rollup/state_processor/pipelined_state_processor.go index dd6ff062eb99..bdb85db76d52 100644 --- a/rollup/state_processor/pipelined_state_processor.go +++ b/rollup/state_processor/pipelined_state_processor.go @@ -43,7 +43,7 @@ func (p *Processor) Process(block *types.Block, statedb *state.StateDB, cfg vm.C } } - pl := pipeline.NewPipeline(p.chain, cfg, statedb, header, nextL1MsgIndex, p.ccc) + pl := pipeline.NewPipeline(p.chain, cfg, statedb, header, nextL1MsgIndex, p.ccc).WithReplayMode() pl.Start(time.Now().Add(time.Minute)) defer pl.Release() From 4a138ce17afe5bc46d0d74cbfad26ddcb677b42b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20Faruk=20Irmak?= Date: Wed, 10 Jul 2024 12:53:15 +0300 Subject: [PATCH 3/5] save row cosumption --- rollup/state_processor/pipelined_state_processor.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rollup/state_processor/pipelined_state_processor.go b/rollup/state_processor/pipelined_state_processor.go index bdb85db76d52..bfafa21c3157 100644 --- a/rollup/state_processor/pipelined_state_processor.go +++ b/rollup/state_processor/pipelined_state_processor.go @@ -6,6 +6,7 @@ import ( "time" "github.com/scroll-tech/go-ethereum/core" + "github.com/scroll-tech/go-ethereum/core/rawdb" "github.com/scroll-tech/go-ethereum/core/state" "github.com/scroll-tech/go-ethereum/core/types" "github.com/scroll-tech/go-ethereum/core/vm" @@ -64,5 +65,6 @@ func (p *Processor) Process(block *types.Block, statedb *state.StateDB, cfg vm.C return nil, nil, 0, res.CCCErr } + rawdb.WriteBlockRowConsumption(p.chain.Database(), block.Hash(), res.Rows) return res.FinalBlock.Receipts, res.FinalBlock.CoalescedLogs, res.FinalBlock.Header.GasUsed, nil } From e0b317b384aaa816ab61913e37fb0fb80d7b3eac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20Faruk=20Irmak?= Date: Wed, 10 Jul 2024 13:02:04 +0300 Subject: [PATCH 4/5] cleanup unused metrics --- core/block_validator.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/block_validator.go b/core/block_validator.go index 4f0b9149b7c2..71bd9c9dfabb 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -31,11 +31,7 @@ import ( ) var ( - validateL1MessagesTimer = metrics.NewRegisteredTimer("validator/l1msg", nil) - validateRowConsumptionTimer = metrics.NewRegisteredTimer("validator/rowconsumption", nil) - validateTraceTimer = metrics.NewRegisteredTimer("validator/trace", nil) - validateLockTimer = metrics.NewRegisteredTimer("validator/lock", nil) - validateCccTimer = metrics.NewRegisteredTimer("validator/ccc", nil) + validateL1MessagesTimer = metrics.NewRegisteredTimer("validator/l1msg", nil) ) // BlockValidator is responsible for validating block headers, uncles and From e5011a226d23462c0c1497fd82b9990eee8ce5aa Mon Sep 17 00:00:00 2001 From: omerfirmak Date: Wed, 10 Jul 2024 10:04:32 +0000 Subject: [PATCH 5/5] =?UTF-8?q?chore:=20auto=20version=20bump=E2=80=89[bot?= =?UTF-8?q?]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- params/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/params/version.go b/params/version.go index 4c0bff1aad0a..a699a2e702d6 100644 --- a/params/version.go +++ b/params/version.go @@ -24,7 +24,7 @@ import ( const ( VersionMajor = 5 // Major version component of the current release VersionMinor = 5 // Minor version component of the current release - VersionPatch = 8 // Patch version component of the current release + VersionPatch = 9 // Patch version component of the current release VersionMeta = "mainnet" // Version metadata to append to the version string )