feature(op-node): pre-fetch receipts concurrently #100

Merged

Changes from 2 commits
2 changes: 1 addition & 1 deletion op-node/rollup/derive/engine_queue.go
@@ -787,7 +787,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to fetch L1 config of L2 block %s: %w", pipelineL2.ID(), err))
}
- err2 := eq.l1Fetcher.GoOrUpdatePreFetchReceipts(ctx, pipelineOrigin.Number)
+ err2 := eq.l1Fetcher.GoOrUpdatePreFetchReceipts(context.Background(), pipelineOrigin.Number)
if err2 != nil {
return NewTemporaryError(fmt.Errorf("failed to run pre fetch L1 receipts for L1 start block %s: %w", pipelineOrigin.ID(), err2))
}
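The change above swaps the request-scoped `ctx` for `context.Background()`, so the long-running pre-fetch is not torn down when the caller's context ends. A minimal stdlib-only sketch of the difference, using a hypothetical `startPreFetch` stand-in (not the real op-node API):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// startPreFetch is a hypothetical stand-in for GoOrUpdatePreFetchReceipts:
// it launches a background goroutine whose lifetime is tied to the given
// context.
func startPreFetch(ctx context.Context, done chan<- string) {
	go func() {
		select {
		case <-ctx.Done():
			done <- "canceled"
		case <-time.After(50 * time.Millisecond):
			done <- "fetched"
		}
	}()
}

// demo shows why the pre-fetch must not use a request-scoped context.
func demo() (first, second string) {
	done := make(chan string, 2)

	// Request-scoped context: canceled when the caller finishes, which
	// would tear down the pre-fetch prematurely.
	reqCtx, cancel := context.WithCancel(context.Background())
	startPreFetch(reqCtx, done)
	cancel()
	first = <-done

	// Detached context, as in the PR: the pre-fetch keeps running.
	startPreFetch(context.Background(), done)
	second = <-done
	return first, second
}

func main() {
	a, b := demo()
	fmt.Println(a, b) // → canceled fetched
}
```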
21 changes: 16 additions & 5 deletions op-node/sources/l1_client.go
@@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
+ "golang.org/x/time/rate"

"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/eth"
@@ -61,6 +62,8 @@ type L1Client struct {

//ensure pre-fetch receipts only once
preFetchReceiptsOnce sync.Once
+ //control the number of concurrent pre-fetch receipt requests.
+ preFetchReceiptsRateLimiter *rate.Limiter
//start block for pre-fetch receipts
preFetchReceiptsStartBlockChan chan uint64
//done chan
@@ -79,6 +82,7 @@ func NewL1Client(client client.RPC, log log.Logger, metrics caching.Metrics, con
l1BlockRefsCache: caching.NewLRUCache(metrics, "blockrefs", config.L1BlockRefsCacheSize),
preFetchReceiptsOnce: sync.Once{},
preFetchReceiptsStartBlockChan: make(chan uint64, 1),
+ preFetchReceiptsRateLimiter: rate.NewLimiter(rate.Limit(config.MaxConcurrentRequests), config.MaxConcurrentRequests),
done: make(chan struct{}),
}, nil
}
@@ -151,13 +155,20 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6
time.Sleep(3 * time.Second)
continue
}
_, _, err = s.FetchReceipts(ctx, blockInfo.Hash)
if err != nil {
s.log.Warn("failed to pre-fetch receipts", "err", err)
time.Sleep(200 * time.Millisecond)
waitErr := s.preFetchReceiptsRateLimiter.Wait(ctx)
Contributor:
Would it be better to put this rate-limiter Wait before the L1BlockRefByNumber call? After the rate-limit threshold is triggered, that could avoid some unnecessary L1BlockRefByNumber calls.

Contributor Author:
The requests for L1BlockRefByNumber should all be necessary:
When we have not yet reached the latest block height, each L1BlockRefByNumber call has different parameters, so every result is useful.
When we have reached the latest block height, continuously requesting L1BlockRefByNumber lets us start the follow-up processing as soon as a new block height appears. If we added a limiter here, we would still have to wait for a period before entering that processing when a new block height appears.

if waitErr != nil {
s.log.Warn("failed to wait pre-fetch receipts rateLimiter", "err", waitErr)
continue
}
s.log.Debug("pre-fetching receipts", "block", currentL1Block)

go func(ctx context.Context, blockInfo eth.L1BlockRef) {

Contributor:
Why not put the line 152 "L1BlockRefByNumber" call here as well?
Contributor Author:
Because when we reach the latest block height, L1BlockRefByNumber also has the effect of making the processing loop wait for a period of time. If we parallelized L1BlockRefByNumber as well, then once we reach the latest block height the loop would not stop; it would keep launching new goroutines that try to process blocks that have not been produced yet.
Besides, the L1BlockRefByNumber endpoint performs reasonably well and has its own cache, so there is no need to parallelize it.

_, _, err = s.FetchReceipts(ctx, blockInfo.Hash)
if err != nil {
s.log.Warn("failed to pre-fetch receipts", "err", err)
return
}
s.log.Debug("pre-fetching receipts", "block", currentL1Block)
}(ctx, blockInfo)
currentL1Block = currentL1Block + 1
}
}