Skip to content

Commit

Permalink
feat: add more metrics to investigate tx broadcast issue (#1007)
Browse files Browse the repository at this point in the history
* update eth/fetcher/tx_fetcher.go

* update eth/handler.go

* update eth/handler.go

* update eth/protocols/eth/peer.go

* update eth/protocols/eth/broadcast.go

* update eth/protocols/eth/handlers.go

* chore: auto version bump [bot]

* fix

---------

Co-authored-by: HAOYUatHZ <[email protected]>
Co-authored-by: Péter Garamvölgyi <[email protected]>
  • Loading branch information
3 people committed Aug 27, 2024
1 parent 77a5659 commit 118a80b
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 1 deletion.
5 changes: 5 additions & 0 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ var (
txFetcherQueueingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/hashes", nil)
txFetcherFetchingPeers = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/peers", nil)
txFetcherFetchingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/hashes", nil)

peerAnnounceTxsLenGauge = metrics.NewRegisteredGauge("eth/fetcher/peer/announce/txs", nil)
peerRetrievalTxsLenGauge = metrics.NewRegisteredGauge("eth/fetcher/peer/retrieval/txs", nil)
)

// txAnnounce is the notification of the availability of a batch
Expand Down Expand Up @@ -792,6 +795,8 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
})

log.Debug("Scheduling transaction retrieval", "peer", peer, "len(f.announces[peer])", len(f.announces[peer]), "len(hashes)", len(hashes))
peerAnnounceTxsLenGauge.Update(int64(len(f.announces[peer])))
peerRetrievalTxsLenGauge.Update(int64(len(hashes)))

