diff --git a/cmd/cdk-erigon/main.go b/cmd/cdk-erigon/main.go index 296c975f9b1..d84c9b83c43 100644 --- a/cmd/cdk-erigon/main.go +++ b/cmd/cdk-erigon/main.go @@ -18,8 +18,8 @@ import ( "github.com/ledgerwatch/erigon/params" erigonapp "github.com/ledgerwatch/erigon/turbo/app" erigoncli "github.com/ledgerwatch/erigon/turbo/cli" - "github.com/ledgerwatch/erigon/turbo/node" "github.com/ledgerwatch/erigon/turbo/logging" + "github.com/ledgerwatch/erigon/turbo/node" ) func main() { @@ -63,7 +63,7 @@ func runErigon(cliCtx *cli.Context) error { ethCfg := node.NewEthConfigUrfave(cliCtx, nodeCfg) // Init for X Layer - initRunForXLayer(ethCfg) + initRunForXLayer(cliCtx, ethCfg) ethNode, err := node.New(nodeCfg, ethCfg) if err != nil { diff --git a/cmd/cdk-erigon/run_xlayer.go b/cmd/cdk-erigon/run_xlayer.go index 946cb1dab2a..f9639ea7e52 100644 --- a/cmd/cdk-erigon/run_xlayer.go +++ b/cmd/cdk-erigon/run_xlayer.go @@ -1,14 +1,22 @@ package main import ( + "github.com/ledgerwatch/erigon/cmd/utils" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/zk/apollo" + "github.com/ledgerwatch/erigon/zk/metrics" "github.com/ledgerwatch/log/v3" + "github.com/urfave/cli/v2" ) -func initRunForXLayer(ethCfg *ethconfig.Config) { +func initRunForXLayer(cliCtx *cli.Context, ethCfg *ethconfig.Config) { apolloClient := apollo.NewClient(ethCfg) if apolloClient.LoadConfig() { log.Info("Apollo config loaded") } + + // Init metrics + if cliCtx.Bool(utils.MetricsEnabledFlag.Name) { + metrics.Init() + } } diff --git a/metrics/exp/exp.go b/metrics/exp/exp.go index 763dc41efe5..1ed6ae7f7dd 100644 --- a/metrics/exp/exp.go +++ b/metrics/exp/exp.go @@ -6,24 +6,24 @@ import ( "fmt" "net/http" - metrics2 "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/metrics" "github.com/ledgerwatch/log/v3" + "github.com/prometheus/client_golang/prometheus/promhttp" ) // Setup starts a dedicated metrics server at the given address. // This function enables metrics reporting separate from pprof. 
func Setup(address string) { - http.HandleFunc("/debug/metrics/prometheus", func(w http.ResponseWriter, r *http.Request) { + // For X Layer + mux := http.NewServeMux() + mux.Handle("/debug/metrics/prometheus", promhttp.Handler()) + mux.Handle("/debug/metrics", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") - metrics2.WritePrometheus(w, true) - }) - //m.Handle("/debug/metrics", ExpHandler(metrics.DefaultRegistry)) - //m.Handle("/debug/metrics/prometheus2", promhttp.HandlerFor(prometheus2.DefaultGatherer, promhttp.HandlerOpts{ - // EnableOpenMetrics: true, - //})) - log.Info("Starting metrics server", "addr", fmt.Sprintf("http://%s/debug/metrics/prometheus", address)) + metrics.WritePrometheus(w, true) + })) + log.Info("Starting metrics server", "addr", fmt.Sprintf("http://%v/debug/metrics/prometheus or http://%v/debug/metrics", address, address)) go func() { - if err := http.ListenAndServe(address, nil); err != nil { // nolint:gosec + if err := http.ListenAndServe(address, mux); err != nil { // nolint:gosec log.Error("Failure in running metrics server", "err", err) } }() diff --git a/test/config/test.erigon.rpc.config.yaml b/test/config/test.erigon.rpc.config.yaml index dce424d9534..fba6c7c9061 100644 --- a/test/config/test.erigon.rpc.config.yaml +++ b/test/config/test.erigon.rpc.config.yaml @@ -22,7 +22,7 @@ zkevm.l1-highest-block-type: latest zkevm.rpc-ratelimit: 300 zkevm.datastream-version: 2 -log.console.verbosity: 4 +log.console.verbosity: info #zkevm.executor-urls: erigon-stateless-executor:50071 zkevm.executor-urls: "" @@ -46,7 +46,25 @@ http.addr: 0.0.0.0 http.port: 8545 http.vhosts: any http.corsdomain: any +http.timeouts.read: "10s" +http.timeouts.write: "10s" +http.timeouts.idle: "10s" ws: true + +zkevm.apollo-enabled: false +zkevm.apollo-ip-addr: "" +zkevm.apollo-app-id: "" +zkevm.apollo-namespace-name: "" + +zkevm.nacos-urls: "" +zkevm.nacos-namespace-id: "" +zkevm.nacos-application-name: "" +zkevm.nacos-external-listen-addr: "" + +metrics: true +metrics.addr: "0.0.0.0" +metrics.port: 9095 + db.read.concurrency: 20000 txpool.globalslots: 100000 networkid: 195 @@ -59,4 +77,5 @@ http.methodratelimit: "{\"methods\":[\"eth_syncing\"],\"count\":10,\"bucket\":1} pprof: true pprof.port: 6060 -pprof.addr: 127.0.0.1 \ No newline at end of file +pprof.addr: 127.0.0.1 + diff --git a/test/config/test.erigon.seq.config.yaml b/test/config/test.erigon.seq.config.yaml index 20acb38cb01..04d37d02b61 100644 --- a/test/config/test.erigon.seq.config.yaml +++ b/test/config/test.erigon.seq.config.yaml @@ -22,7 +22,7 @@ zkevm.l1-query-delay: 6000 zkevm.rpc-ratelimit: 300 zkevm.datastream-version: 2 -log.console.verbosity: 4 +log.console.verbosity: info #zkevm.executor-urls: erigon-stateless-executor:50071 zkevm.executor-urls: "" @@ -38,7 +38,7 @@ zkevm.data-stream-port: 6900 zkevm.default-gas-price: 1000000000 zkevm.max-gas-price: 0 zkevm.gas-price-factor: 0.000001 -#zkevm.gasless: true +zkevm.gasless: true externalcl: true http.api: [eth, debug, net, trace, web3, erigon, txpool, zkevm] @@ -46,8 +46,9 @@ http.addr: 0.0.0.0 http.port: 8545 http.vhosts: any http.corsdomain: any -http.timeouts.read: "60s" -http.timeouts.write: "60s" +http.timeouts.read: "10s" +http.timeouts.write: "10s" +http.timeouts.idle: "10s" rpc.batch.concurrency: 2 rpc.batch.limit: 20 ws: true @@ -62,6 +63,10 @@ zkevm.nacos-namespace-id: "" zkevm.nacos-application-name: "" zkevm.nacos-external-listen-addr: "" +metrics: true +metrics.addr: "0.0.0.0" +metrics.port: 
9095 + db.read.concurrency: 20000 txpool.globalslots: 100000 txpool.globalbasefeeslots: 100000 @@ -73,7 +78,7 @@ txpool.enablefreegasbynonce : true txpool.freegascountperaddr : 100 gpo.type: "follower" -gpo.update-period: "2s" +gpo.update-period: "60s" gpo.factor: 0.01 gpo.kafka-url: "0.0.0.0" gpo.topic: "explorer" diff --git a/test/config/test.poolmanager.toml b/test/config/test.poolmanager.toml index d388b11ff40..53dea65b0e0 100644 --- a/test/config/test.poolmanager.toml +++ b/test/config/test.poolmanager.toml @@ -8,7 +8,7 @@ Host = "0.0.0.0" Port = 8545 ReadTimeout = "60s" WriteTimeout = "60s" -MaxRequestsPerIPAndSecond = 500 +MaxRequestsPerIPAndSecond = 50000 EnableHttpLog = true BatchRequestsEnabled = false BatchRequestsLimit = 20 diff --git a/test/docker-compose.yml b/test/docker-compose.yml index cfbb0343af9..553681f270a 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -117,6 +117,7 @@ services: ports: - 8123:8545 - 6900:6900 + - 9092:9095 volumes: - ./config/test.erigon.seq.config.yaml:/usr/src/app/config.yaml - ./config/dynamic-mynetwork-allocs.json:/usr/src/app/dynamic-mynetwork-allocs.json @@ -133,6 +134,7 @@ services: ports: - 8124:8545 - 6901:6900 + - 9091:9095 volumes: - ./config/test.erigon.rpc.config.yaml:/usr/src/app/config.yaml - ./config/dynamic-mynetwork-allocs.json:/usr/src/app/dynamic-mynetwork-allocs.json diff --git a/test/e2e/smoke_test.go b/test/e2e/smoke_test.go index 8e810578bfa..0036d4aa62f 100644 --- a/test/e2e/smoke_test.go +++ b/test/e2e/smoke_test.go @@ -361,6 +361,19 @@ func TestGasPrice(t *testing.T) { require.Greater(t, gasPrice2, gasPrice1) } +func TestMetrics(t *testing.T) { + result, err := operations.GetMetricsPrometheus() + require.NoError(t, err) + require.Equal(t, strings.Contains(result, "sequencer_batch_execute_time"), true) + require.Equal(t, strings.Contains(result, "sequencer_pool_tx_count"), true) + + result, err = operations.GetMetrics() + require.NoError(t, err) + require.Equal(t, strings.Contains(result, "zkevm_getBatchWitness"), true) + require.Equal(t, strings.Contains(result, "eth_sendRawTransaction"), true) + require.Equal(t, strings.Contains(result, "eth_getTransactionCount"), true) +} + func transToken(t *testing.T, ctx context.Context, client *ethclient.Client, amount *uint256.Int, toAddress string) string { auth, err := operations.GetAuth(operations.DefaultL2AdminPrivateKey, operations.DefaultL2ChainID) nonce, err := client.PendingNonceAt(ctx, auth.From) diff --git a/test/operations/manager.go b/test/operations/manager.go index 36b938c953c..81cb74e0903 100644 --- a/test/operations/manager.go +++ b/test/operations/manager.go @@ -26,6 +26,9 @@ const ( DefaultL2NetworkURL = "http://localhost:8124" DefaultL2ChainID uint64 = 195 + DefaultL2MetricsPrometheusURL = "http://127.0.0.1:9092/debug/metrics/prometheus" + DefaultL2MetricsURL = "http://127.0.0.1:9092/debug/metrics" + BridgeAddr = "0x1089Af36bD72553008FAd0A1240B4D5641208494" DefaultTimeoutTxToBeMined = 1 * time.Minute diff --git a/test/operations/rpc_client.go b/test/operations/rpc_client.go index 32052e04028..6d16e84ebd0 100644 --- a/test/operations/rpc_client.go +++ b/test/operations/rpc_client.go @@ -3,8 +3,11 @@ package operations import ( "encoding/json" "fmt" + "io/ioutil" "math/big" + "net/http" "strconv" + "time" "github.com/gateway-fm/cdk-erigon-lib/common" types "github.com/ledgerwatch/erigon/zk/rpcdaemon" @@ -223,3 +226,37 @@ func GetMinGasPrice() (uint64, error) { return transHexToUint64(response.Result) } + +func GetMetricsPrometheus() (string, 
error) { + client := http.Client{ + Timeout: 10 * time.Second, + } + resp, err := client.Get(DefaultL2MetricsPrometheusURL) + if err != nil { + fmt.Println("Error:", err) + return "", err + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", err + } + return string(body), nil +} + +func GetMetrics() (string, error) { + client := http.Client{ + Timeout: 10 * time.Second, + } + resp, err := client.Get(DefaultL2MetricsURL) + if err != nil { + fmt.Println("Error:", err) + return "", err + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", err + } + return string(body), nil +} diff --git a/test/readme.md b/test/readme.md index b332efc7773..5849484e6bc 100644 --- a/test/readme.md +++ b/test/readme.md @@ -30,3 +30,9 @@ L2 WETH Token: 0x5d7AF92af4FF5a35323250D6ee174C23CCBe00EF L2 admin: 0x8f8E2d6cF621f30e9a11309D6A56A876281Fd534 ``` + +# Get metrics +``` +curl http://127.0.0.1:9092/debug/metrics/prometheus +curl http://127.0.0.1:9092/debug/metrics +``` diff --git a/zk/metrics/metrics_xlayer.go b/zk/metrics/metrics_xlayer.go new file mode 100644 index 00000000000..e9ed83f7a9c --- /dev/null +++ b/zk/metrics/metrics_xlayer.go @@ -0,0 +1,56 @@ +package metrics + +import ( + "fmt" + "time" + + "github.com/ledgerwatch/log/v3" + "github.com/prometheus/client_golang/prometheus" +) + +type BatchFinalizeType string + +const ( + BatchTimeOut BatchFinalizeType = "EmptyBatchTimeOut" + BatchCounterOverflow BatchFinalizeType = "BatchCounterOverflow" + BatchLimboRecovery BatchFinalizeType = "LimboRecovery" +) + +var ( + SeqPrefix = "sequencer_" + BatchExecuteTimeName = SeqPrefix + "batch_execute_time" + PoolTxCountName = SeqPrefix + "pool_tx_count" +) + +func Init() { + prometheus.MustRegister(BatchExecuteTimeGauge) + prometheus.MustRegister(PoolTxCount) +} + +var BatchExecuteTimeGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: BatchExecuteTimeName, + Help: "[SEQUENCER] batch execution time in second", + }, + []string{"closingReason"}, +) + +var PoolTxCount = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: PoolTxCountName, + Help: "[SEQUENCER] tx count of each pool in tx pool", + }, + []string{"poolName"}, +) + +func BatchExecuteTime(closingReason string, duration time.Duration) { + log.Info(fmt.Sprintf("[BatchExecuteTime] ClosingReason: %v, Duration: %.2fs", closingReason, duration.Seconds())) + BatchExecuteTimeGauge.WithLabelValues(closingReason).Set(duration.Seconds()) +} + +func AddPoolTxCount(pending, baseFee, queued int) { + log.Info(fmt.Sprintf("[PoolTxCount] pending: %v, basefee: %v, queued: %v", pending, baseFee, queued)) + PoolTxCount.WithLabelValues("pending").Set(float64(pending)) + PoolTxCount.WithLabelValues("basefee").Set(float64(baseFee)) + PoolTxCount.WithLabelValues("queued").Set(float64(queued)) +} diff --git a/zk/metrics/statistics_xlayer.go b/zk/metrics/statistics_xlayer.go new file mode 100644 index 00000000000..3acbcdc64c2 --- /dev/null +++ b/zk/metrics/statistics_xlayer.go @@ -0,0 +1,37 @@ +package metrics + +import ( + "time" +) + +type logTag string + +const ( + BlockCounter logTag = "BlockCounter" + TxCounter logTag = "TxCounter" + GetTx logTag = "GetTx" + GetTxPauseCounter logTag = "GetTxPauseCounter" + GetTxPauseTiming logTag = "GetTxPauseTiming" + BatchCloseReason logTag = "BatchCloseReason" + ReprocessingTxCounter logTag = "ReProcessingTxCounter" + FailTxCounter logTag = "FailTxCounter" + FailTxResourceOverCounter logTag = "FailTxResourceOverCounter" + BatchGas 
logTag = "BatchGas" + ProcessingTxTiming logTag = "ProcessingTxTiming" + ProcessingInvalidTxCounter logTag = "ProcessingInvalidTxCounter" + FinalizeBatchNumber logTag = "FinalizeBatchNumber" + BatchCommitDBTiming logTag = "BatchCommitDBTiming" + PbStateTiming logTag = "PbStateTiming" + ZkIncIntermediateHashesTiming logTag = "ZkIncIntermediateHashesTiming" + FinaliseBlockWriteTiming logTag = "FinaliseBlockWriteTiming" +) + +type Statistics interface { + CumulativeCounting(tag logTag) + CumulativeValue(tag logTag, value int64) + CumulativeTiming(tag logTag, duration time.Duration) + SetTag(tag logTag, value string) + GetTag(tag logTag) string + GetStatistics(tag logTag) int64 + Summary() string +} diff --git a/zk/metrics/statisticsimpl_xlayer.go b/zk/metrics/statisticsimpl_xlayer.go new file mode 100644 index 00000000000..7304abe98ed --- /dev/null +++ b/zk/metrics/statisticsimpl_xlayer.go @@ -0,0 +1,85 @@ +package metrics + +import ( + "strconv" + "sync" + "time" + + "github.com/ledgerwatch/log/v3" +) + +var instance *statisticsInstance +var once sync.Once + +func GetLogStatistics() Statistics { + once.Do(func() { + instance = &statisticsInstance{} + instance.resetStatistics() + }) + return instance +} + +type statisticsInstance struct { + newRoundTime time.Time + statistics map[logTag]int64 // value maybe the counter or time.Duration(ms) + tags map[logTag]string +} + +func (l *statisticsInstance) CumulativeCounting(tag logTag) { + l.statistics[tag]++ +} + +func (l *statisticsInstance) CumulativeValue(tag logTag, value int64) { + l.statistics[tag] += value +} + +func (l *statisticsInstance) CumulativeTiming(tag logTag, duration time.Duration) { + l.statistics[tag] += duration.Milliseconds() +} + +func (l *statisticsInstance) SetTag(tag logTag, value string) { + l.tags[tag] = value +} + +func (l *statisticsInstance) resetStatistics() { + l.newRoundTime = time.Now() + l.statistics = make(map[logTag]int64) + l.tags = make(map[logTag]string) +} + +func (l *statisticsInstance) Summary() string { + batch := "Batch<" + l.tags[FinalizeBatchNumber] + ">, " + totalDuration := "TotalDuration<" + strconv.Itoa(int(time.Since(l.newRoundTime).Milliseconds())) + "ms>, " + gasUsed := "GasUsed<" + strconv.Itoa(int(l.statistics[BatchGas])) + ">, " + blockCount := "Block<" + strconv.Itoa(int(l.statistics[BlockCounter])) + ">, " + tx := "Tx<" + strconv.Itoa(int(l.statistics[TxCounter])) + ">, " + getTx := "GetTx<" + strconv.Itoa(int(l.statistics[GetTx])) + "ms>, " + getTxPause := "GetTxPause<" + strconv.Itoa(int(l.statistics[GetTxPauseCounter])) + ">, " + getTxPauseTiming := "GetTxPauseTiming<" + strconv.Itoa(int(l.statistics[GetTxPauseTiming])) + "ms>, " + reprocessTx := "ReprocessTx<" + strconv.Itoa(int(l.statistics[ReprocessingTxCounter])) + ">, " + resourceOverTx := "ResourceOverTx<" + strconv.Itoa(int(l.statistics[FailTxResourceOverCounter])) + ">, " + failTx := "FailTx<" + strconv.Itoa(int(l.statistics[FailTxCounter])) + ">, " + invalidTx := "InvalidTx<" + strconv.Itoa(int(l.statistics[ProcessingInvalidTxCounter])) + ">, " + processTxTiming := "ProcessTx<" + strconv.Itoa(int(l.statistics[ProcessingTxTiming])) + "ms>, " + batchCommitDBTiming := "BatchCommitDBTiming<" + strconv.Itoa(int(l.statistics[BatchCommitDBTiming])) + "ms>, " + pbStateTiming := "PbStateTiming<" + strconv.Itoa(int(l.statistics[PbStateTiming])) + "ms>, " + zkIncIntermediateHashesTiming := "ZkIncIntermediateHashesTiming<" + strconv.Itoa(int(l.statistics[ZkIncIntermediateHashesTiming])) + "ms>, " + finaliseBlockWriteTiming := 
"FinaliseBlockWriteTiming<" + strconv.Itoa(int(l.statistics[FinaliseBlockWriteTiming])) + "ms>, " + batchCloseReason := "BatchCloseReason<" + l.tags[BatchCloseReason] + ">" + + result := batch + totalDuration + gasUsed + blockCount + tx + getTx + getTxPause + getTxPauseTiming + + reprocessTx + resourceOverTx + failTx + invalidTx + processTxTiming + pbStateTiming + + zkIncIntermediateHashesTiming + finaliseBlockWriteTiming + batchCommitDBTiming + + batchCloseReason + log.Info(result) + l.resetStatistics() + return result +} + +func (l *statisticsInstance) GetTag(tag logTag) string { + return l.tags[tag] +} + +func (l *statisticsInstance) GetStatistics(tag logTag) int64 { + return l.statistics[tag] +} diff --git a/zk/metrics/statisticsimpl_xlayer_test.go b/zk/metrics/statisticsimpl_xlayer_test.go new file mode 100644 index 00000000000..b145e8c7ac1 --- /dev/null +++ b/zk/metrics/statisticsimpl_xlayer_test.go @@ -0,0 +1,50 @@ +package metrics + +import ( + "testing" + "time" +) + +func TestStatisticsInstanceSummary(t *testing.T) { + type fields struct { + timestamp time.Time + statistics map[logTag]int64 + tags map[logTag]string + } + tests := []struct { + name string + fields fields + want string + }{ + {"1", fields{ + timestamp: time.Now().Add(-time.Second), + statistics: map[logTag]int64{ + BatchGas: 111111, + TxCounter: 10, + GetTx: time.Second.Milliseconds(), + GetTxPauseCounter: 2, + GetTxPauseTiming: time.Second.Milliseconds() * 30, + ReprocessingTxCounter: 3, + FailTxResourceOverCounter: 1, + FailTxCounter: 1, + ProcessingInvalidTxCounter: 2, + ProcessingTxTiming: time.Second.Milliseconds() * 30, + BatchCommitDBTiming: time.Second.Milliseconds() * 10, + PbStateTiming: time.Second.Milliseconds() * 20, + ZkIncIntermediateHashesTiming: time.Second.Milliseconds() * 15, + FinaliseBlockWriteTiming: time.Second.Milliseconds() * 25, + }, + tags: map[logTag]string{BatchCloseReason: "deadline", FinalizeBatchNumber: "123"}, + }, "test"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + l := &statisticsInstance{ + newRoundTime: tt.fields.timestamp, + statistics: tt.fields.statistics, + tags: tt.fields.tags, + } + t.Log(l.Summary()) + }) + } +} diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index 9b3e0d79319..c9013bbacaa 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -3,6 +3,7 @@ package stages import ( "context" "fmt" + "strconv" "time" "github.com/gateway-fm/cdk-erigon-lib/common" @@ -15,6 +16,7 @@ import ( "github.com/ledgerwatch/erigon/eth/stagedsync" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/zk" + "github.com/ledgerwatch/erigon/zk/metrics" "github.com/ledgerwatch/erigon/zk/utils" ) @@ -30,6 +32,11 @@ func SpawnSequencingStage( log.Info(fmt.Sprintf("[%s] Starting sequencing stage", logPrefix)) defer log.Info(fmt.Sprintf("[%s] Finished sequencing stage", logPrefix)) + // For X Layer metrics + log.Info("[PoolTxCount] Starting Getting Pending Tx Count") + pending, basefee, queued := cfg.txPool.CountContent() + metrics.AddPoolTxCount(pending, basefee, queued) + sdb, err := newStageDb(ctx, cfg.db) if err != nil { return err @@ -140,7 +147,14 @@ func SpawnSequencingStage( log.Info(fmt.Sprintf("[%s] Starting batch %d...", logPrefix, batchState.batchNumber)) + // For X Layer + var batchCloseReason metrics.BatchFinalizeType + batchStart := time.Now() + for blockNumber := executionAt + 1; runLoopBlocks; blockNumber++ { + // For X Layer + 
metrics.GetLogStatistics().CumulativeCounting(metrics.BlockCounter) + log.Info(fmt.Sprintf("[%s] Starting block %d (forkid %v)...", logPrefix, blockNumber, batchState.forkId)) logTicker.Reset(10 * time.Second) blockTicker.Reset(cfg.zk.SequencerBlockSealTime) @@ -180,6 +194,8 @@ func SpawnSequencingStage( return err } if !batchState.isAnyRecovery() && overflowOnNewBlock { + // For X Layer + batchCloseReason = metrics.BatchCounterOverflow break } @@ -211,6 +227,8 @@ func SpawnSequencingStage( } case <-batchTicker.C: if !batchState.isAnyRecovery() { + // For X Layer + batchCloseReason = metrics.BatchTimeOut runLoopBlocks = false break LOOP_TRANSACTIONS } @@ -222,30 +240,42 @@ func SpawnSequencingStage( } } else if !batchState.isL1Recovery() { var allConditionsOK bool + // For X Layer + start := time.Now() batchState.blockState.transactionsForInclusion, allConditionsOK, err = getNextPoolTransactions(ctx, cfg, executionAt, batchState.forkId, batchState.yieldedTransactions) if err != nil { return err } + metrics.GetLogStatistics().CumulativeTiming(metrics.GetTx, time.Since(start)) if len(batchState.blockState.transactionsForInclusion) == 0 { if allConditionsOK { + metrics.GetLogStatistics().CumulativeTiming(metrics.GetTxPauseTiming, batchContext.cfg.zk.SequencerTimeoutOnEmptyTxPool) time.Sleep(batchContext.cfg.zk.SequencerTimeoutOnEmptyTxPool) } else { + metrics.GetLogStatistics().CumulativeTiming(metrics.GetTxPauseTiming, batchContext.cfg.zk.SequencerTimeoutOnEmptyTxPool/5) time.Sleep(batchContext.cfg.zk.SequencerTimeoutOnEmptyTxPool / 5) // we do not need to sleep too long for txpool not ready } + metrics.GetLogStatistics().CumulativeCounting(metrics.GetTxPauseCounter) } else { log.Trace(fmt.Sprintf("[%s] Yielded transactions from the pool", logPrefix), "txCount", len(batchState.blockState.transactionsForInclusion)) } } + start := time.Now() for i, transaction := range batchState.blockState.transactionsForInclusion { + // For X Layer + metrics.GetLogStatistics().CumulativeCounting(metrics.TxCounter) + txHash := transaction.Hash() effectiveGas := batchState.blockState.getL1EffectiveGases(cfg, i) // The copying of this structure is intentional backupDataSizeChecker := *blockDataSizeChecker + receipt, execResult, anyOverflow, err := attemptAddTransaction(cfg, sdb, ibs, batchCounters, &blockContext, header, transaction, effectiveGas, batchState.isL1Recovery(), batchState.forkId, l1TreeUpdateIndex, &backupDataSizeChecker) if err != nil { + metrics.GetLogStatistics().CumulativeCounting(metrics.FailTxCounter) if batchState.isLimboRecovery() { panic("limbo transaction has already been executed once so they must not fail while re-executing") } @@ -266,6 +296,7 @@ func SpawnSequencingStage( } if anyOverflow { + metrics.GetLogStatistics().CumulativeCounting(metrics.FailTxResourceOverCounter) if batchState.isLimboRecovery() { panic("limbo transaction has already been executed once so they must not overflow counters while re-executing") } @@ -287,18 +318,21 @@ func SpawnSequencingStage( log.Trace(fmt.Sprintf("single transaction %s overflow counters", txHash)) } + // For X Layer + batchCloseReason = metrics.BatchCounterOverflow runLoopBlocks = false break LOOP_TRANSACTIONS } } - if err == nil { + // For X Layer + metrics.GetLogStatistics().CumulativeValue(metrics.BatchGas, int64(execResult.UsedGas)) blockDataSizeChecker = &backupDataSizeChecker batchState.onAddedTransaction(transaction, receipt, execResult, effectiveGas) } } - + metrics.GetLogStatistics().CumulativeTiming(metrics.ProcessingTxTiming, 
time.Since(start)) if batchState.isL1Recovery() { // just go into the normal loop waiting for new transactions to signal that the recovery // has finished as far as it can go @@ -308,8 +342,8 @@ func SpawnSequencingStage( break LOOP_TRANSACTIONS } - if batchState.isLimboRecovery() { + batchCloseReason = metrics.BatchLimboRecovery runLoopBlocks = false break LOOP_TRANSACTIONS } @@ -396,8 +430,13 @@ func SpawnSequencingStage( } // For X Layer + metrics.GetLogStatistics().SetTag(metrics.BatchCloseReason, string(batchCloseReason)) + batchTime := time.Since(batchStart) + metrics.BatchExecuteTime(string(batchCloseReason), batchTime) + metrics.GetLogStatistics().SetTag(metrics.FinalizeBatchNumber, strconv.Itoa(int(batchState.batchNumber))) + metrics.GetLogStatistics().Summary() tryToSleepSequencer(cfg.zk.XLayer.SequencerBatchSleepDuration, logPrefix) - + // TODO: It is 99% sure that there is no need to write this in any of processInjectedInitialBatch, alignExecutionToDatastream, doCheckForBadBatch but it is worth double checknig // the unwind of this value is handed by UnwindExecutionStageDbWrites if _, err := rawdb.IncrementStateVersionByBlockNumberIfNeeded(batchContext.sdb.tx, block.NumberU64()); err != nil { @@ -406,5 +445,9 @@ func SpawnSequencingStage( log.Info(fmt.Sprintf("[%s] Finish batch %d...", batchContext.s.LogPrefix(), batchState.batchNumber)) - return sdb.tx.Commit() + // For X Layer + start := time.Now() + err = sdb.tx.Commit() + metrics.GetLogStatistics().CumulativeTiming(metrics.BatchCommitDBTiming, time.Since(start)) + return err } diff --git a/zk/stages/stage_sequence_execute_blocks.go b/zk/stages/stage_sequence_execute_blocks.go index 060c753a26d..68ba4841428 100644 --- a/zk/stages/stage_sequence_execute_blocks.go +++ b/zk/stages/stage_sequence_execute_blocks.go @@ -2,11 +2,12 @@ package stages import ( "fmt" + "math/big" + "time" "github.com/gateway-fm/cdk-erigon-lib/common" "github.com/gateway-fm/cdk-erigon-lib/kv" - - "math/big" + "github.com/ledgerwatch/secp256k1" "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/core/rawdb" @@ -17,9 +18,9 @@ import ( "github.com/ledgerwatch/erigon/smt/pkg/blockinfo" "github.com/ledgerwatch/erigon/zk/erigon_db" "github.com/ledgerwatch/erigon/zk/hermez_db" + "github.com/ledgerwatch/erigon/zk/metrics" zktypes "github.com/ledgerwatch/erigon/zk/types" "github.com/ledgerwatch/erigon/zk/utils" - "github.com/ledgerwatch/secp256k1" ) func handleStateForNewBlockStarting( @@ -159,9 +160,13 @@ func finaliseBlock( }) } + // For X Layer + pbStateStart := time.Now() if err := postBlockStateHandling(*batchContext.cfg, ibs, batchContext.sdb.hermezDb, newHeader, ger, l1BlockHash, parentBlock.Root(), txInfos); err != nil { return nil, err } + // For X Layer + metrics.GetLogStatistics().CumulativeTiming(metrics.PbStateTiming, time.Since(pbStateStart)) if batchState.isL1Recovery() { for i, receipt := range builtBlockElements.receipts { @@ -188,12 +193,18 @@ func finaliseBlock( return nil, err } + // For X Layer + zkIncStart := time.Now() // this is actually the interhashes stage newRoot, err := zkIncrementIntermediateHashes(batchContext.ctx, batchContext.s.LogPrefix(), batchContext.s, batchContext.sdb.tx, batchContext.sdb.eridb, batchContext.sdb.smt, newHeader.Number.Uint64()-1, newHeader.Number.Uint64()) if err != nil { return nil, err } + // For X Layer + metrics.GetLogStatistics().CumulativeTiming(metrics.ZkIncIntermediateHashesTiming, time.Since(zkIncStart)) + + doFinStart := time.Now() finalHeader := finalBlock.HeaderNoCopy() 
finalHeader.Root = newRoot finalHeader.Coinbase = batchContext.cfg.zk.AddressSequencer @@ -241,6 +252,9 @@ func finaliseBlock( return nil, fmt.Errorf("write block batch error: %v", err) } + // For X Layer + metrics.GetLogStatistics().CumulativeTiming(metrics.FinaliseBlockWriteTiming, time.Since(doFinStart)) + // write batch counters err = batchContext.sdb.hermezDb.WriteBatchCounters(newNum.Uint64(), batchCounters.CombineCollectorsNoChanges().UsedAsMap()) if err != nil {
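
Usage sketch (not part of the diff above): a minimal, hypothetical standalone program showing how the new `zk/metrics` helpers introduced in this change fit together — registering the gauges, recording batch and pool metrics the way `stage_sequence_execute.go` does, accumulating the per-batch log statistics, and exposing a Prometheus endpoint similar to the reworked `metrics/exp.Setup`. It assumes the `zk/metrics` package and `promhttp` are importable as in the diff; the port, batch number, gas, and durations are illustrative only.

```go
package main

import (
	"net/http"
	"time"

	"github.com/ledgerwatch/erigon/zk/metrics"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Register the X Layer sequencer gauges, as initRunForXLayer does when
	// the --metrics flag is set.
	metrics.Init()

	// Record a batch execution time and the current pool sizes, mirroring
	// the calls made at the end of a sequenced batch (values are examples).
	metrics.BatchExecuteTime(string(metrics.BatchTimeOut), 2500*time.Millisecond)
	metrics.AddPoolTxCount(10, 2, 5)

	// Accumulate per-batch log statistics and flush them as a one-line summary.
	stats := metrics.GetLogStatistics()
	stats.SetTag(metrics.FinalizeBatchNumber, "123")
	stats.CumulativeCounting(metrics.TxCounter)
	stats.CumulativeValue(metrics.BatchGas, 21000)
	stats.CumulativeTiming(metrics.ProcessingTxTiming, 150*time.Millisecond)
	stats.SetTag(metrics.BatchCloseReason, string(metrics.BatchTimeOut))
	stats.Summary() // logs the summary line and resets the counters

	// Serve the Prometheus endpoint; the node itself does this on
	// metrics.addr:metrics.port (9095 in the test configs above).
	http.Handle("/debug/metrics/prometheus", promhttp.Handler())
	_ = http.ListenAndServe("127.0.0.1:9095", nil) // nolint:gosec
}
```

With the node itself, the same endpoints are enabled through `metrics: true`, `metrics.addr`, and `metrics.port` in the YAML configs shown above, and can be checked with the `curl` commands added to `test/readme.md`.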