Skip to content

Commit

Permalink
Merge pull request #2044 from OffchainLabs/batch-poster-lock-later
Browse files Browse the repository at this point in the history
Only lock redis in the batch poster when posting the batch
  • Loading branch information
PlasmaPower committed Dec 22, 2023
2 parents ef9379b + 51e9b55 commit f546581
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 36 deletions.
58 changes: 49 additions & 9 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/rlp"

"github.com/offchainlabs/nitro/arbnode/dataposter"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/arbnode/redislock"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
Expand Down Expand Up @@ -286,7 +287,6 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
HeaderReader: opts.L1Reader,
Auth: opts.TransactOpts,
RedisClient: redisClient,
RedisLock: redisLock,
Config: dataPosterConfigFetcher,
MetadataRetriever: b.getBatchPosterPosition,
ExtraBacklog: b.GetBacklogEstimate,
Expand Down Expand Up @@ -770,6 +770,8 @@ func (b *BatchPoster) encodeAddBatch(seqNum *big.Int, prevMsgNum arbutil.Message
return fullData, nil
}

var ErrNormalGasEstimationFailed = errors.New("normal gas estimation failed")

func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte, delayedMessages uint64, realData []byte, realNonce uint64, realAccessList types.AccessList) (uint64, error) {
config := b.config()
useNormalEstimation := b.dataPoster.MaxMempoolTransactions() == 1
Expand All @@ -790,7 +792,7 @@ func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte,
AccessList: realAccessList,
})
if err != nil {
return 0, err
return 0, fmt.Errorf("%w: %w", ErrNormalGasEstimationFailed, err)
}
return gas + config.ExtraBatchGas, nil
}
Expand Down Expand Up @@ -830,6 +832,8 @@ func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte,

const ethPosBlockTime = 12 * time.Second

var errAttemptLockFailed = errors.New("failed to acquire lock; either another batch poster posted a batch or this node fell behind")

