Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pipelined state processor for follower nodes #894

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 1 addition & 104 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,21 @@
package core

import (
"errors"
"fmt"
"sync"
"time"

"github.com/scroll-tech/go-ethereum/consensus"
"github.com/scroll-tech/go-ethereum/core/rawdb"
"github.com/scroll-tech/go-ethereum/core/state"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/metrics"
"github.com/scroll-tech/go-ethereum/params"
"github.com/scroll-tech/go-ethereum/rollup/circuitcapacitychecker"
"github.com/scroll-tech/go-ethereum/trie"
)

var (
validateL1MessagesTimer = metrics.NewRegisteredTimer("validator/l1msg", nil)
validateRowConsumptionTimer = metrics.NewRegisteredTimer("validator/rowconsumption", nil)
validateTraceTimer = metrics.NewRegisteredTimer("validator/trace", nil)
validateLockTimer = metrics.NewRegisteredTimer("validator/lock", nil)
validateCccTimer = metrics.NewRegisteredTimer("validator/ccc", nil)
validateL1MessagesTimer = metrics.NewRegisteredTimer("validator/l1msg", nil)
)

// BlockValidator is responsible for validating block headers, uncles and
Expand All @@ -50,12 +42,6 @@ type BlockValidator struct {
config *params.ChainConfig // Chain configuration options
bc *BlockChain // Canonical block chain
engine consensus.Engine // Consensus engine used for validating

// circuit capacity checker related fields
checkCircuitCapacity bool // whether enable circuit capacity check
cMu sync.Mutex // mutex for circuit capacity checker
tracer tracerWrapper // scroll tracer wrapper
circuitCapacityChecker *circuitcapacitychecker.CircuitCapacityChecker // circuit capacity checker instance
}

// NewBlockValidator returns a new block validator which is safe for re-use
Expand All @@ -68,17 +54,6 @@ func NewBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engin
return validator
}

// tracerWrapper abstracts the scroll trace-environment factory so that the
// core package does not import the tracing implementation directly.
type tracerWrapper interface {
	CreateTraceEnvAndGetBlockTrace(*params.ChainConfig, ChainContext, consensus.Engine, ethdb.Database, *state.StateDB, *types.Block, *types.Block, bool) (*types.BlockTrace, error)
}

// SetupTracerAndCircuitCapacityChecker wires a scroll tracer wrapper and a
// fresh CircuitCapacityChecker into the validator and enables circuit
// capacity checking for subsequent ValidateBody calls.
func (v *BlockValidator) SetupTracerAndCircuitCapacityChecker(tracer tracerWrapper) {
	v.checkCircuitCapacity = true
	v.tracer = tracer
	v.circuitCapacityChecker = circuitcapacitychecker.NewCircuitCapacityChecker(true)
	log.Info("new CircuitCapacityChecker in BlockValidator", "ID", v.circuitCapacityChecker.ID)
}

// ValidateBody validates the given block's uncles and verifies the block
// header's transaction and uncle roots. The headers are assumed to be already
// validated at this point.
Expand Down Expand Up @@ -114,26 +89,6 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
if err := v.ValidateL1Messages(block); err != nil {
return err
}
if v.checkCircuitCapacity {
// if a block's RowConsumption has been stored, which means it has been processed before,
// (e.g., in miner/worker.go or in insertChain),
// we simply skip its calculation and validation
if rawdb.ReadBlockRowConsumption(v.bc.db, block.Hash()) != nil {
return nil
}
rowConsumption, err := v.validateCircuitRowConsumption(block)
if err != nil {
return err
}
log.Trace(
"Validator write block row consumption",
"id", v.circuitCapacityChecker.ID,
"number", block.NumberU64(),
"hash", block.Hash().String(),
"rowConsumption", rowConsumption,
)
rawdb.WriteBlockRowConsumption(v.bc.db, block.Hash(), rowConsumption)
omerfirmak marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

Expand Down Expand Up @@ -286,61 +241,3 @@ func CalcGasLimit(parentGasLimit, desiredLimit uint64) uint64 {
}
return limit
}

// createTraceEnvAndGetBlockTrace builds a tracing environment on top of the
// parent block's state and returns the complete trace for the given block.
// It fails if the parent block or its state is unavailable.
func (v *BlockValidator) createTraceEnvAndGetBlockTrace(block *types.Block) (*types.BlockTrace, error) {
	number := block.NumberU64()
	parent := v.bc.GetBlock(block.ParentHash(), number-1)
	if parent == nil {
		return nil, errors.New("validateCircuitRowConsumption: no parent block found")
	}

	statedb, err := v.bc.StateAt(parent.Root())
	if err != nil {
		return nil, err
	}

	return v.tracer.CreateTraceEnvAndGetBlockTrace(v.config, v.bc, v.engine, v.bc.db, statedb, parent, block, true)
}

