feat: handle l1 rpc error #854

Open · wants to merge 8 commits into base: l1sload · Changes from all commits
7 changes: 7 additions & 0 deletions core/state_transition.go
@@ -17,6 +17,7 @@
package core

import (
"errors"
"fmt"
"math"
"math/big"
@@ -413,6 +414,12 @@ func (st *StateTransition) TransitionDb() (*ExecutionResult, error) {
stateTransitionEvmCallExecutionTimer.Update(time.Since(evmCallStart))
}

// This error can only be caught if CallerType in the vm config is worker; the worker will reinsert the tx into the txpool in case of this error
var errL1 *vm.ErrL1RPCError
if errors.As(vmerr, &errL1) {
return nil, vmerr
}

// no refunds for l1 messages and system txs
if st.msg.IsL1MessageTx() || st.msg.IsSystemTx() {
return &ExecutionResult{
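Note: the hunk above matches on *vm.ErrL1RPCError with errors.As, but the error type's definition is not part of this page. A minimal sketch of a shape consistent with that usage and with the &ErrL1RPCError{err: err} construction in core/vm/contracts.go below (an inference, not the PR's actual code):

// Sketch only: inferred from usage in this PR; the real definition in core/vm may differ.
package vm

import "fmt"

// ErrL1RPCError wraps a failed L1 RPC call made by the L1Sload precompile.
type ErrL1RPCError struct {
	err error
}

// Error implements the error interface.
func (e *ErrL1RPCError) Error() string {
	return fmt.Sprintf("L1 RPC error: %v", e.err)
}

// Unwrap exposes the underlying error to errors.Is / errors.As.
func (e *ErrL1RPCError) Unwrap() error {
	return e.err
}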
38 changes: 31 additions & 7 deletions core/vm/contracts.go
@@ -22,6 +22,7 @@ import (
"encoding/binary"
"errors"
"math/big"
"time"

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/common/math"
@@ -145,7 +146,7 @@ func PrecompiledContractsDescartes(cfg Config) map[common.Address]PrecompiledCon
common.BytesToAddress([]byte{8}): &bn256PairingIstanbul{},
common.BytesToAddress([]byte{9}): &blake2FDisabled{},
// TODO final contract address to be decided
common.BytesToAddress([]byte{1, 1}): &l1sload{l1Client: cfg.L1Client},
common.BytesToAddress([]byte{1, 1}): &l1sload{l1Client: cfg.L1Client, callerType: cfg.CallerType},
}
}

@@ -1164,7 +1165,8 @@ func (c *bls12381MapG2) Run(state StateDB, input []byte) ([]byte, error) {

// L1SLoad precompiled
type l1sload struct {
l1Client L1Client
l1Client L1Client
callerType CallerType
}

// RequiredGas returns the gas required to execute the pre-compiled contract.
@@ -1179,6 +1181,9 @@ func (c *l1sload) RequiredGas(input []byte) uint64 {
}

func (c *l1sload) Run(state StateDB, input []byte) ([]byte, error) {
log.Info("l1sload", "input", input)
Review (Member), suggested change:
-	log.Info("l1sload", "input", input)
+	log.Debug("l1sload", "input", input)

Review (Member): Is it intentional to set log.Info here? Btw, the block number could be printed here for easier searching of the specific tx containing l1sload in other tools like Scrollscan.

const l1ClientMaxRetries = 3

if c.l1Client == nil {
log.Error("No L1Client in the l1sload")
return nil, ErrNoL1Client
@@ -1198,10 +1203,29 @@ func (c *l1sload) Run(state StateDB, input []byte) ([]byte, error) {
keys[i] = common.BytesToHash(input[20+32*i : 52+32*i])
}

res, err := c.l1Client.StoragesAt(context.Background(), address, keys, block)
if err != nil {
return nil, &ErrL1RPCError{err: err}
// if caller type is non-worker then we can retry the request multiple times and return err; the tx will be reinserted in the tx pool
// otherwise, we should retry requests forever

Review (Member), suggested change:
-	// if caller type is non-worker then we can retry the request multiple times and return err; the tx will be reinserted in the tx pool
+	// if caller type is worker then we can retry the request multiple times and return err; the tx will be reinserted in the tx pool

Review (Member): And use `if c.callerType == CallerTypeWorker` first to align with this comment.
if c.callerType == CallerTypeNonWorker {
for {
res, err := c.l1Client.StoragesAt(context.Background(), address, keys, block)
if err == nil {
return res, nil
}
// wait before retrying
time.Sleep(100 * time.Millisecond)

Review: Should we do exponential backoff?

log.Warn("L1 client request error", "err", err)
Review (@colinlyguo, Member, Jul 25, 2024): Can add address, keys, block in the log for easier debugging, e.g. if one needs to replay the RPC failure by sending it through JSON RPC.

Review, suggested change:
-	log.Warn("L1 client request error", "err", err)
+	log.Warn("failed to perform L1Sload RPC call", "err", err)

}
} else {
var innerErr error
for i := 0; i < l1ClientMaxRetries; i++ {
res, err := c.l1Client.StoragesAt(context.Background(), address, keys, block)

Review: Does this call have a built-in timeout mechanism? Or is it possible that it would hang indefinitely?

if err != nil {
innerErr = err
Review (Member): What about printing the number of the i-th try, plus address, keys, block, and the error message here? In the middle of the for-loop the error is simply omitted, so it's worth recording it somehow.

continue
} else {
return res, nil
}
}
return nil, &ErrL1RPCError{err: innerErr}
}

return res, nil
}
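Note: on the exponential-backoff question above, the committed code sleeps a fixed 100 ms between retries. A minimal sketch of the non-worker (retry forever) path with capped exponential backoff (illustrative only; the helper name and backoff constants are assumptions, not part of this PR):

// retryStoragesAt retries an L1 read with capped exponential backoff instead of
// a fixed sleep. Sketch only; assumes the same imports as core/vm/contracts.go
// (context, math/big, time, common, log).
func retryStoragesAt(client L1Client, address common.Address, keys []common.Hash, block *big.Int) ([]byte, error) {
	const maxBackoff = 5 * time.Second
	backoff := 100 * time.Millisecond
	for {
		res, err := client.StoragesAt(context.Background(), address, keys, block)
		if err == nil {
			return res, nil
		}
		log.Warn("failed to perform L1Sload RPC call, retrying", "err", err, "backoff", backoff)
		time.Sleep(backoff)
		// double the wait after each failure, capped at maxBackoff
		if backoff *= 2; backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}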
12 changes: 11 additions & 1 deletion core/vm/interpreter.go
@@ -44,9 +44,19 @@ type Config struct {

ExtraEips []int // Additional EIPS that are to be enabled

L1Client L1Client // L1 RPC client
L1Client L1Client // L1 RPC client
CallerType CallerType // caller type is used in L1Sload precompile to determine whether to retry RPC call forever in case of error
}

type CallerType int

const (
// CallerTypeNonWorker callers (e.g. tracing) retry failed L1 RPC calls forever
CallerTypeNonWorker CallerType = iota
// CallerTypeWorker callers retry a bounded number of times, then return ErrL1RPCError
CallerTypeWorker
)

// ScopeContext contains the things that are per-call, such as stack and memory,
// but not transients like pc and gas
type ScopeContext struct {
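Note: vm.Config now carries an L1Client, whose interface is not shown in this diff. Its shape can be inferred from mockL1Client in miner/worker_test.go further down (an inference; the real interface may declare more methods):

// Inferred from the test mock below; not quoted from core/vm.
type L1Client interface {
	// StoragesAt reads the given storage slots of account on L1 at blockNumber
	// and returns the concatenated 32-byte values.
	StoragesAt(ctx context.Context, account common.Address, keys []common.Hash, blockNumber *big.Int) ([]byte, error)
}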
16 changes: 14 additions & 2 deletions miner/worker.go
@@ -34,6 +34,7 @@ import (
"github.com/scroll-tech/go-ethereum/core/rawdb"
"github.com/scroll-tech/go-ethereum/core/state"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/core/vm"
"github.com/scroll-tech/go-ethereum/event"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/metrics"
@@ -846,7 +847,7 @@ func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
// don't commit the state during tracing for circuit capacity checker, otherwise we cannot revert.
// and even if we don't commit the state, the `refund` value will still be correct, as explained in `CommitTransaction`
commitStateAfterApply := false
traceEnv, err := tracing.CreateTraceEnv(w.chainConfig, w.chain, w.engine, w.eth.ChainDb(), state, w.chain.GetVMConfig().L1Client, parent,
traceEnv, err := tracing.CreateTraceEnv(w.chainConfig, w.chain, w.engine, w.eth.ChainDb(), state, w.chain.GetVMConfig().L1Client, vm.CallerTypeWorker, parent,
// new block with a placeholder tx, for traceEnv's ExecutionResults length & TxStorageTraces length
types.NewBlockWithHeader(header).WithBody([]*types.Transaction{types.NewTx(&types.LegacyTx{})}, nil),
commitStateAfterApply)
@@ -1006,9 +1007,13 @@ func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Addres
// create new snapshot for `core.ApplyTransaction`
snap := w.current.state.Snapshot()

// todo: apply these changes to the new worker when merged with upstream
// make a copy of vm config and change caller type to worker
var vmConf vm.Config = *w.chain.GetVMConfig()
vmConf.CallerType = vm.CallerTypeWorker
var receipt *types.Receipt
common.WithTimer(l2CommitTxApplyTimer, func() {
receipt, err = core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig())
receipt, err = core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, vmConf)
})
if err != nil {
w.current.state.RevertToSnapshot(snap)
@@ -1134,6 +1139,7 @@ loop:
w.current.state.SetTxContext(tx.Hash(), w.current.tcount)

logs, traces, err := w.commitTransaction(tx, coinbase)
var errL1 *vm.ErrL1RPCError
switch {
case errors.Is(err, core.ErrGasLimitReached) && tx.IsL1MessageTx():
// If this block already contains some L1 messages,
@@ -1168,6 +1174,12 @@
log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
txs.Pop()

case errors.As(err, &errL1):
// Skip the current transaction that failed on the L1Sload precompile with L1RPCError, without shifting in the next tx from the account; this tx will be left in the txpool and retried in a future block
log.Trace("Skipping transaction failed on L1Sload precompile with L1RpcError", "sender", from)

Review: Log error.

atomic.AddInt32(&w.newTxs, int32(1))

Review: Why is this necessary?

txs.Pop()

case errors.Is(err, nil):
// Everything ok, collect the logs and shift in the next transaction from the same account
coalescedLogs = append(coalescedLogs, logs...)
80 changes: 80 additions & 0 deletions miner/worker_test.go
@@ -17,6 +17,8 @@
package miner

import (
"context"
"errors"
"math"
"math/big"
"math/rand"
@@ -1198,6 +1200,84 @@ func TestPrioritizeOverflowTx(t *testing.T) {
}
}

type mockL1Client struct {
failList []bool
}

func (c *mockL1Client) StoragesAt(ctx context.Context, account common.Address, keys []common.Hash, blockNumber *big.Int) ([]byte, error) {
if len(c.failList) == 0 {
return common.Hash{}.Bytes(), nil
}
failed := c.failList[0]
c.failList = c.failList[1:]
if failed {
return nil, errors.New("error")
} else {
return common.Hash{}.Bytes(), nil
}
}

func TestL1SloadFailedTxReexecuted(t *testing.T) {
assert := assert.New(t)

var (
chainConfig = params.AllCliqueProtocolChanges
db = rawdb.NewMemoryDatabase()
engine = clique.New(chainConfig.Clique, db)
)

chainConfig.Clique = &params.CliqueConfig{Period: 1, Epoch: 30000}
chainConfig.LondonBlock = big.NewInt(0)
chainConfig.DescartesBlock = big.NewInt(0)

w, b := newTestWorker(t, chainConfig, engine, db, 0)
// StoragesAt should fail at the tracing request 2 times (3 retries each); commitTransaction will fail during tracing and the tx will be retried in the next work cycle.
// After that, StoragesAt should pass tracing 2 times and then fail on tx execution (3 retries).
// After that, the tx will be retried again and executed without failures.
w.chain.GetVMConfig().L1Client = &mockL1Client{failList: []bool{true, true, true, true, true, true, false, false, true, true, true}}
defer w.close()

// This test chain imports the mined blocks.
db2 := rawdb.NewMemoryDatabase()
b.genesis.MustCommit(db2)
chain, _ := core.NewBlockChain(db2, nil, b.chain.Config(), engine, vm.Config{
Debug: true,
Tracer: vm.NewStructLogger(&vm.LogConfig{EnableMemory: true, EnableReturnData: true})}, nil, nil)
defer chain.Stop()
chain.GetVMConfig().L1Client = &mockL1Client{}

// Ignore empty commit here for less noise.
w.skipSealHook = func(task *task) bool {
return len(task.receipts) == 0
}

// Wait for mined blocks.
sub := w.mux.Subscribe(core.NewMinedBlockEvent{})
defer sub.Unsubscribe()

// Define tx that calls L1Sload
l1SloadAddress := common.BytesToAddress([]byte{1, 1})
input := make([]byte, 52)
tx, _ := types.SignTx(types.NewTransaction(b.txPool.Nonce(testBankAddress), l1SloadAddress, big.NewInt(0), 25208, big.NewInt(10*params.InitialBaseFee), input), types.HomesteadSigner{}, testBankKey)

// Process l1sload tx
b.txPool.AddLocal(tx)
w.start()

select {
case ev := <-sub.Chan():
w.stop()
block := ev.Data.(core.NewMinedBlockEvent).Block
assert.Equal(1, len(block.Transactions()))
assert.Equal(tx.Hash(), block.Transactions()[0].Hash())
if _, err := chain.InsertChain([]*types.Block{block}); err != nil {
t.Fatalf("failed to insert new mined block %d: %v", block.NumberU64(), err)
}
case <-time.After(5 * time.Second): // Worker needs 1s to include new changes.
t.Fatalf("timeout")
}
}

func TestSkippedTransactionDatabaseEntries(t *testing.T) {
assert := assert.New(t)

13 changes: 8 additions & 5 deletions rollup/tracing/tracing.go
@@ -47,7 +47,7 @@ func NewTracerWrapper() *TracerWrapper {

// CreateTraceEnvAndGetBlockTrace wraps the whole block tracing logic for a block
func (tw *TracerWrapper) CreateTraceEnvAndGetBlockTrace(chainConfig *params.ChainConfig, chainContext core.ChainContext, engine consensus.Engine, chaindb ethdb.Database, statedb *state.StateDB, l1Client vm.L1Client, parent *types.Block, block *types.Block, commitAfterApply bool) (*types.BlockTrace, error) {
traceEnv, err := CreateTraceEnv(chainConfig, chainContext, engine, chaindb, statedb, l1Client, parent, block, commitAfterApply)
traceEnv, err := CreateTraceEnv(chainConfig, chainContext, engine, chaindb, statedb, l1Client, vm.CallerTypeNonWorker, parent, block, commitAfterApply)
if err != nil {
return nil, err
}
@@ -60,6 +60,7 @@ type TraceEnv struct {
commitAfterApply bool
chainConfig *params.ChainConfig
l1Client vm.L1Client
callerType vm.CallerType

coinbase common.Address

@@ -99,13 +100,14 @@ type txTraceTask struct {
index int
}

func CreateTraceEnvHelper(chainConfig *params.ChainConfig, logConfig *vm.LogConfig, l1Client vm.L1Client, blockCtx vm.BlockContext, startL1QueueIndex uint64, coinbase common.Address, statedb *state.StateDB, rootBefore common.Hash, block *types.Block, commitAfterApply bool) *TraceEnv {
func CreateTraceEnvHelper(chainConfig *params.ChainConfig, logConfig *vm.LogConfig, l1Client vm.L1Client, callerType vm.CallerType, blockCtx vm.BlockContext, startL1QueueIndex uint64, coinbase common.Address, statedb *state.StateDB, rootBefore common.Hash, block *types.Block, commitAfterApply bool) *TraceEnv {
return &TraceEnv{
logConfig: logConfig,
commitAfterApply: commitAfterApply,
chainConfig: chainConfig,
coinbase: coinbase,
l1Client: l1Client,
callerType: callerType,
signer: types.MakeSigner(chainConfig, block.Number()),
state: statedb,
blockCtx: blockCtx,
@@ -122,7 +124,7 @@ }
}
}

func CreateTraceEnv(chainConfig *params.ChainConfig, chainContext core.ChainContext, engine consensus.Engine, chaindb ethdb.Database, statedb *state.StateDB, l1Client vm.L1Client, parent *types.Block, block *types.Block, commitAfterApply bool) (*TraceEnv, error) {
func CreateTraceEnv(chainConfig *params.ChainConfig, chainContext core.ChainContext, engine consensus.Engine, chaindb ethdb.Database, statedb *state.StateDB, l1Client vm.L1Client, callerType vm.CallerType, parent *types.Block, block *types.Block, commitAfterApply bool) (*TraceEnv, error) {
var coinbase common.Address

var err error
@@ -160,6 +162,7 @@ func CreateTraceEnv(chainConfig *params.ChainConfig, chainContext core.ChainCont
EnableReturnData: true,
},
l1Client,
callerType,
core.NewEVMBlockContext(block.Header(), chainContext, chainConfig, nil),
*startL1QueueIndex,
coinbase,
@@ -231,7 +234,7 @@ func (env *TraceEnv) GetBlockTrace(block *types.Block) (*types.BlockTrace, error
// Generate the next state snapshot fast without tracing
msg, _ := tx.AsMessage(env.signer, block.BaseFee())
env.state.SetTxContext(tx.Hash(), i)
vmenv := vm.NewEVM(env.blockCtx, core.NewEVMTxContext(msg), env.state, env.chainConfig, vm.Config{L1Client: env.l1Client})
vmenv := vm.NewEVM(env.blockCtx, core.NewEVMTxContext(msg), env.state, env.chainConfig, vm.Config{L1Client: env.l1Client, CallerType: env.callerType})
l1DataFee, err := fees.CalculateL1DataFee(tx, env.state)
if err != nil {
failed = err
@@ -332,7 +335,7 @@ func (env *TraceEnv) getTxResult(state *state.StateDB, index int, block *types.B
structLogger := vm.NewStructLogger(env.logConfig)
tracer := NewMuxTracer(structLogger, callTracer, prestateTracer)
// Run the transaction with tracing enabled.
vmenv := vm.NewEVM(env.blockCtx, txContext, state, env.chainConfig, vm.Config{L1Client: env.l1Client, Debug: true, Tracer: tracer, NoBaseFee: true})
vmenv := vm.NewEVM(env.blockCtx, txContext, state, env.chainConfig, vm.Config{L1Client: env.l1Client, Debug: true, Tracer: tracer, NoBaseFee: true, CallerType: env.callerType})

// Call Prepare to clear out the statedb access list
state.SetTxContext(txctx.TxHash, txctx.TxIndex)