From b9a212bc469aa6ae0a4734d8ce73855852bcc067 Mon Sep 17 00:00:00 2001 From: Krish Date: Wed, 31 Jul 2024 10:03:29 +0800 Subject: [PATCH 1/3] keep consistent status when meet an unexpected el sync --- op-node/rollup/derive/engine_controller.go | 21 ++++++++++++++++----- op-service/sources/engine_client.go | 4 ++++ 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/op-node/rollup/derive/engine_controller.go b/op-node/rollup/derive/engine_controller.go index 555728a5ea..d463ddab3b 100644 --- a/op-node/rollup/derive/engine_controller.go +++ b/op-node/rollup/derive/engine_controller.go @@ -4,17 +4,19 @@ import ( "context" "errors" "fmt" + "strings" "time" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/async" "github.com/ethereum-optimism/optimism/op-node/rollup/conductor" "github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-service/clock" "github.com/ethereum-optimism/optimism/op-service/eth" - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" ) type syncStatusEnum int @@ -279,6 +281,10 @@ func (e *EngineController) checkNewPayloadStatus(status eth.ExecutePayloadStatus } // Allow SYNCING and ACCEPTED if engine EL sync is enabled return status == eth.ExecutionValid || status == eth.ExecutionSyncing || status == eth.ExecutionAccepted + } else if e.syncMode == sync.CLSync { + if status == eth.ExecutionInconsistent { + return true + } } return status == eth.ExecutionValid } @@ -296,6 +302,11 @@ func (e *EngineController) checkForkchoiceUpdatedStatus(status eth.ExecutePayloa return status == eth.ExecutionValid } +// checkELSyncTriggered checks returned err of engine_newPayloadV1 +func (e *EngineController) checkELSyncTriggered(status eth.ExecutePayloadStatus, err error) bool { + return e.syncMode != sync.ELSync && status == eth.ExecutionSyncing && strings.Contains(err.Error(), "forced head needed for startup") +} + // checkUpdateUnsafeHead checks if we can update current unsafeHead for op-node func (e *EngineController) checkUpdateUnsafeHead(status eth.ExecutePayloadStatus) bool { if e.syncMode == sync.ELSync { @@ -360,12 +371,12 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et } // Insert the payload & then call FCU status, err := e.engine.NewPayload(ctx, envelope.ExecutionPayload, envelope.ParentBeaconBlockRoot) - if err != nil { + if err != nil && !e.checkELSyncTriggered(status.Status, err) { return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err)) } //process inconsistent state - if status.Status == eth.ExecutionInconsistent { + if status.Status == eth.ExecutionInconsistent || e.checkELSyncTriggered(status.Status, err) { currentL2Info, err := e.getCurrentL2Info(ctx) if err != nil { return NewTemporaryError(fmt.Errorf("failed to process inconsistent state: %w", err)) diff --git a/op-service/sources/engine_client.go b/op-service/sources/engine_client.go index 183356f03c..96ef54ee5e 100644 --- a/op-service/sources/engine_client.go +++ b/op-service/sources/engine_client.go @@ -3,6 +3,7 @@ package sources import ( "context" "fmt" + "strings" "time" "github.com/ethereum/go-ethereum/common" @@ -136,6 +137,9 @@ func (s *EngineAPIClient) NewPayload(ctx context.Context, payload *eth.Execution e.Trace("Received payload execution result", "status", result.Status, "latestValidHash", result.LatestValidHash, "message", result.ValidationError) if err != nil { + if strings.Contains(err.Error(), "forced head needed for startup") { + return &result, err + } e.Error("Payload execution failed", "err", err) return nil, fmt.Errorf("failed to execute payload: %w", err) } From 87b9c9001de7ff052b06f3443582bb2345265b8c Mon Sep 17 00:00:00 2001 From: Krish Date: Fri, 2 Aug 2024 18:24:56 +0800 Subject: [PATCH 2/3] fix: add retry logic when data delete meet an overtime err --- op-node/rollup/derive/engine_controller.go | 56 +++++++++++++++++----- op-service/sources/engine_client.go | 3 +- 2 files changed, 45 insertions(+), 14 deletions(-) diff --git a/op-node/rollup/derive/engine_controller.go b/op-node/rollup/derive/engine_controller.go index d463ddab3b..cb0f1e7c14 100644 --- a/op-node/rollup/derive/engine_controller.go +++ b/op-node/rollup/derive/engine_controller.go @@ -34,7 +34,14 @@ const ( syncStatusFinishedEL // EL sync is done & we should be performing consolidation ) -var errNoFCUNeeded = errors.New("no FCU call was needed") +var ( + errNoFCUNeeded = errors.New("no FCU call was needed") + ErrELSyncTriggerUnexpected = errors.New("forced head needed for startup") + + maxFCURetryAttempts = 5 + fcuRetryDelay = 5 * time.Second + needSyncWithEngine = false +) var _ EngineControl = (*EngineController)(nil) var _ LocalEngineControl = (*EngineController)(nil) @@ -304,7 +311,12 @@ func (e *EngineController) checkForkchoiceUpdatedStatus(status eth.ExecutePayloa // checkELSyncTriggered checks returned err of engine_newPayloadV1 func (e *EngineController) checkELSyncTriggered(status eth.ExecutePayloadStatus, err error) bool { - return e.syncMode != sync.ELSync && status == eth.ExecutionSyncing && strings.Contains(err.Error(), "forced head needed for startup") + if err == nil { + return false + } else if strings.Contains(err.Error(), ErrELSyncTriggerUnexpected.Error()) { + return e.syncMode != sync.ELSync && status == eth.ExecutionSyncing + } + return false } // checkUpdateUnsafeHead checks if we can update current unsafeHead for op-node @@ -371,8 +383,12 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et } // Insert the payload & then call FCU status, err := e.engine.NewPayload(ctx, envelope.ExecutionPayload, envelope.ParentBeaconBlockRoot) - if err != nil && !e.checkELSyncTriggered(status.Status, err) { - return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err)) + if err != nil { + if strings.Contains(err.Error(), ErrELSyncTriggerUnexpected.Error()) { + log.Info("el sync triggered as unexpected") + } else { + return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err)) + } } //process inconsistent state @@ -399,13 +415,25 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et FinalizedBlockHash: e.finalizedHead.Hash, } - fcuRes, err := e.engine.ForkchoiceUpdate(ctx, &fcuReq, nil) - if fcuRes.PayloadStatus.Status == eth.ExecutionValid { - log.Info("engine processed data successfully") - e.needFCUCall = false - return nil - } else { - return NewTemporaryError(fmt.Errorf("engine failed to process inconsistent data: %w", err)) + for attempts := 0; attempts < maxFCURetryAttempts; attempts++ { + fcuRes, err := e.engine.ForkchoiceUpdate(ctx, &fcuReq, nil) + if err != nil { + if strings.Contains(err.Error(), "context deadline exceeded") { + log.Warn("Failed to share forkchoice-updated signal, attempt %d: %v", attempts+1, err) + time.Sleep(fcuRetryDelay) + continue + } + return NewTemporaryError(fmt.Errorf("engine failed to process due to error: %w", err)) + } + + if fcuRes.PayloadStatus.Status == eth.ExecutionValid { + log.Info("engine processed data successfully") + e.needFCUCall = false + needSyncWithEngine = true + break + } else { + return NewTemporaryError(fmt.Errorf("engine failed to process inconsistent data")) + } } } @@ -423,8 +451,8 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et } //update unsafe,safe,finalize and send fcu for sync - if status.Status == eth.ExecutionInconsistent { - log.Info("engine meet inconsistent here") + if needSyncWithEngine { + log.Info("engine meet inconsistent, sync status") currentUnsafe, _ := e.engine.L2BlockRefByLabel(ctx, eth.Unsafe) //reset unsafe e.SetUnsafeHead(currentUnsafe) @@ -435,6 +463,8 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et fc.HeadBlockHash = currentUnsafe.Hash fc.SafeBlockHash = currentUnsafe.Hash fc.FinalizedBlockHash = currentUnsafe.Hash + + needSyncWithEngine = false } if e.syncStatus == syncStatusFinishedELButNotFinalized { diff --git a/op-service/sources/engine_client.go b/op-service/sources/engine_client.go index 96ef54ee5e..1bac00d8fd 100644 --- a/op-service/sources/engine_client.go +++ b/op-service/sources/engine_client.go @@ -13,6 +13,7 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/sources/caching" @@ -137,7 +138,7 @@ func (s *EngineAPIClient) NewPayload(ctx context.Context, payload *eth.Execution e.Trace("Received payload execution result", "status", result.Status, "latestValidHash", result.LatestValidHash, "message", result.ValidationError) if err != nil { - if strings.Contains(err.Error(), "forced head needed for startup") { + if strings.Contains(err.Error(), derive.ErrELSyncTriggerUnexpected.Error()) { return &result, err } e.Error("Payload execution failed", "err", err) From 5ed611275d88c2b14ee7b449b118312f450c37fc Mon Sep 17 00:00:00 2001 From: Krish Date: Tue, 6 Aug 2024 17:34:08 +0800 Subject: [PATCH 3/3] chore: refine comments --- op-node/rollup/derive/engine_controller.go | 107 ++++++++++++++------- 1 file changed, 72 insertions(+), 35 deletions(-) diff --git a/op-node/rollup/derive/engine_controller.go b/op-node/rollup/derive/engine_controller.go index cb0f1e7c14..b176fb7739 100644 --- a/op-node/rollup/derive/engine_controller.go +++ b/op-node/rollup/derive/engine_controller.go @@ -391,22 +391,18 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et } } + var ( + needResetSafeHead bool + needResetFinalizedHead bool + ) + //process inconsistent state if status.Status == eth.ExecutionInconsistent || e.checkELSyncTriggered(status.Status, err) { currentL2Info, err := e.getCurrentL2Info(ctx) if err != nil { return NewTemporaryError(fmt.Errorf("failed to process inconsistent state: %w", err)) } else { - log.Info("engine has inconsistent state", "unsafe", currentL2Info.Unsafe.Number, "safe", currentL2Info.Safe.Number, "final", currentL2Info.Finalized.Number) - e.SetUnsafeHead(currentL2Info.Unsafe) - if currentL2Info.Safe.Number > currentL2Info.Unsafe.Number { - log.Info("current safe is higher than unsafe block, reset it", "set safe after", currentL2Info.Unsafe.Number, "set safe before", e.safeHead.Number) - e.SetSafeHead(currentL2Info.Unsafe) - } - if currentL2Info.Finalized.Number > currentL2Info.Unsafe.Number { - log.Info("current finalized is higher than unsafe block, reset it", "set Finalized after", currentL2Info.Unsafe.Number, "set Finalized before", e.safeHead.Number) - e.SetFinalizedHead(currentL2Info.Unsafe) - } + needResetSafeHead, needResetFinalizedHead = e.resetSafeAndFinalizedHead(currentL2Info) } fcuReq := eth.ForkchoiceState{ @@ -415,25 +411,9 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et FinalizedBlockHash: e.finalizedHead.Hash, } - for attempts := 0; attempts < maxFCURetryAttempts; attempts++ { - fcuRes, err := e.engine.ForkchoiceUpdate(ctx, &fcuReq, nil) - if err != nil { - if strings.Contains(err.Error(), "context deadline exceeded") { - log.Warn("Failed to share forkchoice-updated signal, attempt %d: %v", attempts+1, err) - time.Sleep(fcuRetryDelay) - continue - } - return NewTemporaryError(fmt.Errorf("engine failed to process due to error: %w", err)) - } - - if fcuRes.PayloadStatus.Status == eth.ExecutionValid { - log.Info("engine processed data successfully") - e.needFCUCall = false - needSyncWithEngine = true - break - } else { - return NewTemporaryError(fmt.Errorf("engine failed to process inconsistent data")) - } + needSyncWithEngine, err = e.trySyncingWithEngine(ctx, fcuReq) + if err != nil { + return NewTemporaryError(err) } } @@ -456,16 +436,25 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et currentUnsafe, _ := e.engine.L2BlockRefByLabel(ctx, eth.Unsafe) //reset unsafe e.SetUnsafeHead(currentUnsafe) - //force reset safe,finalize - e.SetSafeHead(currentUnsafe) - e.SetFinalizedHead(currentUnsafe) - fc.HeadBlockHash = currentUnsafe.Hash - fc.SafeBlockHash = currentUnsafe.Hash - fc.FinalizedBlockHash = currentUnsafe.Hash + + //force reset safe,finalize if needed + if needResetFinalizedHead { + e.SetFinalizedHead(currentUnsafe) + fc.FinalizedBlockHash = currentUnsafe.Hash + needResetFinalizedHead = false + } + if needResetSafeHead { + e.SetSafeHead(currentUnsafe) + fc.SafeBlockHash = currentUnsafe.Hash + needResetSafeHead = false + } needSyncWithEngine = false } + // Ensure that the variables are used even if needSyncWithEngine is false + _ = needResetSafeHead + _ = needResetFinalizedHead if e.syncStatus == syncStatusFinishedELButNotFinalized { fc.SafeBlockHash = envelope.ExecutionPayload.BlockHash @@ -494,6 +483,7 @@ func (e *EngineController) InsertUnsafePayload(ctx context.Context, envelope *et } e.needFCUCall = false + // unsafe will update to the latest broadcast block anyway, this will trigger an el sync in geth when meet an inconsistent state and accelerate recover progress. if e.checkUpdateUnsafeHead(fcRes.PayloadStatus.Status) { e.SetUnsafeHead(ref) } @@ -619,3 +609,50 @@ func (e *EngineController) getCurrentL2Info(ctx context.Context) (*sync.FindHead Finalized: finalized, }, nil } + +// resetSafeAndFinalizedHead will reset current safe/finalized head to keep consistent with unsafe head from engine, reset safe/finalized head if current unsafe is behind them +func (e *EngineController) resetSafeAndFinalizedHead(currentL2Info *sync.FindHeadsResult) (bool, bool) { + var needResetSafeHead, needResetFinalizedHead bool + + log.Info("engine has inconsistent state", "unsafe", currentL2Info.Unsafe.Number, "safe", currentL2Info.Safe.Number, "final", currentL2Info.Finalized.Number) + e.SetUnsafeHead(currentL2Info.Unsafe) + + if currentL2Info.Safe.Number > currentL2Info.Unsafe.Number { + log.Info("current safe is higher than unsafe block, reset it", "set safe after", currentL2Info.Unsafe.Number, "set safe before", e.safeHead.Number) + e.SetSafeHead(currentL2Info.Unsafe) + needResetSafeHead = true + } + + if currentL2Info.Finalized.Number > currentL2Info.Unsafe.Number { + log.Info("current finalized is higher than unsafe block, reset it", "set Finalized after", currentL2Info.Unsafe.Number, "set Finalized before", e.safeHead.Number) + e.SetFinalizedHead(currentL2Info.Unsafe) + needResetFinalizedHead = true + } + + return needResetSafeHead, needResetFinalizedHead +} + +// trySyncingWithEngine will request engine to deleting data beyond diskroot to keep synced with current node status +func (e *EngineController) trySyncingWithEngine(ctx context.Context, fcuReq eth.ForkchoiceState) (bool, error) { + for attempts := 0; attempts < maxFCURetryAttempts; attempts++ { + fcuRes, err := e.engine.ForkchoiceUpdate(ctx, &fcuReq, nil) + if err != nil { + if strings.Contains(err.Error(), "context deadline exceeded") { + log.Warn("Failed to share forkchoice-updated signal", "attempt:", attempts+1, "err", err) + time.Sleep(fcuRetryDelay) + continue + } + return false, fmt.Errorf("engine failed to process due to error: %w", err) + } + + if fcuRes.PayloadStatus.Status == eth.ExecutionValid { + log.Info("engine processed data successfully") + e.needFCUCall = false + return true, nil + } else { + return false, fmt.Errorf("engine failed to process inconsistent data") + } + } + + return false, fmt.Errorf("max retry attempts reached for trySyncingWithEngine") +}