// If any hashes were allocated, request them from the peer
if len(hashes) > 0 {
Expand Down
13 changes: 13 additions & 0 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,17 @@ import (
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/event"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/metrics"
"github.com/scroll-tech/go-ethereum/p2p"
"github.com/scroll-tech/go-ethereum/params"
"github.com/scroll-tech/go-ethereum/trie"
)

var (
annoTxsLenGauge = metrics.NewRegisteredGauge("eth/handler/broadast/announce/txs", nil)
directTxsLenGauge = metrics.NewRegisteredGauge("eth/handler/broadast/direct/txs", nil)
)

const (
// txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
Expand Down Expand Up @@ -523,6 +529,13 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
log.Debug("Transaction broadcast", "txs", len(txs),
"announce packs", annoPeers, "announced hashes", annoCount,
"tx packs", directPeers, "broadcast txs", directCount)

if directPeers > 0 {
directTxsLenGauge.Update(int64(directCount / directPeers))
}
if annoPeers > 0 {
annoTxsLenGauge.Update(int64(annoCount / annoPeers))
}
}

// minedBroadcastLoop sends mined blocks to connected peers.
Expand Down
20 changes: 20 additions & 0 deletions eth/protocols/eth/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ import (
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/metrics"
)

var (
broadcastSendTxsLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/broadcast/direct/txs", nil)
broadcastSendTxsFailGauge = metrics.NewRegisteredGauge("eth/protocols/eth/broadcast/direct/fail", nil)
broadcastSendHashesLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/broadcast/direct/hashes", nil)
broadcastSendQueueLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/broadcast/direct/queue", nil)
broadcastAnnoTxsLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/broadcast/anno/txs", nil)
broadcastAnnoTxsFailGauge = metrics.NewRegisteredGauge("eth/protocols/eth/broadcast/anno/fail", nil)
broadcastAnnoHashesLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/broadcast/anno/hashes", nil)
broadcastAnnoQueueLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/broadcast/anno/queue", nil)
)

const (
Expand Down Expand Up @@ -94,8 +106,10 @@ func (p *Peer) broadcastTransactions() {
done = make(chan struct{})
go func() {
log.Debug("Sending transactions", "count", len(txs))
broadcastSendTxsLenGauge.Update(int64(len(txs)))
if err := p.SendTransactions(txs); err != nil {
log.Debug("Sending transactions", "count", len(txs), "err", err)
broadcastSendTxsFailGauge.Inc(1)
fail <- err
return
}
Expand All @@ -115,6 +129,8 @@ func (p *Peer) broadcastTransactions() {
// New batch of transactions to be broadcast, queue them (with cap)
queue = append(queue, hashes...)
log.Debug("Queue size in broadcastTransactions", "len(hashes)", len(hashes), "len(queue)", len(queue), "maxQueuedTxs", maxQueuedTxs)
broadcastSendHashesLenGauge.Update(int64(len(hashes)))
broadcastSendQueueLenGauge.Update(int64(len(queue)))
if len(queue) > maxQueuedTxs {
// Fancy copy and resize to ensure buffer doesn't grow indefinitely
queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])]
Expand Down Expand Up @@ -165,8 +181,10 @@ func (p *Peer) announceTransactions() {
done = make(chan struct{})
go func() {
log.Debug("Sending transaction announcements", "count", len(pending))
broadcastAnnoTxsLenGauge.Update(int64(len(pending)))
if err := p.sendPooledTransactionHashes(pending); err != nil {
log.Debug("Sending transaction announcements", "count", len(pending), "err", err)
broadcastAnnoTxsFailGauge.Inc(1)
fail <- err
return
}
Expand All @@ -186,6 +204,8 @@ func (p *Peer) announceTransactions() {
// New batch of transactions to be broadcast, queue them (with cap)
queue = append(queue, hashes...)
log.Debug("Queue size in announceTransactions", "len(hashes)", len(hashes), "len(queue)", len(queue), "maxQueuedTxAnns", maxQueuedTxAnns)
broadcastAnnoHashesLenGauge.Update(int64(len(hashes)))
broadcastAnnoQueueLenGauge.Update(int64(len(queue)))
if len(queue) > maxQueuedTxAnns {
// Fancy copy and resize to ensure buffer doesn't grow indefinitely
queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxAnns:])]
Expand Down
27 changes: 27 additions & 0 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,25 @@ import (
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/metrics"
"github.com/scroll-tech/go-ethereum/rlp"
"github.com/scroll-tech/go-ethereum/trie"
)

var (
newPooledTxHashesFailGauge = metrics.NewRegisteredGauge("eth/protocols/eth/handlers/newpooledtxhashes/fail", nil)
newPooledTxHashesLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/handlers/newpooledtxhashes/len", nil)
getPooledTxsFailGauge = metrics.NewRegisteredGauge("eth/protocols/eth/handlers/getpooledtxs/fail", nil)
getPooledTxsQueryLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/handlers/getpooledtxs/query", nil)
getPooledTxsRetrievedLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/handlers/getpooledtxs/retrieved", nil)
handleTxsFailGauge = metrics.NewRegisteredGauge("eth/protocols/eth/handlers/handletxs/fail", nil)
handleTxsLenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/handlers/handletxs/len", nil)
handleTxsNilGauge = metrics.NewRegisteredGauge("eth/protocols/eth/handlers/handletxs/nil", nil)
pooledTxs66FailGauge = metrics.NewRegisteredGauge("eth/protocols/eth/handlers/pooledtxs66/fail", nil)
pooledTxs66LenGauge = metrics.NewRegisteredGauge("eth/protocols/eth/handlers/pooledtxs66/len", nil)
pooledTxs66NillGauge = metrics.NewRegisteredGauge("eth/protocols/eth/handlers/pooledtxs66/nil", nil)
)

// handleGetBlockHeaders66 is the eth/66 version of handleGetBlockHeaders
func handleGetBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
// Decode the complex header query
Expand Down Expand Up @@ -324,10 +339,12 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer)
ann := new(NewPooledTransactionHashesPacket)
if err := msg.Decode(ann); err != nil {
log.Debug("Failed to decode `NewPooledTransactionHashesPacket`", "peer", peer.String(), "err", err)
newPooledTxHashesFailGauge.Inc(1)
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
// Schedule all the unknown hashes for retrieval
log.Debug("handleNewPooledTransactionHashes", "peer", peer.String(), "len(ann)", len(*ann))
newPooledTxHashesLenGauge.Update(int64(len(*ann)))
for _, hash := range *ann {
peer.markTransaction(hash)
}
Expand All @@ -339,10 +356,13 @@ func handleGetPooledTransactions66(backend Backend, msg Decoder, peer *Peer) err
var query GetPooledTransactionsPacket66
if err := msg.Decode(&query); err != nil {
log.Debug("Failed to decode `GetPooledTransactionsPacket66`", "peer", peer.String(), "err", err)
getPooledTxsFailGauge.Inc(1)
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
hashes, txs := answerGetPooledTransactions(backend, query.GetPooledTransactionsPacket, peer)
log.Debug("handleGetPooledTransactions", "peer", peer.String(), "RequestId", query.RequestId, "len(query)", len(query.GetPooledTransactionsPacket), "retrieved", len(hashes))
getPooledTxsQueryLenGauge.Update(int64(len(query.GetPooledTransactionsPacket)))
getPooledTxsRetrievedLenGauge.Update(int64(len(hashes)))
return peer.ReplyPooledTransactionsRLP(query.RequestId, hashes, txs)
}

Expand Down Expand Up @@ -382,13 +402,16 @@ func handleTransactions(backend Backend, msg Decoder, peer *Peer) error {
// Transactions can be processed, parse all of them and deliver to the pool
var txs TransactionsPacket
if err := msg.Decode(&txs); err != nil {
handleTxsFailGauge.Inc(1)
log.Debug("Failed to decode `TransactionsPacket`", "peer", peer.String(), "err", err)
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
log.Debug("handleTransactions", "peer", peer.String(), "len(txs)", len(txs))
handleTxsLenGauge.Update(int64(len(txs)))
for i, tx := range txs {
// Validate and mark the remote transaction
if tx == nil {
handleTxsNilGauge.Inc(1)
log.Debug("handleTransactions: transaction is nil", "peer", peer.String(), "i", i)
return fmt.Errorf("%w: transaction %d is nil", errDecode, i)
}
Expand All @@ -405,12 +428,16 @@ func handlePooledTransactions66(backend Backend, msg Decoder, peer *Peer) error
// Transactions can be processed, parse all of them and deliver to the pool
var txs PooledTransactionsPacket66
if err := msg.Decode(&txs); err != nil {
pooledTxs66FailGauge.Inc(1)
log.Debug("Failed to decode `PooledTransactionsPacket66`", "peer", peer.String(), "err", err)
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
log.Debug("handlePooledTransactions66", "peer", peer.String(), "len(txs)", len(txs.PooledTransactionsPacket))
pooledTxs66LenGauge.Update(int64(len(txs.PooledTransactionsPacket)))
for i, tx := range txs.PooledTransactionsPacket {
// Validate and mark the remote transaction
if tx == nil {
pooledTxs66NillGauge.Inc(1)
log.Debug("handlePooledTransactions: transaction is nil", "peer", peer.String(), "i", i)
return fmt.Errorf("%w: transaction %d is nil", errDecode, i)
}
Expand Down
6 changes: 6 additions & 0 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ import (
"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/metrics"
"github.com/scroll-tech/go-ethereum/p2p"
"github.com/scroll-tech/go-ethereum/rlp"
)

var (
peerRequestTxsCntGauge = metrics.NewRegisteredGauge("eth/protocols/eth/peer/request/txs", nil)
)

const (
// maxKnownTxs is the maximum transactions hashes to keep in the known list
// before starting to randomly evict them.
Expand Down Expand Up @@ -421,6 +426,7 @@ func (p *Peer) RequestTxs(hashes []common.Hash) error {
id := rand.Uint64()

log.Debug("Requesting transactions", "RequestId", id, "Peer.id", p.id, "count", len(hashes))
peerRequestTxsCntGauge.Update(int64(len(hashes)))

requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id)
return p2p.Send(p.rw, GetPooledTransactionsMsg, &GetPooledTransactionsPacket66{
Expand Down
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const (
VersionMajor = 5 // Major version component of the current release
VersionMinor = 7 // Minor version component of the current release
VersionPatch = 2 // Patch version component of the current release
VersionPatch = 3 // Patch version component of the current release
VersionMeta = "mainnet" // Version metadata to append to the version string
)

Expand Down

0 comments on commit 118a80b

Please sign in to comment.