// validateCircuitRowConsumption traces the block and replays the trace through
// the shared circuit capacity checker, returning the resulting per-block row
// consumption. Timers record the cost of tracing, lock acquisition and the
// checker itself.
func (v *BlockValidator) validateCircuitRowConsumption(block *types.Block) (*types.RowConsumption, error) {
	start := time.Now()
	defer func() {
		validateRowConsumptionTimer.Update(time.Since(start))
	}()

	log.Trace(
		"Validator apply ccc for block",
		"id", v.circuitCapacityChecker.ID,
		"number", block.NumberU64(),
		"hash", block.Hash().String(),
		"len(txs)", block.Transactions().Len(),
	)

	// Tracing happens before taking the lock so concurrent validations only
	// serialize on the circuit capacity checker itself.
	traceStart := time.Now()
	traces, err := v.createTraceEnvAndGetBlockTrace(block)
	if err != nil {
		return nil, err
	}
	validateTraceTimer.Update(time.Since(traceStart))

	lockStart := time.Now()
	v.cMu.Lock()
	defer v.cMu.Unlock()
	validateLockTimer.Update(time.Since(lockStart))

	cccStart := time.Now()
	v.circuitCapacityChecker.Reset()
	log.Trace("Validator reset ccc", "id", v.circuitCapacityChecker.ID)
	rc, err := v.circuitCapacityChecker.ApplyBlock(traces)
	validateCccTimer.Update(time.Since(cccStart))

	log.Trace(
		"Validator apply ccc for block result",
		"id", v.circuitCapacityChecker.ID,
		"number", block.NumberU64(),
		"hash", block.Hash().String(),
		"len(txs)", block.Transactions().Len(),
		"rc", rc,
		"err", err,
	)

	return rc, err
}
5 changes: 5 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2288,3 +2288,8 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i
func (bc *BlockChain) Database() ethdb.Database {
return bc.db
}

// WithStateProcessor replaces the chain's state processor and returns the
// chain itself so the call can be chained during setup.
func (bc *BlockChain) WithStateProcessor(processor Processor) *BlockChain {
	bc.processor = processor
	return bc
}
4 changes: 0 additions & 4 deletions core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ type Validator interface {
// ValidateState validates the given statedb and optionally the receipts and
// gas used.
ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error

// SetupTracerAndCircuitCapacityChecker sets up ScrollTracerWrapper and CircuitCapacityChecker for validator,
// to get scroll-related traces and to validate the circuit row consumption
SetupTracerAndCircuitCapacityChecker(tracer tracerWrapper)
}

// Prefetcher is an interface for pre-caching transaction signatures and state.
Expand Down
5 changes: 2 additions & 3 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ import (
"github.com/scroll-tech/go-ethereum/params"
"github.com/scroll-tech/go-ethereum/rlp"
"github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service"
stateprocessor "github.com/scroll-tech/go-ethereum/rollup/state_processor"
"github.com/scroll-tech/go-ethereum/rollup/sync_service"
"github.com/scroll-tech/go-ethereum/rollup/tracing"
"github.com/scroll-tech/go-ethereum/rpc"
)

Expand Down Expand Up @@ -199,8 +199,7 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client sync_service.EthCl
return nil, err
}
if config.CheckCircuitCapacity {
tracer := tracing.NewTracerWrapper()
eth.blockchain.Validator().SetupTracerAndCircuitCapacityChecker(tracer)
eth.blockchain.WithStateProcessor(stateprocessor.NewProcessor(eth.blockchain))
}

// Rewind the chain in case of an incompatible config upgrade.
Expand Down
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
// Version components of the current release. The diff residue in the page
// left both the old and new VersionPatch lines in place, which is not valid
// Go (duplicate constant declaration); this resolves to the post-change
// value from the pull request.
const (
	VersionMajor = 5         // Major version component of the current release
	VersionMinor = 5         // Minor version component of the current release
	VersionPatch = 9         // Patch version component of the current release
	VersionMeta  = "mainnet" // Version metadata to append to the version string
)

Expand Down
23 changes: 15 additions & 8 deletions rollup/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,14 @@ var (
)