func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) {
if b.batchReverted.Load() {
return false, fmt.Errorf("batch was reverted, not posting any more batches")
Expand Down Expand Up @@ -1006,6 +1010,18 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
}

if b.daWriter != nil {
if !b.redisLock.AttemptLock(ctx) {
return false, errAttemptLockFailed
}

gotNonce, gotMeta, err := b.dataPoster.GetNextNonceAndMeta(ctx)
if err != nil {
return false, err
}
if nonce != gotNonce || !bytes.Equal(batchPositionBytes, gotMeta) {
return false, fmt.Errorf("%w: nonce changed from %d to %d while creating batch", storage.ErrStorageRace, nonce, gotNonce)
}

cert, err := b.daWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) // b.daWriter will append signature if enabled
if errors.Is(err, das.BatchToDasFailed) {
if config.DisableDasFallbackStoreDataOnChain {
Expand Down Expand Up @@ -1147,25 +1163,49 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
batchPosterWalletBalance.Update(arbmath.BalancePerEther(walletBalance))
}
}
if !b.redisLock.AttemptLock(ctx) {
couldLock, err := b.redisLock.CouldAcquireLock(ctx)
if err != nil {
log.Warn("Error checking if we could acquire redis lock", "err", err)
// Might as well try, worst case we fail to lock
couldLock = true
}
if !couldLock {
log.Debug("Not posting batches right now because another batch poster has the lock or this node is behind")
b.building = nil
b.firstEphemeralError = time.Time{}
return b.config().PollInterval
}
posted, err := b.maybePostSequencerBatch(ctx)
if err == nil {
b.firstEphemeralError = time.Time{}
}
if err != nil {
if ctx.Err() != nil {
// Shutting down. No need to print the context canceled error.
return 0
}
b.building = nil
logLevel := log.Error
// Likely the inbox tracker just isn't caught up.
// Let's see if this error disappears naturally.
if b.firstEphemeralError == (time.Time{}) {
b.firstEphemeralError = time.Now()
logLevel = log.Warn
} else if time.Since(b.firstEphemeralError) < time.Minute {
logLevel = log.Warn
} else if time.Since(b.firstEphemeralError) < time.Minute*5 && strings.Contains(err.Error(), "will exceed max mempool size") {
}
// Likely the inbox tracker just isn't caught up, or there's some other ephemeral error.
// Let's see if this error disappears naturally.
sinceFirstEphemeralError := time.Since(b.firstEphemeralError)
// If the error matches one of these, it's only logged at debug for the first minute,
// then at warn for the next 4 minutes, then at error. If the error isn't one of these,
// it'll be logged at warn for the first minute, then at error.
ignoreAtFirst := errors.Is(err, dataposter.ErrExceedsMaxMempoolSize) ||
errors.Is(err, storage.ErrStorageRace) ||
errors.Is(err, ErrNormalGasEstimationFailed) ||
errors.Is(err, AccumulatorNotFoundErr)
if sinceFirstEphemeralError < time.Minute {
if ignoreAtFirst {
logLevel = log.Debug
} else {
logLevel = log.Warn
}
} else if sinceFirstEphemeralError < time.Minute*5 && ignoreAtFirst {
logLevel = log.Warn
}
logLevel("error posting batch", "err", err)
Expand Down
16 changes: 4 additions & 12 deletions arbnode/dataposter/data_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ type DataPoster struct {
client arbutil.L1Interface
auth *bind.TransactOpts
signer signerFn
redisLock AttemptLocker
config ConfigFetcher
usingNoOpStorage bool
replacementTimes []time.Duration
Expand All @@ -84,10 +83,6 @@ type DataPoster struct {
// This can be local or external, hence the context parameter.
type signerFn func(context.Context, common.Address, *types.Transaction) (*types.Transaction, error)

type AttemptLocker interface {
AttemptLock(context.Context) bool
}

func parseReplacementTimes(val string) ([]time.Duration, error) {
var res []time.Duration
var lastReplacementTime time.Duration
Expand All @@ -114,7 +109,6 @@ type DataPosterOpts struct {
HeaderReader *headerreader.HeaderReader
Auth *bind.TransactOpts
RedisClient redis.UniversalClient
RedisLock AttemptLocker
Config ConfigFetcher
MetadataRetriever func(ctx context.Context, blockNum *big.Int) ([]byte, error)
ExtraBacklog func() uint64
Expand Down Expand Up @@ -175,7 +169,6 @@ func NewDataPoster(ctx context.Context, opts *DataPosterOpts) (*DataPoster, erro
replacementTimes: replacementTimes,
metadataRetriever: opts.MetadataRetriever,
queue: queue,
redisLock: opts.RedisLock,
errorCount: make(map[uint64]int),
maxFeeCapExpression: expression,
extraBacklog: opts.ExtraBacklog,
Expand Down Expand Up @@ -288,6 +281,8 @@ func (p *DataPoster) MaxMempoolTransactions() uint64 {
return p.config().MaxMempoolTransactions
}

var ErrExceedsMaxMempoolSize = errors.New("posting this transaction will exceed max mempool size")

// Does basic check whether posting transaction with specified nonce would
// result in exceeding maximum queue length or maximum transactions in mempool.
func (p *DataPoster) canPostWithNonce(ctx context.Context, nextNonce uint64) error {
Expand All @@ -310,7 +305,7 @@ func (p *DataPoster) canPostWithNonce(ctx context.Context, nextNonce uint64) err
return fmt.Errorf("getting nonce of a dataposter sender: %w", err)
}
if nextNonce >= cfg.MaxMempoolTransactions+unconfirmedNonce {
return fmt.Errorf("posting a transaction with nonce: %d will exceed max mempool size: %d, unconfirmed nonce: %d", nextNonce, cfg.MaxMempoolTransactions, unconfirmedNonce)
return fmt.Errorf("%w: transaction nonce: %d, unconfirmed nonce: %d, max mempool size: %d", ErrExceedsMaxMempoolSize, nextNonce, unconfirmedNonce, cfg.MaxMempoolTransactions)
}
}
return nil
Expand Down Expand Up @@ -533,7 +528,7 @@ func (p *DataPoster) PostTransaction(ctx context.Context, dataCreatedAt time.Tim
return nil, err
}
if nonce != expectedNonce {
return nil, fmt.Errorf("data poster expected next transaction to have nonce %v but was requested to post transaction with nonce %v", expectedNonce, nonce)
return nil, fmt.Errorf("%w: data poster expected next transaction to have nonce %v but was requested to post transaction with nonce %v", storage.ErrStorageRace, expectedNonce, nonce)
}

err = p.updateBalance(ctx)
Expand Down Expand Up @@ -745,9 +740,6 @@ func (p *DataPoster) Start(ctxIn context.Context) {
p.CallIteratively(func(ctx context.Context) time.Duration {
p.mutex.Lock()
defer p.mutex.Unlock()
if !p.redisLock.AttemptLock(ctx) {
return minWait
}
err := p.updateBalance(ctx)
if err != nil {
log.Warn("failed to update tx poster balance", "err", err)
Expand Down
9 changes: 0 additions & 9 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/offchainlabs/nitro/arbnode/dataposter"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/arbnode/redislock"
"github.com/offchainlabs/nitro/arbnode/resourcemanager"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
Expand Down Expand Up @@ -313,13 +312,6 @@ func StakerDataposter(
if err != nil {
return nil, fmt.Errorf("creating redis client from url: %w", err)
}
lockCfgFetcher := func() *redislock.SimpleCfg {
return &cfg.Staker.RedisLock
}
redisLock, err := redislock.NewSimple(redisC, lockCfgFetcher, func() bool { return syncMonitor.Synced() })
if err != nil {
return nil, err
}
dpCfg := func() *dataposter.DataPosterConfig {
return &cfg.Staker.DataPoster
}
Expand All @@ -335,7 +327,6 @@ func StakerDataposter(
HeaderReader: l1Reader,
Auth: transactOpts,
RedisClient: redisC,
RedisLock: redisLock,
Config: dpCfg,
MetadataRetriever: mdRetriever,
RedisKey: sender + ".staker-data-poster.queue",
Expand Down
26 changes: 25 additions & 1 deletion arbnode/redislock/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Simple struct {
}

type SimpleCfg struct {
Enable bool `koanf:"enable"`
MyId string `koanf:"my-id"`
LockoutDuration time.Duration `koanf:"lockout-duration" reload:"hot"`
RefreshDuration time.Duration `koanf:"refresh-duration" reload:"hot"`
Expand All @@ -39,6 +40,7 @@ type SimpleCfg struct {
type SimpleCfgFetcher func() *SimpleCfg

func AddConfigOptions(prefix string, f *flag.FlagSet) {
f.Bool(prefix+".enable", DefaultCfg.Enable, "if false, always treat this as locked and don't write the lock to redis")
f.String(prefix+".my-id", "", "this node's id prefix when acquiring the lock (optional)")
f.Duration(prefix+".lockout-duration", DefaultCfg.LockoutDuration, "how long lock is held")
f.Duration(prefix+".refresh-duration", DefaultCfg.RefreshDuration, "how long between consecutive calls to redis")
Expand All @@ -60,6 +62,7 @@ func NewSimple(client redis.UniversalClient, config SimpleCfgFetcher, readyToLoc
}

var DefaultCfg = SimpleCfg{
Enable: true,
LockoutDuration: time.Minute,
RefreshDuration: time.Second * 10,
Key: "",
Expand Down Expand Up @@ -137,12 +140,33 @@ func (l *Simple) AttemptLock(ctx context.Context) bool {
}

func (l *Simple) Locked() bool {
if l.client == nil {
if l.client == nil || !l.config().Enable {
return true
}
return time.Now().Before(atomicTimeRead(&l.lockedUntil))
}

// Returns true if a call to AttemptLock will likely succeed
func (l *Simple) CouldAcquireLock(ctx context.Context) (bool, error) {
if l.Locked() {
return true, nil
}
if l.stopping || !l.readyToLock() {
return false, nil
}
// l.client shouldn't be nil here because Locked would've returned true
current, err := l.client.Get(ctx, l.config().Key).Result()
if errors.Is(err, redis.Nil) {
// Lock is free for the taking
return true, nil
}
if err != nil {
return false, err
}
// return true if the lock is free for the taking or is already ours
return current == "" || current == l.myId, nil
}

func (l *Simple) Release(ctx context.Context) {
l.mutex.Lock()
defer l.mutex.Unlock()
Expand Down
1 change: 1 addition & 0 deletions arbnode/simple_redis_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func simpleRedisLockTest(t *testing.T, redisKeySuffix string, chosen int, backgo
Require(t, redisClient.Del(ctx, redisKey).Err())

conf := &redislock.SimpleCfg{
Enable: true,
LockoutDuration: test_delay * test_attempts * 10,
RefreshDuration: test_delay * 2,
Key: redisKey,
Expand Down
5 changes: 0 additions & 5 deletions staker/staker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
flag "github.com/spf13/pflag"

"github.com/offchainlabs/nitro/arbnode/dataposter"
"github.com/offchainlabs/nitro/arbnode/redislock"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/cmd/genericconf"
"github.com/offchainlabs/nitro/staker/txbuilder"
Expand Down Expand Up @@ -87,7 +86,6 @@ type L1ValidatorConfig struct {
GasRefunderAddress string `koanf:"gas-refunder-address"`
DataPoster dataposter.DataPosterConfig `koanf:"data-poster" reload:"hot"`
RedisUrl string `koanf:"redis-url"`
RedisLock redislock.SimpleCfg `koanf:"redis-lock" reload:"hot"`
ExtraGas uint64 `koanf:"extra-gas" reload:"hot"`
Dangerous DangerousConfig `koanf:"dangerous"`
ParentChainWallet genericconf.WalletConfig `koanf:"parent-chain-wallet"`
Expand Down Expand Up @@ -154,7 +152,6 @@ var DefaultL1ValidatorConfig = L1ValidatorConfig{
GasRefunderAddress: "",
DataPoster: dataposter.DefaultDataPosterConfigForValidator,
RedisUrl: "",
RedisLock: redislock.DefaultCfg,
ExtraGas: 50000,
Dangerous: DefaultDangerousConfig,
ParentChainWallet: DefaultValidatorL1WalletConfig,
Expand All @@ -175,7 +172,6 @@ var TestL1ValidatorConfig = L1ValidatorConfig{
GasRefunderAddress: "",
DataPoster: dataposter.TestDataPosterConfigForValidator,
RedisUrl: "",
RedisLock: redislock.DefaultCfg,
ExtraGas: 50000,
Dangerous: DefaultDangerousConfig,
ParentChainWallet: DefaultValidatorL1WalletConfig,
Expand Down Expand Up @@ -205,7 +201,6 @@ func L1ValidatorConfigAddOptions(prefix string, f *flag.FlagSet) {
f.String(prefix+".redis-url", DefaultL1ValidatorConfig.RedisUrl, "redis url for L1 validator")
f.Uint64(prefix+".extra-gas", DefaultL1ValidatorConfig.ExtraGas, "use this much more gas than estimation says is necessary to post transactions")
dataposter.DataPosterConfigAddOptions(prefix+".data-poster", f, dataposter.DefaultDataPosterConfigForValidator)
redislock.AddConfigOptions(prefix+".redis-lock", f)
DangerousConfigAddOptions(prefix+".dangerous", f)
genericconf.WalletConfigAddOptions(prefix+".parent-chain-wallet", f, DefaultL1ValidatorConfig.ParentChainWallet.Pathname)
}
Expand Down

0 comments on commit f546581

Please sign in to comment.