From 7876f5b7adf5aeae463899953c3b6d579915a976 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Wed, 17 Jan 2024 17:57:56 +0530 Subject: [PATCH 1/2] Add an option for the inbox reader to only read safe or finalized L1 blocks --- arbnode/inbox_reader.go | 102 ++++++++++++++++++++++++++++------------ arbnode/node.go | 7 +++ 2 files changed, 80 insertions(+), 29 deletions(-) diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go index 9c830e3c89..f452b0d890 100644 --- a/arbnode/inbox_reader.go +++ b/arbnode/inbox_reader.go @@ -31,6 +31,7 @@ type InboxReaderConfig struct { DefaultBlocksToRead uint64 `koanf:"default-blocks-to-read" reload:"hot"` TargetMessagesRead uint64 `koanf:"target-messages-read" reload:"hot"` MaxBlocksToRead uint64 `koanf:"max-blocks-to-read" reload:"hot"` + ReadMode string `koanf:"read-mode" reload:"hot"` } type InboxReaderConfigFetcher func() *InboxReaderConfig @@ -39,6 +40,12 @@ func (c *InboxReaderConfig) Validate() error { if c.MaxBlocksToRead == 0 || c.MaxBlocksToRead < c.DefaultBlocksToRead { return errors.New("inbox reader max-blocks-to-read cannot be zero or less than default-blocks-to-read") } + if c.ReadMode != "latest" { + c.ReadMode = strings.ToLower(c.ReadMode) + if c.ReadMode != "safe" && c.ReadMode != "finalized" { + return fmt.Errorf("inbox reader read-mode is invalid, want: safe or finalized, got: %s", c.ReadMode) + } + } return nil } @@ -50,6 +57,7 @@ func InboxReaderConfigAddOptions(prefix string, f *flag.FlagSet) { f.Uint64(prefix+".default-blocks-to-read", DefaultInboxReaderConfig.DefaultBlocksToRead, "the default number of blocks to read at once (will vary based on traffic by default)") f.Uint64(prefix+".target-messages-read", DefaultInboxReaderConfig.TargetMessagesRead, "if adjust-blocks-to-read is enabled, the target number of messages to read at once") f.Uint64(prefix+".max-blocks-to-read", DefaultInboxReaderConfig.MaxBlocksToRead, "if adjust-blocks-to-read is enabled, the maximum number of blocks to read at once") + f.String(prefix+".read-mode", DefaultInboxReaderConfig.ReadMode, "mode to only read safe or finalized L1 blocks. Takes string input, valid strings- safe, finalized") } var DefaultInboxReaderConfig = InboxReaderConfig{ @@ -60,6 +68,7 @@ var DefaultInboxReaderConfig = InboxReaderConfig{ DefaultBlocksToRead: 100, TargetMessagesRead: 500, MaxBlocksToRead: 2000, + ReadMode: "latest", } var TestInboxReaderConfig = InboxReaderConfig{ @@ -70,6 +79,7 @@ var TestInboxReaderConfig = InboxReaderConfig{ DefaultBlocksToRead: 100, TargetMessagesRead: 500, MaxBlocksToRead: 2000, + ReadMode: "latest", } type InboxReader struct { @@ -218,6 +228,7 @@ func (r *InboxReader) CaughtUp() chan struct{} { } func (r *InboxReader) run(ctx context.Context, hadError bool) error { + readMode := r.config().ReadMode from, err := r.getNextBlockToRead() if err != nil { return err @@ -238,38 +249,71 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { } defer storeSeenBatchCount() // in case of error for { - - latestHeader, err := r.l1Reader.LastHeader(ctx) - if err != nil { - return err - } config := r.config() - currentHeight := latestHeader.Number - - neededBlockAdvance := config.DelayBlocks + arbmath.SaturatingUSub(config.MinBlocksToRead, 1) - neededBlockHeight := arbmath.BigAddByUint(from, neededBlockAdvance) - checkDelayTimer := time.NewTimer(config.CheckDelay) - WaitForHeight: - for arbmath.BigLessThan(currentHeight, neededBlockHeight) { - select { - case latestHeader = <-newHeaders: - if latestHeader == nil { - // shutting down + currentHeight := big.NewInt(0) + if readMode != "latest" { + var blockNum uint64 + fetchLatestSafeOrFinalized := func() { + if readMode == "safe" { + blockNum, err = r.l1Reader.LatestSafeBlockNr(ctx) + } else { + blockNum, err = r.l1Reader.LatestFinalizedBlockNr(ctx) + } + } + fetchLatestSafeOrFinalized() + if err != nil || blockNum == 0 { + return fmt.Errorf("inboxreader running in read only %s mode and unable to fetch latest %s block. err: %w", readMode, readMode, err) + } + currentHeight.SetUint64(blockNum) + // latest block in our db is newer than the latest safe/finalized block hence reset 'from' to match the last safe/finalized block number + if from.Uint64() > currentHeight.Uint64()+1 { + from.Set(currentHeight) + } + for currentHeight.Cmp(from) <= 0 { + select { + case <-newHeaders: + fetchLatestSafeOrFinalized() + if err != nil || blockNum == 0 { + return fmt.Errorf("inboxreader waiting for recent %s block and unable to fetch its block number. err: %w", readMode, err) + } + currentHeight.SetUint64(blockNum) + case <-ctx.Done(): return nil } - currentHeight = new(big.Int).Set(latestHeader.Number) - case <-ctx.Done(): - return nil - case <-checkDelayTimer.C: - break WaitForHeight } - } - checkDelayTimer.Stop() + } else { + + latestHeader, err := r.l1Reader.LastHeader(ctx) + if err != nil { + return err + } + currentHeight = latestHeader.Number + + neededBlockAdvance := config.DelayBlocks + arbmath.SaturatingUSub(config.MinBlocksToRead, 1) + neededBlockHeight := arbmath.BigAddByUint(from, neededBlockAdvance) + checkDelayTimer := time.NewTimer(config.CheckDelay) + WaitForHeight: + for arbmath.BigLessThan(currentHeight, neededBlockHeight) { + select { + case latestHeader = <-newHeaders: + if latestHeader == nil { + // shutting down + return nil + } + currentHeight = new(big.Int).Set(latestHeader.Number) + case <-ctx.Done(): + return nil + case <-checkDelayTimer.C: + break WaitForHeight + } + } + checkDelayTimer.Stop() - if config.DelayBlocks > 0 { - currentHeight = new(big.Int).Sub(currentHeight, new(big.Int).SetUint64(config.DelayBlocks)) - if currentHeight.Cmp(r.firstMessageBlock) < 0 { - currentHeight = new(big.Int).Set(r.firstMessageBlock) + if config.DelayBlocks > 0 { + currentHeight = new(big.Int).Sub(currentHeight, new(big.Int).SetUint64(config.DelayBlocks)) + if currentHeight.Cmp(r.firstMessageBlock) < 0 { + currentHeight = new(big.Int).Set(r.firstMessageBlock) + } } } @@ -358,7 +402,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { r.lastReadBatchCount = checkingBatchCount r.lastReadMutex.Unlock() storeSeenBatchCount() - if !r.caughtUp { + if !r.caughtUp && readMode == "latest" { r.caughtUp = true close(r.caughtUpChan) } @@ -406,7 +450,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error { if err != nil { return err } - if !r.caughtUp && to.Cmp(currentHeight) == 0 { + if !r.caughtUp && to.Cmp(currentHeight) == 0 && readMode == "latest" { r.caughtUp = true close(r.caughtUpChan) } diff --git a/arbnode/node.go b/arbnode/node.go index f92dcefe7c..55bd69118d 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -99,6 +99,13 @@ func (c *Config) Validate() error { if c.DelayedSequencer.Enable && !c.Sequencer { return errors.New("cannot enable delayed sequencer without enabling sequencer") } + if c.InboxReader.ReadMode != "latest" { + if c.Sequencer { + return errors.New("cannot enable inboxreader in safe or finalized mode along with sequencer") + } + c.Feed.Output.Enable = false + c.Feed.Input.URL = []string{} + } if err := c.BlockValidator.Validate(); err != nil { return err } From 3bce233ceed826f8ea9e1aed18bde34271721efd Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Tue, 6 Feb 2024 14:36:04 -0600 Subject: [PATCH 2/2] address PR comments --- arbnode/inbox_reader.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go index f452b0d890..7b6b96d43b 100644 --- a/arbnode/inbox_reader.go +++ b/arbnode/inbox_reader.go @@ -40,11 +40,9 @@ func (c *InboxReaderConfig) Validate() error { if c.MaxBlocksToRead == 0 || c.MaxBlocksToRead < c.DefaultBlocksToRead { return errors.New("inbox reader max-blocks-to-read cannot be zero or less than default-blocks-to-read") } - if c.ReadMode != "latest" { - c.ReadMode = strings.ToLower(c.ReadMode) - if c.ReadMode != "safe" && c.ReadMode != "finalized" { - return fmt.Errorf("inbox reader read-mode is invalid, want: safe or finalized, got: %s", c.ReadMode) - } + c.ReadMode = strings.ToLower(c.ReadMode) + if c.ReadMode != "latest" && c.ReadMode != "safe" && c.ReadMode != "finalized" { + return fmt.Errorf("inbox reader read-mode is invalid, want: latest or safe or finalized, got: %s", c.ReadMode) } return nil } @@ -57,7 +55,7 @@ func InboxReaderConfigAddOptions(prefix string, f *flag.FlagSet) { f.Uint64(prefix+".default-blocks-to-read", DefaultInboxReaderConfig.DefaultBlocksToRead, "the default number of blocks to read at once (will vary based on traffic by default)") f.Uint64(prefix+".target-messages-read", DefaultInboxReaderConfig.TargetMessagesRead, "if adjust-blocks-to-read is enabled, the target number of messages to read at once") f.Uint64(prefix+".max-blocks-to-read", DefaultInboxReaderConfig.MaxBlocksToRead, "if adjust-blocks-to-read is enabled, the maximum number of blocks to read at once") - f.String(prefix+".read-mode", DefaultInboxReaderConfig.ReadMode, "mode to only read safe or finalized L1 blocks. Takes string input, valid strings- safe, finalized") + f.String(prefix+".read-mode", DefaultInboxReaderConfig.ReadMode, "mode to only read latest or safe or finalized L1 blocks. Enabling safe or finalized disables feed input and output. Defaults to latest. Takes string input, valid strings- latest, safe, finalized") } var DefaultInboxReaderConfig = InboxReaderConfig{