type Pipeline struct {
chain *core.BlockChain
vmConfig vm.Config
parent *types.Block
start time.Time
wg sync.WaitGroup
ctx context.Context
cancelCtx context.CancelFunc
chain *core.BlockChain
vmConfig vm.Config
parent *types.Block
start time.Time
wg sync.WaitGroup
ctx context.Context
cancelCtx context.CancelFunc
replayMode bool

// accumulators
ccc *circuitcapacitychecker.CircuitCapacityChecker
Expand Down Expand Up @@ -114,6 +115,12 @@ func (p *Pipeline) WithBeforeTxHook(beforeTxHook func()) *Pipeline {
return p
}

// WithReplayMode enables replay mode, which relaxes the strict L1 message
// queue-index check so previously validated blocks (whose L1 message indexes
// may not start at the pipeline's expected next index) can be re-executed.
func (p *Pipeline) WithReplayMode() *Pipeline {
	p.replayMode = true
	return p
}

func (p *Pipeline) Start(deadline time.Time) error {
p.start = time.Now()
p.txnQueue = make(chan *types.Transaction)
Expand Down Expand Up @@ -266,7 +273,7 @@ func (p *Pipeline) traceAndApplyStage(txsIn <-chan *types.Transaction) (<-chan e
return
}

if tx.IsL1MessageTx() && tx.AsL1MessageTx().QueueIndex != p.nextL1MsgIndex {
if !p.replayMode && tx.IsL1MessageTx() && tx.AsL1MessageTx().QueueIndex != p.nextL1MsgIndex {
// Continue, we might still be able to include some L2 messages
sendCancellable(resCh, ErrUnexpectedL1MessageIndex, p.ctx.Done())
continue
Expand Down
70 changes: 70 additions & 0 deletions rollup/state_processor/pipelined_state_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package stateprocessor

import (
"errors"
"fmt"
"time"

"github.com/scroll-tech/go-ethereum/core"
"github.com/scroll-tech/go-ethereum/core/rawdb"
"github.com/scroll-tech/go-ethereum/core/state"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/core/vm"
"github.com/scroll-tech/go-ethereum/rollup/circuitcapacitychecker"
"github.com/scroll-tech/go-ethereum/rollup/pipeline"
)

// Compile-time check that Processor satisfies core.Processor.
var _ core.Processor = (*Processor)(nil)

// Processor is a state processor that replays blocks through the miner
// pipeline, running the circuit capacity checker alongside execution.
type Processor struct {
	chain *core.BlockChain
	ccc   *circuitcapacitychecker.CircuitCapacityChecker
}

// NewProcessor creates a pipelined state processor for the given chain,
// backed by a freshly constructed circuit capacity checker.
func NewProcessor(bc *core.BlockChain) *Processor {
	checker := circuitcapacitychecker.NewCircuitCapacityChecker(true)
	return &Processor{chain: bc, ccc: checker}
}

func (p *Processor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original plan was to move CCC out of the block processing loop, so that it's not blocking it. By moving CCC from block_validator to Process here, it's still blocking block insertion, right? What do we gain here?

I feel like:

  1. CCC on follower nodes does not need to block block processing, it can happen completely async.
  2. We can run CCC for multiple blocks in parallel, we don't need to process blocks one-by-one. (But of course there might be memory concerns if we trace lots of blocks in parallel.)

if block.Transactions().Len() == 0 {
return types.Receipts{}, []*types.Log{}, 0, nil
}

header := block.Header()
header.GasUsed = 0

nextL1MsgIndex := uint64(0)
// assume L1 message indexes were validated by block validator
if block.Transactions().Len() > 0 {
if l1Msg := block.Transactions()[0].AsL1MessageTx(); l1Msg != nil {
nextL1MsgIndex = l1Msg.QueueIndex
}
}

pl := pipeline.NewPipeline(p.chain, cfg, statedb, header, nextL1MsgIndex, p.ccc).WithReplayMode()
pl.Start(time.Now().Add(time.Minute))
defer pl.Release()

for _, tx := range block.Transactions() {
res, err := pl.TryPushTxn(tx)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally, we'd trace and ccc the whole block in one step (different ccc API).

By using a pipeline, we now process tx-by-tx in a pipelined manner. Are the CCC results the same? And do we gain a lot from this?

if err != nil && !errors.Is(err, pipeline.ErrUnexpectedL1MessageIndex) {
return nil, nil, 0, err
}

if res != nil {
return nil, nil, 0, fmt.Errorf("pipeline ended prematurely %v", res.CCCErr)
}
}

pl.Stop()
res := <-pl.ResultCh
if res.CCCErr != nil {
return nil, nil, 0, res.CCCErr
}

rawdb.WriteBlockRowConsumption(p.chain.Database(), block.Hash(), res.Rows)
return res.FinalBlock.Receipts, res.FinalBlock.CoalescedLogs, res.FinalBlock.Header.GasUsed, nil
}