Skip to content

Commit

Permalink
Merge pull request #78 from owen-reorg/ha-0.2.0
Browse files Browse the repository at this point in the history
feat: support persist active api
  • Loading branch information
owen-reorg committed Oct 30, 2023
2 parents 2fcb540 + 312d42c commit 2d0895f
Show file tree
Hide file tree
Showing 15 changed files with 557 additions and 44 deletions.
4 changes: 4 additions & 0 deletions op-e2e/actions/l2_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ func (s *l2VerifierBackend) StopSequencer(ctx context.Context) (common.Hash, err
return common.Hash{}, errors.New("stopping the L2Verifier sequencer is not supported")
}

func (s *l2VerifierBackend) SequencerActive(ctx context.Context) (bool, error) {
return false, nil
}

func (s *L2Verifier) L2Finalized() eth.L2BlockRef {
return s.derivation.Finalized()
}
Expand Down
5 changes: 5 additions & 0 deletions op-e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func DefaultSystemConfig(t *testing.T) SystemConfig {
EnableAdmin: true,
},
L1EpochPollInterval: time.Second * 4,
ConfigPersistence: &rollupNode.DisabledConfigPersistence{},
},
"verifier": {
Driver: driver.Config{
Expand All @@ -178,6 +179,7 @@ func DefaultSystemConfig(t *testing.T) SystemConfig {
SequencerEnabled: false,
},
L1EpochPollInterval: time.Second * 4,
ConfigPersistence: &rollupNode.DisabledConfigPersistence{},
},
},
Loggers: map[string]log.Logger{
Expand Down Expand Up @@ -546,6 +548,9 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
nodeConfig := cfg.Nodes[name]
c := *nodeConfig // copy
c.Rollup = makeRollupConfig()
if err := c.LoadPersisted(cfg.Loggers[name]); err != nil {
return nil, err
}

if p, ok := p2pNodes[name]; ok {
c.P2P = p
Expand Down
177 changes: 177 additions & 0 deletions op-e2e/system_adminrpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package op_e2e

import (
"context"
"testing"
"time"

"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/require"
)

func TestStopStartSequencer(t *testing.T) {
InitParallel(t)

cfg := DefaultSystemConfig(t)
sys, err := cfg.Start()
require.Nil(t, err, "Error starting up system")
defer sys.Close()

l2Seq := sys.Clients["sequencer"]
rollupNode := sys.RollupNodes["sequencer"]

nodeRPC, err := rpc.DialContext(context.Background(), rollupNode.HTTPEndpoint())
require.Nil(t, err, "Error dialing node")
rollupClient := sources.NewRollupClient(client.NewBaseRPCClient(nodeRPC))

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
active, err := rollupClient.SequencerActive(ctx)
require.NoError(t, err)
require.True(t, active, "sequencer should be active")

blockBefore := latestBlock(t, l2Seq)
time.Sleep(time.Duration(cfg.DeployConfig.L2BlockTime+1) * time.Second)
blockAfter := latestBlock(t, l2Seq)
require.Greaterf(t, blockAfter, blockBefore, "Chain did not advance")

ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
blockHash, err := rollupClient.StopSequencer(ctx)
require.Nil(t, err, "Error stopping sequencer")

ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
active, err = rollupClient.SequencerActive(ctx)
require.NoError(t, err)
require.False(t, active, "sequencer should be inactive")

blockBefore = latestBlock(t, l2Seq)
time.Sleep(time.Duration(cfg.DeployConfig.L2BlockTime+1) * time.Second)
blockAfter = latestBlock(t, l2Seq)
require.Equal(t, blockAfter, blockBefore, "Chain advanced after stopping sequencer")

ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = rollupClient.StartSequencer(ctx, blockHash)
require.Nil(t, err, "Error starting sequencer")

ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
active, err = rollupClient.SequencerActive(ctx)
require.NoError(t, err)
require.True(t, active, "sequencer should be active again")

blockBefore = latestBlock(t, l2Seq)
time.Sleep(time.Duration(cfg.DeployConfig.L2BlockTime+1) * time.Second)
blockAfter = latestBlock(t, l2Seq)
require.Greater(t, blockAfter, blockBefore, "Chain did not advance after starting sequencer")
}

func TestPersistSequencerStateWhenChanged(t *testing.T) {
InitParallel(t)
ctx := context.Background()
dir := t.TempDir()
stateFile := dir + "/state.json"

cfg := DefaultSystemConfig(t)
// We don't need a verifier - just the sequencer is enough
delete(cfg.Nodes, "verifier")
cfg.Nodes["sequencer"].ConfigPersistence = node.NewConfigPersistence(stateFile)

sys, err := cfg.Start()
require.NoError(t, err)
defer sys.Close()

assertPersistedSequencerState(t, stateFile, node.StateStarted)

rollupRPCClient, err := rpc.DialContext(ctx, sys.RollupNodes["sequencer"].HTTPEndpoint())
require.Nil(t, err)
rollupClient := sources.NewRollupClient(client.NewBaseRPCClient(rollupRPCClient))

err = rollupClient.StartSequencer(ctx, common.Hash{0xaa})
require.ErrorContains(t, err, "sequencer already running")

head, err := rollupClient.StopSequencer(ctx)
require.NoError(t, err)
require.NotEqual(t, common.Hash{}, head)
assertPersistedSequencerState(t, stateFile, node.StateStopped)
}

func TestLoadSequencerStateOnStarted_Stopped(t *testing.T) {
InitParallel(t)
ctx := context.Background()
dir := t.TempDir()
stateFile := dir + "/state.json"

// Prepare the persisted state file with sequencer stopped
configReader := node.NewConfigPersistence(stateFile)
require.NoError(t, configReader.SequencerStopped())

cfg := DefaultSystemConfig(t)
// We don't need a verifier - just the sequencer is enough
delete(cfg.Nodes, "verifier")
seqCfg := cfg.Nodes["sequencer"]
seqCfg.ConfigPersistence = node.NewConfigPersistence(stateFile)

sys, err := cfg.Start()
require.NoError(t, err)
defer sys.Close()

rollupRPCClient, err := rpc.DialContext(ctx, sys.RollupNodes["sequencer"].HTTPEndpoint())
require.Nil(t, err)
rollupClient := sources.NewRollupClient(client.NewBaseRPCClient(rollupRPCClient))

// Still persisted as stopped after startup
assertPersistedSequencerState(t, stateFile, node.StateStopped)

// Sequencer is really stopped
_, err = rollupClient.StopSequencer(ctx)
require.ErrorContains(t, err, "sequencer not running")
assertPersistedSequencerState(t, stateFile, node.StateStopped)
}

func TestLoadSequencerStateOnStarted_Started(t *testing.T) {
InitParallel(t)
ctx := context.Background()
dir := t.TempDir()
stateFile := dir + "/state.json"

// Prepare the persisted state file with sequencer stopped
configReader := node.NewConfigPersistence(stateFile)
require.NoError(t, configReader.SequencerStarted())

cfg := DefaultSystemConfig(t)
// We don't need a verifier - just the sequencer is enough
delete(cfg.Nodes, "verifier")
seqCfg := cfg.Nodes["sequencer"]
seqCfg.Driver.SequencerStopped = true
seqCfg.ConfigPersistence = node.NewConfigPersistence(stateFile)

sys, err := cfg.Start()
require.NoError(t, err)
defer sys.Close()

rollupRPCClient, err := rpc.DialContext(ctx, sys.RollupNodes["sequencer"].HTTPEndpoint())
require.Nil(t, err)
rollupClient := sources.NewRollupClient(client.NewBaseRPCClient(rollupRPCClient))

// Still persisted as stopped after startup
assertPersistedSequencerState(t, stateFile, node.StateStarted)

// Sequencer is really stopped
err = rollupClient.StartSequencer(ctx, common.Hash{})
require.ErrorContains(t, err, "sequencer already running")
assertPersistedSequencerState(t, stateFile, node.StateStarted)
}

func assertPersistedSequencerState(t *testing.T, stateFile string, expected node.RunningState) {
configReader := node.NewConfigPersistence(stateFile)
state, err := configReader.SequencerState()
require.NoError(t, err)
require.Equalf(t, expected, state, "expected sequencer state %v but was %v", expected, state)
}
41 changes: 0 additions & 41 deletions op-e2e/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1251,47 +1251,6 @@ func TestFees(t *testing.T) {
require.Equal(t, balanceDiff, totalFee, "balances should add up")
}

func TestStopStartSequencer(t *testing.T) {
InitParallel(t)

cfg := DefaultSystemConfig(t)
sys, err := cfg.Start()
require.Nil(t, err, "Error starting up system")
defer sys.Close()

l2Seq := sys.Clients["sequencer"]
rollupNode := sys.RollupNodes["sequencer"]

nodeRPC, err := rpc.DialContext(context.Background(), rollupNode.HTTPEndpoint())
require.Nil(t, err, "Error dialing node")

blockBefore := latestBlock(t, l2Seq)
time.Sleep(time.Duration(cfg.DeployConfig.L2BlockTime+1) * time.Second)
blockAfter := latestBlock(t, l2Seq)
require.Greaterf(t, blockAfter, blockBefore, "Chain did not advance")

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
blockHash := common.Hash{}
err = nodeRPC.CallContext(ctx, &blockHash, "admin_stopSequencer")
require.Nil(t, err, "Error stopping sequencer")

blockBefore = latestBlock(t, l2Seq)
time.Sleep(time.Duration(cfg.DeployConfig.L2BlockTime+1) * time.Second)
blockAfter = latestBlock(t, l2Seq)
require.Equal(t, blockAfter, blockBefore, "Chain advanced after stopping sequencer")

ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = nodeRPC.CallContext(ctx, nil, "admin_startSequencer", blockHash)
require.Nil(t, err, "Error starting sequencer")

blockBefore = latestBlock(t, l2Seq)
time.Sleep(time.Duration(cfg.DeployConfig.L2BlockTime+1) * time.Second)
blockAfter = latestBlock(t, l2Seq)
require.Greater(t, blockAfter, blockBefore, "Chain did not advance after starting sequencer")
}

func TestStopStartBatcher(t *testing.T) {
InitParallel(t)

Expand Down
6 changes: 6 additions & 0 deletions op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ var (
Usage: "Enable the admin API (experimental)",
EnvVar: prefixEnvVar("RPC_ENABLE_ADMIN"),
}
RPCAdminPersistence = cli.StringFlag{
Name: "rpc.admin-state",
Usage: "File path used to persist state changes made via the admin API so they persist across restarts. Disabled if not set.",
EnvVar: prefixEnvVar("RPC_ADMIN_STATE"),
}

/* Optional Flags */
L1TrustRPC = cli.BoolFlag{
Expand Down Expand Up @@ -253,6 +258,7 @@ var optionalFlags = []cli.Flag{
SequencerL1Confs,
L1EpochPollIntervalFlag,
RPCEnableAdmin,
RPCAdminPersistence,
MetricsEnabledFlag,
MetricsAddrFlag,
MetricsPortFlag,
Expand Down
7 changes: 7 additions & 0 deletions op-node/node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type driverClient interface {
ResetDerivationPipeline(context.Context) error
StartSequencer(ctx context.Context, blockHash common.Hash) error
StopSequencer(context.Context) (common.Hash, error)
SequencerActive(context.Context) (bool, error)
}

type rpcMetrics interface {
Expand Down Expand Up @@ -65,6 +66,12 @@ func (n *adminAPI) StopSequencer(ctx context.Context) (common.Hash, error) {
return n.dr.StopSequencer(ctx)
}

func (n *adminAPI) SequencerActive(ctx context.Context) (bool, error) {
recordDur := n.m.RecordRPCServerRequest("admin_sequencerActive")
defer recordDur()
return n.dr.SequencerActive(ctx)
}

type nodeAPI struct {
config *rollup.Config
client l2EthClient
Expand Down
22 changes: 22 additions & 0 deletions op-node/node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"math"
"time"

"github.com/ethereum-optimism/optimism/op-node/flags"
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
"github.com/ethereum/go-ethereum/log"
)

type Config struct {
Expand All @@ -37,6 +39,8 @@ type Config struct {
// Used to poll the L1 for new finalized or safe blocks
L1EpochPollInterval time.Duration

ConfigPersistence ConfigPersistence

// Optional
Tracer Tracer
Heartbeat HeartbeatConfig
Expand Down Expand Up @@ -78,6 +82,24 @@ type HeartbeatConfig struct {
URL string
}

func (cfg *Config) LoadPersisted(log log.Logger) error {
if !cfg.Driver.SequencerEnabled {
return nil
}
if state, err := cfg.ConfigPersistence.SequencerState(); err != nil {
return err
} else if state != StateUnset {
stopped := state == StateStopped
if stopped != cfg.Driver.SequencerStopped {
log.Warn(fmt.Sprintf("Overriding %v with persisted state", flags.SequencerStoppedFlag.Name), "stopped", stopped)
}
cfg.Driver.SequencerStopped = stopped
} else {
log.Info("No persisted sequencer state loaded")
}
return nil
}

// Check verifies that the given configuration makes sense
func (cfg *Config) Check() error {
if err := cfg.L2.Check(); err != nil {
Expand Down
Loading

0 comments on commit 2d0895f

Please sign in to comment.