diff --git a/database/database.go b/database/database.go index 9af053ab..6dd9fa89 100644 --- a/database/database.go +++ b/database/database.go @@ -6,60 +6,51 @@ package database import ( - "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/awm-relayer/relayer/config" - "github.com/ethereum/go-ethereum/common" - "github.com/pkg/errors" - "go.uber.org/zap" -) - -var ( - ErrKeyNotFound = errors.New("key not found") - ErrRelayerIDNotFound = errors.New("no database entry for relayer id") - ErrDatabaseMisconfiguration = errors.New("database misconfiguration") -) + "strconv" -const ( - LatestProcessedBlockKey DataKey = iota + "github.com/ava-labs/awm-relayer/relayer" + "github.com/ava-labs/awm-relayer/relayer/checkpoint" + "github.com/pkg/errors" ) -type DataKey int +var _ checkpoint.RelayerDatabase = &relayerDatabase{} -func (k DataKey) String() string { - switch k { - case LatestProcessedBlockKey: - return "latestProcessedBlock" +// NewRelayerDatabase instantiates and returns a relayerDatabase +func NewRelayerDatabase(db keyValueDatabase) *relayerDatabase { + return &relayerDatabase{ + keyValueDatabase: db, } - return "unknown" } -// RelayerDatabase is a key-value store for relayer state, with each relayerID maintaining its own state. -// Implementations should be thread-safe. -type RelayerDatabase interface { - Get(relayerID common.Hash, key DataKey) ([]byte, error) - Put(relayerID common.Hash, key DataKey, value []byte) error +// relayerDatabase implements the checkpoint RelayerDatabase interface +type relayerDatabase struct { + keyValueDatabase } -func NewDatabase(logger logging.Logger, cfg *config.Config) (RelayerDatabase, error) { - if cfg.RedisURL != "" { - db, err := NewRedisDatabase(logger, cfg.RedisURL, GetConfigRelayerIDs(cfg)) - if err != nil { - logger.Error( - "Failed to create Redis database", - zap.Error(err), - ) - return nil, err - } - return db, nil - } else { - db, err := NewJSONFileStorage(logger, cfg.StorageLocation, GetConfigRelayerIDs(cfg)) - if err != nil { - logger.Error( - "Failed to create JSON database", - zap.Error(err), - ) - return nil, err +func (x *relayerDatabase) GetLatestProcessedBlockHeight(relayerID relayer.RelayerID) (uint64, error) { + latestProcessedBlockData, err := x.Get(relayerID.ID, latestProcessedBlockKey) + if err != nil { + if isKeyNotFoundError(err) { + return 0, checkpoint.ErrNotFound } - return db, nil + return 0, err } + latestProcessedBlock, err := strconv.ParseUint(string(latestProcessedBlockData), 10, 64) + if err != nil { + return 0, err + } + return latestProcessedBlock, nil +} + +func (x *relayerDatabase) StoreLatestProcessedBlockHeight(relayerID relayer.RelayerID, height uint64) error { + return x.Put( + relayerID.ID, + latestProcessedBlockKey, + []byte(strconv.FormatUint(height, 10)), + ) +} + +// Returns true if an error returned by a keyValueDatabase implementation indicates the requested key was not found.
+func isKeyNotFoundError(err error) bool { + return errors.Is(err, errRelayerIDNotFound) || errors.Is(err, errKeyNotFound) } diff --git a/database/database_test.go b/database/database_test.go index d645603f..b7132636 100644 --- a/database/database_test.go +++ b/database/database_test.go @@ -1,13 +1,8 @@ package database import ( - "fmt" "testing" - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/utils/set" - "github.com/ava-labs/awm-relayer/relayer/config" - "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" "github.com/stretchr/testify/require" ) @@ -20,12 +15,12 @@ func TestIsKeyNotFoundError(t *testing.T) { }{ { name: "key not found error", - err: ErrKeyNotFound, + err: errKeyNotFound, expected: true, }, { name: "relayer key not found error", - err: ErrRelayerIDNotFound, + err: errRelayerIDNotFound, expected: true, }, { @@ -35,196 +30,7 @@ func TestIsKeyNotFoundError(t *testing.T) { }, } for _, testCase := range testCases { - result := IsKeyNotFoundError(testCase.err) + result := isKeyNotFoundError(testCase.err) require.Equal(t, testCase.expected, result, testCase.name) } } - -func TestCalculateRelayerID(t *testing.T) { - id1, _ := ids.FromString("S4mMqUXe7vHsGiRAma6bv3CKnyaLssyAxmQ2KvFpX1KEvfFCD") - id2, _ := ids.FromString("2TGBXcnwx5PqiXWiqxAKUaNSqDguXNh1mxnp82jui68hxJSZAx") - testCases := []struct { - name string - sourceBlockchainID ids.ID - destinationBlockchainID ids.ID - originSenderAddress common.Address - destinationAddress common.Address - expected common.Hash - }{ - { - name: "all zero", - sourceBlockchainID: id1, - destinationBlockchainID: id2, - originSenderAddress: AllAllowedAddress, - destinationAddress: AllAllowedAddress, - expected: common.HexToHash("0xf8a8467088fd6f8ad4577408ddda1607e2702ca9827d7fd556c46adae624b7a2"), - }, - { - name: "zero source address", - sourceBlockchainID: id1, - destinationBlockchainID: id2, - originSenderAddress: AllAllowedAddress, - destinationAddress: common.HexToAddress("0x0123456789abcdef0123456789abcdef01234567"), - expected: common.HexToHash("0xa20ede2231d43d072800ad436a4ca8844f9ddd9cb4174f4cc3046e0958e48320"), - }, - { - name: "zero destination address", - sourceBlockchainID: id1, - destinationBlockchainID: id2, - originSenderAddress: common.HexToAddress("0x0123456789abcdef0123456789abcdef01234567"), - destinationAddress: AllAllowedAddress, - expected: common.HexToHash("0xb205a049831478f55b768a4c875b2085339b6053831ecde8a3d406f9d13454a5"), - }, - { - name: "all non-zero", - sourceBlockchainID: id1, - destinationBlockchainID: id2, - originSenderAddress: common.HexToAddress("0x0123456789abcdef0123456789abcdef01234567"), - destinationAddress: common.HexToAddress("0x0123456789abcdef0123456789abcdef01234567"), - expected: common.HexToHash("0x6661512bc3b5689b28a4c2519425f725b5681b90fea937433103c846f742f918"), - }, - } - for _, testCase := range testCases { - result := CalculateRelayerID( - testCase.sourceBlockchainID, - testCase.destinationBlockchainID, - testCase.originSenderAddress, - testCase.destinationAddress, - ) - require.Equal(t, testCase.expected, result, testCase.name) - } -} - -func TestGetConfigRelayerKeys(t *testing.T) { - allowedAddress := common.HexToAddress("0x0123456789abcdef0123456789abcdef01234567") - dstCfg1 := config.TestValidDestinationBlockchainConfig - - // All destination chains and source and destination addresses are allowed - srcCfg1 := config.TestValidSourceBlockchainConfig - - // All destination chains, but only a single source address is allowed - srcCfg2 := 
config.TestValidSourceBlockchainConfig - srcCfg2.BlockchainID = ids.GenerateTestID().String() - srcCfg2.AllowedOriginSenderAddresses = []string{allowedAddress.String()} - - // Restricted to a single destination chain, but all source and destination addresses are allowed - srcCfg3 := config.TestValidSourceBlockchainConfig - srcCfg3.BlockchainID = ids.GenerateTestID().String() - srcCfg3.SupportedDestinations = []*config.SupportedDestination{ - { - BlockchainID: dstCfg1.BlockchainID, - }, - } - - // Restricted to a single destination chain, but only a single source address is allowed - srcCfg4 := config.TestValidSourceBlockchainConfig - srcCfg4.BlockchainID = ids.GenerateTestID().String() - srcCfg4.AllowedOriginSenderAddresses = []string{allowedAddress.String()} - srcCfg4.SupportedDestinations = []*config.SupportedDestination{ - { - BlockchainID: dstCfg1.BlockchainID, - }, - } - - // Restricted to a single destination chain, but only a single destination address is allowed - srcCfg5 := config.TestValidSourceBlockchainConfig - srcCfg5.BlockchainID = ids.GenerateTestID().String() - srcCfg5.SupportedDestinations = []*config.SupportedDestination{ - { - BlockchainID: dstCfg1.BlockchainID, - Addresses: []string{allowedAddress.String()}, - }, - } - - // Restricted to a single destination, but only a single source and destination address is allowed - srcCfg6 := config.TestValidSourceBlockchainConfig - srcCfg6.BlockchainID = ids.GenerateTestID().String() - srcCfg6.AllowedOriginSenderAddresses = []string{allowedAddress.String()} - srcCfg6.SupportedDestinations = []*config.SupportedDestination{ - { - BlockchainID: dstCfg1.BlockchainID, - Addresses: []string{allowedAddress.String()}, - }, - } - - // - - err := dstCfg1.Validate() - require.ErrorIs(t, err, nil) - - allowedDestinations := set.NewSet[string](1) - allowedDestinations.Add(dstCfg1.BlockchainID) - err = srcCfg1.Validate(&allowedDestinations) - require.ErrorIs(t, err, nil) - err = srcCfg2.Validate(&allowedDestinations) - require.ErrorIs(t, err, nil) - err = srcCfg3.Validate(&allowedDestinations) - require.ErrorIs(t, err, nil) - err = srcCfg4.Validate(&allowedDestinations) - require.ErrorIs(t, err, nil) - err = srcCfg5.Validate(&allowedDestinations) - require.ErrorIs(t, err, nil) - err = srcCfg6.Validate(&allowedDestinations) - require.ErrorIs(t, err, nil) - - cfg := &config.Config{ - SourceBlockchains: []*config.SourceBlockchain{&srcCfg1, &srcCfg2, &srcCfg3, &srcCfg4, &srcCfg5, &srcCfg6}, - DestinationBlockchains: []*config.DestinationBlockchain{&dstCfg1}, - } - - targetIDs := []RelayerID{ - NewRelayerID( - srcCfg1.GetBlockchainID(), - dstCfg1.GetBlockchainID(), - AllAllowedAddress, - AllAllowedAddress, - ), - NewRelayerID( - srcCfg2.GetBlockchainID(), - dstCfg1.GetBlockchainID(), - allowedAddress, - AllAllowedAddress, - ), - NewRelayerID( - srcCfg3.GetBlockchainID(), - dstCfg1.GetBlockchainID(), - AllAllowedAddress, - AllAllowedAddress, - ), - NewRelayerID( - srcCfg4.GetBlockchainID(), - dstCfg1.GetBlockchainID(), - allowedAddress, - AllAllowedAddress, - ), - NewRelayerID( - srcCfg5.GetBlockchainID(), - dstCfg1.GetBlockchainID(), - AllAllowedAddress, - allowedAddress, - ), - NewRelayerID( - srcCfg6.GetBlockchainID(), - dstCfg1.GetBlockchainID(), - allowedAddress, - allowedAddress, - ), - } - - relayerIDs := GetConfigRelayerIDs(cfg) - - // Test that all target IDs are present - for i, id := range targetIDs { - require.True(t, - func(ids []RelayerID, target RelayerID) bool { - for _, id := range ids { - if id.ID == target.ID { - return true - } - 
} - return false - }(relayerIDs, id), - fmt.Sprintf("target ID %d not found", i), - ) - } -} diff --git a/database/json_file_storage.go b/database/json_file_storage.go index 95353546..f9fe7c63 100644 --- a/database/json_file_storage.go +++ b/database/json_file_storage.go @@ -11,12 +11,14 @@ import ( "sync" "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/awm-relayer/relayer" + "github.com/ava-labs/awm-relayer/relayer/checkpoint" "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" "go.uber.org/zap" ) -var _ RelayerDatabase = &JSONFileStorage{} +var _ keyValueDatabase = &JSONFileStorage{} type chainState map[string]string @@ -34,7 +36,7 @@ type JSONFileStorage struct { } // NewJSONFileStorage creates a new JSONFileStorage instance -func NewJSONFileStorage(logger logging.Logger, dir string, relayerIDs []RelayerID) (*JSONFileStorage, error) { +func NewJSONFileStorage(logger logging.Logger, dir string, relayerIDs []relayer.RelayerID) (*JSONFileStorage, error) { storage := &JSONFileStorage{ dir: filepath.Clean(dir), mutexes: make(map[common.Hash]*sync.RWMutex), @@ -68,7 +70,7 @@ func NewJSONFileStorage(logger logging.Logger, dir string, relayerIDs []RelayerI // 0755: The owner can read, write, execute. // Everyone else can read and execute but not modify the file. - err = os.MkdirAll(dir, 0755) + err = os.MkdirAll(dir, 0o755) if err != nil { storage.logger.Error("failed to create directory", zap.String("dir", dir), @@ -84,7 +86,7 @@ func (s *JSONFileStorage) Get(relayerID common.Hash, dataKey DataKey) ([]byte, e mutex, ok := s.mutexes[relayerID] if !ok { return nil, errors.Wrap( - ErrDatabaseMisconfiguration, + checkpoint.ErrDatabaseMisconfiguration, fmt.Sprintf("database not configured for key %s", relayerID.String()), ) } @@ -96,12 +98,12 @@ func (s *JSONFileStorage) Get(relayerID common.Hash, dataKey DataKey) ([]byte, e return nil, err } if !fileExists { - return nil, ErrRelayerIDNotFound + return nil, errRelayerIDNotFound } var val string if val, ok = currentState[dataKey.String()]; !ok { - return nil, ErrKeyNotFound + return nil, errKeyNotFound } return []byte(val), nil @@ -128,7 +130,7 @@ func (s *JSONFileStorage) Put(relayerID common.Hash, dataKey DataKey, value []by mutex, ok := s.mutexes[relayerID] if !ok { return errors.Wrap( - ErrDatabaseMisconfiguration, + checkpoint.ErrDatabaseMisconfiguration, fmt.Sprintf("database not configured for key %s", relayerID.String()), ) } @@ -159,7 +161,7 @@ func (s *JSONFileStorage) write(relayerID common.Hash, v interface{}) error { // If the write fails, the original file is not affected. // Set file permissions to 0644 so only the owner can read and write. // Everyone else can only read. No one can execute the file. 
- if err := os.WriteFile(tmpPath, b, 0644); err != nil { + if err := os.WriteFile(tmpPath, b, 0o644); err != nil { return errors.Wrap(err, "failed to write file") } diff --git a/database/json_file_storage_test.go b/database/json_file_storage_test.go index 6f7a1303..9b350ef7 100644 --- a/database/json_file_storage_test.go +++ b/database/json_file_storage_test.go @@ -14,22 +14,23 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/awm-relayer/relayer" "github.com/stretchr/testify/assert" ) -func createRelayerIDs(blockchainIDs []ids.ID) []RelayerID { +func createRelayerIDs(blockchainIDs []ids.ID) []relayer.RelayerID { destinationsBlockchainIDs := set.NewSet[string](1) // just needs to be non-nil destinationsBlockchainIDs.Add(ids.GenerateTestID().String()) - var relayerIDs []RelayerID + var relayerIDs []relayer.RelayerID for _, blockchainID := range blockchainIDs { for allowedDestination := range destinationsBlockchainIDs { id, _ := ids.FromString(allowedDestination) - relayerIDs = append(relayerIDs, NewRelayerID( + relayerIDs = append(relayerIDs, relayer.NewRelayerID( blockchainID, id, - AllAllowedAddress, - AllAllowedAddress, + relayer.AllAllowedAddress, + relayer.AllAllowedAddress, ), ) } @@ -61,7 +62,7 @@ func TestConcurrentWriteReadSingleChain(t *testing.T) { finalTargetValue := uint64(11) testWrite(jsonStorage, relayerIDs[0], finalTargetValue) - latestProcessedBlockData, err := jsonStorage.Get(relayerIDs[0].ID, LatestProcessedBlockKey) + latestProcessedBlockData, err := jsonStorage.Get(relayerIDs[0].ID, latestProcessedBlockKey) if err != nil { t.Fatalf("failed to retrieve from JSON storage. err: %v", err) } @@ -107,7 +108,7 @@ func TestConcurrentWriteReadMultipleChains(t *testing.T) { } for i, relayerID := range relayerIDs { - latestProcessedBlockData, err := jsonStorage.Get(relayerID.ID, LatestProcessedBlockKey) + latestProcessedBlockData, err := jsonStorage.Get(relayerID.ID, latestProcessedBlockKey) if err != nil { t.Fatalf("failed to retrieve from JSON storage. 
networkID: %d err: %v", i, err) } @@ -124,7 +125,7 @@ } } -func setupJsonStorage(t *testing.T, relayerIDs []RelayerID) *JSONFileStorage { +func setupJsonStorage(t *testing.T, relayerIDs []relayer.RelayerID) *JSONFileStorage { logger := logging.NewLogger( "awm-relayer-test", logging.NewWrappedCore( @@ -142,8 +143,8 @@ return jsonStorage } -func testWrite(storage *JSONFileStorage, relayerID RelayerID, height uint64) { - err := storage.Put(relayerID.ID, LatestProcessedBlockKey, []byte(strconv.FormatUint(height, 10))) +func testWrite(storage *JSONFileStorage, relayerID relayer.RelayerID, height uint64) { + err := storage.Put(relayerID.ID, latestProcessedBlockKey, []byte(strconv.FormatUint(height, 10))) if err != nil { fmt.Printf("failed to put data: %v", err) return diff --git a/database/kvstore.go b/database/kvstore.go new file mode 100644 index 00000000..e8d65b28 --- /dev/null +++ b/database/kvstore.go @@ -0,0 +1,57 @@ +package database + +import ( + "fmt" + + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/awm-relayer/relayer" + "github.com/ava-labs/awm-relayer/relayer/config" + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +var ( + errKeyNotFound = errors.New("key not found") + errRelayerIDNotFound = errors.New("no database entry for relayer id") +) + +const ( + latestProcessedBlockKey DataKey = iota +) + +// keyValueDatabase is a key-value store for relayer state, with each relayerID maintaining its own state. +// Implementations should be thread-safe. +type keyValueDatabase interface { + Get(relayerID common.Hash, key DataKey) ([]byte, error) + Put(relayerID common.Hash, key DataKey, value []byte) error +} + +func NewKeyValueDatabase(logger logging.Logger, cfg *config.Config) (keyValueDatabase, error) { + relayerIDs := relayer.GetConfigRelayerIDs(cfg) + dbConnect := func() (keyValueDatabase, error) { return NewJSONFileStorage(logger, cfg.StorageLocation, relayerIDs) } + usedDB := "JSON" + if cfg.RedisURL != "" { + dbConnect = func() (keyValueDatabase, error) { return NewRedisDatabase(logger, cfg.RedisURL, relayerIDs) } + usedDB = "Redis" + } + db, err := dbConnect() + if err != nil { + logger.Error( + fmt.Sprintf("Failed to create %s database", usedDB), + zap.Error(err), + ) + return nil, err + } + return db, err +} + +type DataKey int + +func (k DataKey) String() string { + switch k { + case latestProcessedBlockKey: + return "latestProcessedBlock" + } + return "unknown" +} diff --git a/database/redis.go b/database/redis.go index fe134366..b8b04f12 100644 --- a/database/redis.go +++ b/database/redis.go @@ -8,19 +8,20 @@ import ( "strings" "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/awm-relayer/relayer" "github.com/ethereum/go-ethereum/common" "github.com/redis/go-redis/v9" "go.uber.org/zap" ) -var _ RelayerDatabase = &RedisDatabase{} +var _ keyValueDatabase = &RedisDatabase{} type RedisDatabase struct { logger logging.Logger client *redis.Client } -func NewRedisDatabase(logger logging.Logger, redisURL string, relayerIDs []RelayerID) (*RedisDatabase, error) { +func NewRedisDatabase(logger logging.Logger, redisURL string, relayerIDs []relayer.RelayerID) (*RedisDatabase, error) { opts, err := redis.ParseURL(redisURL) if err != nil { logger.Error( @@ -50,7 +51,7 @@ func (r *RedisDatabase) Get(relayerID common.Hash, key DataKey) ([]byte, error) zap.String("key",
compositeKey), zap.Error(err)) if err == redis.Nil { - return nil, ErrKeyNotFound + return nil, errKeyNotFound } return nil, err } diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go index bb0c2f3e..4b1330aa 100644 --- a/relayer/application_relayer.go +++ b/relayer/application_relayer.go @@ -12,7 +12,6 @@ import ( "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/logging" avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" - "github.com/ava-labs/awm-relayer/database" "github.com/ava-labs/awm-relayer/messages" "github.com/ava-labs/awm-relayer/peers" "github.com/ava-labs/awm-relayer/relayer/config" @@ -35,10 +34,8 @@ const ( signatureRequestRetryWaitPeriodMs = 10_000 ) -var ( - // Errors - errFailedToGetAggSig = errors.New("failed to get aggregate signature from node endpoint") -) +// Errors +var errFailedToGetAggSig = errors.New("failed to get aggregate signature from node endpoint") // CheckpointManager stores committed heights in the database type CheckpointManager interface { @@ -61,7 +58,7 @@ type ApplicationRelayer struct { sourceBlockchain config.SourceBlockchain signingSubnetID ids.ID destinationClient vms.DestinationClient - relayerID database.RelayerID + relayerID RelayerID warpQuorum config.WarpQuorum checkpointManager CheckpointManager sourceWarpSignatureClient *rpc.Client // nil if configured to fetch signatures via AppRequest for the source blockchain @@ -72,7 +69,7 @@ func NewApplicationRelayer( logger logging.Logger, metrics *ApplicationRelayerMetrics, network *peers.AppRequestNetwork, - relayerID database.RelayerID, + relayerID RelayerID, destinationClient vms.DestinationClient, sourceBlockchain config.SourceBlockchain, checkpointManager CheckpointManager, @@ -251,7 +248,7 @@ func (r *ApplicationRelayer) ProcessMessage(handler messages.MessageHandler) (co return txHash, nil } -func (r *ApplicationRelayer) RelayerID() database.RelayerID { +func (r *ApplicationRelayer) RelayerID() RelayerID { return r.relayerID } diff --git a/relayer/checkpoint/checkpoint.go b/relayer/checkpoint/checkpoint.go index 6df59338..742432e5 100644 --- a/relayer/checkpoint/checkpoint.go +++ b/relayer/checkpoint/checkpoint.go @@ -5,11 +5,11 @@ package checkpoint import ( "container/heap" - "strconv" + "errors" "sync" "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/awm-relayer/database" + "github.com/ava-labs/awm-relayer/relayer" "github.com/ava-labs/awm-relayer/utils" "go.uber.org/zap" ) @@ -20,9 +20,9 @@ import ( type CheckpointManager struct { logger logging.Logger - database database.RelayerDatabase + database RelayerDatabase writeSignal chan struct{} - relayerID database.RelayerID + relayerID relayer.RelayerID committedHeight uint64 lock *sync.RWMutex pendingCommits *utils.UInt64Heap @@ -30,9 +30,9 @@ type CheckpointManager struct { func NewCheckpointManager( logger logging.Logger, - database database.RelayerDatabase, + database RelayerDatabase, writeSignal chan struct{}, - relayerID database.RelayerID, + relayerID relayer.RelayerID, startingHeight uint64, ) *CheckpointManager { h := &utils.UInt64Heap{} @@ -64,8 +64,8 @@ func (cm *CheckpointManager) writeToDatabase() { if cm.committedHeight == 0 { return } - storedHeight, err := database.GetLatestProcessedBlockHeight(cm.database, cm.relayerID) - if err != nil && !database.IsKeyNotFoundError(err) { + storedHeight, err := cm.database.GetLatestProcessedBlockHeight(cm.relayerID) + if err != nil && !errors.Is(err, ErrNotFound) { cm.logger.Error( 
"Failed to get latest processed block height", zap.Error(err), @@ -81,11 +81,7 @@ func (cm *CheckpointManager) writeToDatabase() { zap.Uint64("height", cm.committedHeight), zap.String("relayerID", cm.relayerID.ID.String()), ) - err = cm.database.Put( - cm.relayerID.ID, - database.LatestProcessedBlockKey, - []byte(strconv.FormatUint(cm.committedHeight, 10)), - ) + err = cm.database.StoreLatestProcessedBlockHeight(cm.relayerID, cm.committedHeight) if err != nil { cm.logger.Error( "Failed to write latest processed block height", diff --git a/relayer/checkpoint/checkpoint_test.go b/relayer/checkpoint/checkpoint_test.go index 155616a9..33037c9f 100644 --- a/relayer/checkpoint/checkpoint_test.go +++ b/relayer/checkpoint/checkpoint_test.go @@ -5,9 +5,10 @@ import ( "testing" "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/awm-relayer/database" - mock_database "github.com/ava-labs/awm-relayer/database/mocks" + "github.com/ava-labs/awm-relayer/relayer" + mock_database "github.com/ava-labs/awm-relayer/relayer/checkpoint/mocks" "github.com/ava-labs/awm-relayer/utils" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/stretchr/testify/require" @@ -60,7 +61,7 @@ func TestCommitHeight(t *testing.T) { } db := mock_database.NewMockRelayerDatabase(gomock.NewController(t)) for _, test := range testCases { - id := database.RelayerID{ + id := relayer.RelayerID{ ID: common.BytesToHash(crypto.Keccak256([]byte(test.name))), } cm := NewCheckpointManager(logging.NoLog{}, db, nil, id, test.currentMaxHeight) diff --git a/relayer/checkpoint/database.go b/relayer/checkpoint/database.go new file mode 100644 index 00000000..b96386eb --- /dev/null +++ b/relayer/checkpoint/database.go @@ -0,0 +1,22 @@ +//go:generate mockgen -source=$GOFILE -destination=./mocks/mock_database.go -package=mocks + +package checkpoint + +import ( + "errors" + + "github.com/ava-labs/awm-relayer/relayer" +) + +var ( + // Errors expected to be used by the RelayerDatabase implementations + // + ErrNotFound = errors.New("not found") + ErrDatabaseMisconfiguration = errors.New("database misconfiguration") +) + +// RelayerDatabase defines the interface used by the checkpoint manager to store last processed block height +type RelayerDatabase interface { + GetLatestProcessedBlockHeight(relayerID relayer.RelayerID) (uint64, error) + StoreLatestProcessedBlockHeight(relayerID relayer.RelayerID, height uint64) error +} diff --git a/relayer/checkpoint/mocks/mock_database.go b/relayer/checkpoint/mocks/mock_database.go new file mode 100644 index 00000000..8a0f2d5c --- /dev/null +++ b/relayer/checkpoint/mocks/mock_database.go @@ -0,0 +1,69 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: database.go +// +// Generated by this command: +// +// mockgen -source=database.go -destination=./mocks/mock_database.go -package=mocks +// + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + relayer "github.com/ava-labs/awm-relayer/relayer" + gomock "go.uber.org/mock/gomock" +) + +// MockRelayerDatabase is a mock of RelayerDatabase interface. +type MockRelayerDatabase struct { + ctrl *gomock.Controller + recorder *MockRelayerDatabaseMockRecorder +} + +// MockRelayerDatabaseMockRecorder is the mock recorder for MockRelayerDatabase. +type MockRelayerDatabaseMockRecorder struct { + mock *MockRelayerDatabase +} + +// NewMockRelayerDatabase creates a new mock instance. 
+func NewMockRelayerDatabase(ctrl *gomock.Controller) *MockRelayerDatabase { + mock := &MockRelayerDatabase{ctrl: ctrl} + mock.recorder = &MockRelayerDatabaseMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockRelayerDatabase) EXPECT() *MockRelayerDatabaseMockRecorder { + return m.recorder +} + +// GetLatestProcessedBlockHeight mocks base method. +func (m *MockRelayerDatabase) GetLatestProcessedBlockHeight(relayerID relayer.RelayerID) (uint64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetLatestProcessedBlockHeight", relayerID) + ret0, _ := ret[0].(uint64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetLatestProcessedBlockHeight indicates an expected call of GetLatestProcessedBlockHeight. +func (mr *MockRelayerDatabaseMockRecorder) GetLatestProcessedBlockHeight(relayerID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLatestProcessedBlockHeight", reflect.TypeOf((*MockRelayerDatabase)(nil).GetLatestProcessedBlockHeight), relayerID) +} + +// StoreLatestProcessedBlockHeight mocks base method. +func (m *MockRelayerDatabase) StoreLatestProcessedBlockHeight(relayerID relayer.RelayerID, height uint64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StoreLatestProcessedBlockHeight", relayerID, height) + ret0, _ := ret[0].(error) + return ret0 +} + +// StoreLatestProcessedBlockHeight indicates an expected call of StoreLatestProcessedBlockHeight. +func (mr *MockRelayerDatabaseMockRecorder) StoreLatestProcessedBlockHeight(relayerID, height any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StoreLatestProcessedBlockHeight", reflect.TypeOf((*MockRelayerDatabase)(nil).StoreLatestProcessedBlockHeight), relayerID, height) +} diff --git a/database/utils.go b/relayer/checkpoint/utils.go similarity index 70% rename from database/utils.go rename to relayer/checkpoint/utils.go index b0a513a4..134e7f9b 100644 --- a/database/utils.go +++ b/relayer/checkpoint/utils.go @@ -1,21 +1,13 @@ -// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package database +package checkpoint import ( - "strconv" + "errors" "github.com/ava-labs/avalanchego/utils/logging" - "github.com/pkg/errors" + "github.com/ava-labs/awm-relayer/relayer" "go.uber.org/zap" ) -// Returns true if an error returned by a RelayerDatabase indicates the requested key was not found. -func IsKeyNotFoundError(err error) bool { - return errors.Is(err, ErrRelayerIDNotFound) || errors.Is(err, ErrKeyNotFound) -} - // Determines the height to process from. There are three cases: // 1) The database contains the latest processed block data for the chain // - In this case, we return the maximum of the latest processed block and the @@ -30,12 +22,12 @@ func IsKeyNotFoundError(err error) bool { func CalculateStartingBlockHeight( logger logging.Logger, db RelayerDatabase, - relayerID RelayerID, + relayerID relayer.RelayerID, processHistoricalBlocksFromHeight uint64, currentHeight uint64, ) (uint64, error) { - latestProcessedBlock, err := GetLatestProcessedBlockHeight(db, relayerID) - if IsKeyNotFoundError(err) { + latestProcessedBlock, err := db.GetLatestProcessedBlockHeight(relayerID) + if errors.Is(err, ErrNotFound) { // The database does not contain the latest processed block data for the chain, // use the configured process-historical-blocks-from-height instead. 
// If process-historical-blocks-from-height was not configured, start from the chain head. @@ -71,16 +63,3 @@ func CalculateStartingBlockHeight( ) return processHistoricalBlocksFromHeight, nil } - -// Helper function to get the latest processed block height from the database. -func GetLatestProcessedBlockHeight(db RelayerDatabase, relayerID RelayerID) (uint64, error) { - latestProcessedBlockData, err := db.Get(relayerID.ID, LatestProcessedBlockKey) - if err != nil { - return 0, err - } - latestProcessedBlock, err := strconv.ParseUint(string(latestProcessedBlockData), 10, 64) - if err != nil { - return 0, err - } - return latestProcessedBlock, nil -} diff --git a/database/utils_test.go b/relayer/checkpoint/utils_test.go similarity index 77% rename from database/utils_test.go rename to relayer/checkpoint/utils_test.go index 8f9e3ef6..419ce9d6 100644 --- a/database/utils_test.go +++ b/relayer/checkpoint/utils_test.go @@ -1,12 +1,11 @@ -package database +package checkpoint import ( "fmt" - "strconv" "testing" "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ethereum/go-ethereum/common" + "github.com/ava-labs/awm-relayer/relayer" "github.com/stretchr/testify/require" ) @@ -25,7 +24,7 @@ func TestCalculateStartingBlockHeight(t *testing.T) { name: "value in cfg, no value in db", cfgBlock: 100, dbBlock: 0, - dbError: ErrKeyNotFound, + dbError: ErrNotFound, expectedBlock: 100, expectedError: nil, }, @@ -61,7 +60,7 @@ func TestCalculateStartingBlockHeight(t *testing.T) { name: "no DB value, no cfg value", cfgBlock: 0, dbBlock: 0, - dbError: ErrKeyNotFound, + dbError: ErrNotFound, expectedBlock: currentBlock, expectedError: nil, }, @@ -69,11 +68,11 @@ func TestCalculateStartingBlockHeight(t *testing.T) { for _, testCase := range testCases { db := &mockDB{} - db.getFunc = func(relayerID common.Hash, key DataKey) ([]byte, error) { - return []byte(strconv.FormatUint(testCase.dbBlock, 10)), testCase.dbError + db.getFunc = func(relayerID relayer.RelayerID) (uint64, error) { + return testCase.dbBlock, testCase.dbError } - ret, err := CalculateStartingBlockHeight(logging.NoLog{}, db, RelayerID{}, testCase.cfgBlock, currentBlock) + ret, err := CalculateStartingBlockHeight(logging.NoLog{}, db, relayer.RelayerID{}, testCase.cfgBlock, currentBlock) if testCase.expectedError == nil { require.NoError(t, err, fmt.Sprintf("test failed: %s", testCase.name)) require.Equal(t, testCase.expectedBlock, ret, fmt.Sprintf("test failed: %s", testCase.name)) @@ -85,13 +84,13 @@ func TestCalculateStartingBlockHeight(t *testing.T) { // in-package mock to allow for unit testing of non-receiver functions that use the RelayerDatabase interface type mockDB struct { - getFunc func(relayerID common.Hash, key DataKey) ([]byte, error) + getFunc func(relayerID relayer.RelayerID) (uint64, error) } -func (m *mockDB) Get(relayerID common.Hash, key DataKey) ([]byte, error) { - return m.getFunc(relayerID, key) +func (m *mockDB) GetLatestProcessedBlockHeight(relayerID relayer.RelayerID) (uint64, error) { + return m.getFunc(relayerID) } -func (m *mockDB) Put(relayerID common.Hash, key DataKey, value []byte) error { +func (m *mockDB) StoreLatestProcessedBlockHeight(relayerID relayer.RelayerID, value uint64) error { return nil } diff --git a/relayer/main/main.go b/relayer/main/main.go index 81945615..26a48779 100644 --- a/relayer/main/main.go +++ b/relayer/main/main.go @@ -185,11 +185,12 @@ func main() { } // Initialize the database - db, err := database.NewDatabase(logger, &cfg) + kvdb, err := database.NewKeyValueDatabase(logger, 
&cfg) if err != nil { logger.Fatal("Failed to create database", zap.Error(err)) panic(err) } + db := database.NewRelayerDatabase(kvdb) // Initialize the global write ticker ticker := utils.NewTicker(cfg.DBWriteIntervalSeconds) @@ -361,7 +362,7 @@ func createApplicationRelayers( ctx context.Context, logger logging.Logger, relayerMetrics *relayer.ApplicationRelayerMetrics, - db database.RelayerDatabase, + db checkpoint.RelayerDatabase, ticker *utils.Ticker, network *peers.AppRequestNetwork, cfg *config.Config, @@ -419,7 +420,7 @@ func createApplicationRelayersForSourceChain( ctx context.Context, logger logging.Logger, metrics *relayer.ApplicationRelayerMetrics, - db database.RelayerDatabase, + db checkpoint.RelayerDatabase, ticker *utils.Ticker, sourceBlockchain config.SourceBlockchain, network *peers.AppRequestNetwork, @@ -447,11 +448,11 @@ func createApplicationRelayersForSourceChain( height = currentHeight + 1 minHeight = height } - for _, relayerID := range database.GetSourceBlockchainRelayerIDs(&sourceBlockchain) { + for _, relayerID := range relayer.GetSourceBlockchainRelayerIDs(&sourceBlockchain) { // Calculate the catch-up starting block height, and update the min height if necessary if cfg.ProcessMissedBlocks { var err error - height, err = database.CalculateStartingBlockHeight( + height, err = checkpoint.CalculateStartingBlockHeight( logger, db, relayerID, diff --git a/relayer/message_coordinator.go b/relayer/message_coordinator.go index f9de1b1c..cf7a6d18 100644 --- a/relayer/message_coordinator.go +++ b/relayer/message_coordinator.go @@ -11,7 +11,6 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/awm-relayer/database" "github.com/ava-labs/awm-relayer/messages" relayerTypes "github.com/ava-labs/awm-relayer/types" "github.com/ava-labs/subnet-evm/core/types" @@ -121,7 +120,7 @@ func (mc *MessageCoordinator) getApplicationRelayer( destinationAddress common.Address, ) *ApplicationRelayer { // Check for an exact match - applicationRelayerID := database.CalculateRelayerID( + applicationRelayerID := CalculateRelayerID( sourceBlockchainID, destinationBlockchainID, originSenderAddress, @@ -133,11 +132,11 @@ func (mc *MessageCoordinator) getApplicationRelayer( // Check for a match on sourceBlockchainID and destinationBlockchainID, with a specific // originSenderAddress and any destinationAddress. - applicationRelayerID = database.CalculateRelayerID( + applicationRelayerID = CalculateRelayerID( sourceBlockchainID, destinationBlockchainID, originSenderAddress, - database.AllAllowedAddress, + AllAllowedAddress, ) if applicationRelayer, ok := mc.applicationRelayers[applicationRelayerID]; ok { return applicationRelayer @@ -145,10 +144,10 @@ func (mc *MessageCoordinator) getApplicationRelayer( // Check for a match on sourceBlockchainID and destinationBlockchainID, with any originSenderAddress // and a specific destinationAddress. - applicationRelayerID = database.CalculateRelayerID( + applicationRelayerID = CalculateRelayerID( sourceBlockchainID, destinationBlockchainID, - database.AllAllowedAddress, + AllAllowedAddress, destinationAddress, ) if applicationRelayer, ok := mc.applicationRelayers[applicationRelayerID]; ok { @@ -157,11 +156,11 @@ func (mc *MessageCoordinator) getApplicationRelayer( // Check for a match on sourceBlockchainID and destinationBlockchainID, with any originSenderAddress // and any destinationAddress. 
- applicationRelayerID = database.CalculateRelayerID( + applicationRelayerID = CalculateRelayerID( sourceBlockchainID, destinationBlockchainID, - database.AllAllowedAddress, - database.AllAllowedAddress, + AllAllowedAddress, + AllAllowedAddress, ) if applicationRelayer, ok := mc.applicationRelayers[applicationRelayerID]; ok { return applicationRelayer diff --git a/database/relayer_id.go b/relayer/relayer_id.go similarity index 95% rename from database/relayer_id.go rename to relayer/relayer_id.go index 2b8fee17..a6c7b212 100644 --- a/database/relayer_id.go +++ b/relayer/relayer_id.go @@ -1,7 +1,7 @@ // Copyright (C) 2024, Ava Labs, Inc. All rights reserved. // See the file LICENSE for licensing terms. -package database +package relayer import ( "strings" @@ -13,10 +13,8 @@ import ( "github.com/ethereum/go-ethereum/crypto" ) -var ( - // AllAllowedAddress is used to construct relayer IDs when all addresses are allowed - AllAllowedAddress = utils.ZeroAddress -) +// AllAllowedAddress is used to construct relayer IDs when all addresses are allowed +var AllAllowedAddress = utils.ZeroAddress // RelayerID is a unique identifier for an application relayer type RelayerID struct { diff --git a/relayer/relayer_id_test.go b/relayer/relayer_id_test.go new file mode 100644 index 00000000..0e4a8b36 --- /dev/null +++ b/relayer/relayer_id_test.go @@ -0,0 +1,201 @@ +package relayer + +import ( + "fmt" + "testing" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/awm-relayer/relayer/config" + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" +) + +func TestCalculateRelayerID(t *testing.T) { + id1, _ := ids.FromString("S4mMqUXe7vHsGiRAma6bv3CKnyaLssyAxmQ2KvFpX1KEvfFCD") + id2, _ := ids.FromString("2TGBXcnwx5PqiXWiqxAKUaNSqDguXNh1mxnp82jui68hxJSZAx") + testCases := []struct { + name string + sourceBlockchainID ids.ID + destinationBlockchainID ids.ID + originSenderAddress common.Address + destinationAddress common.Address + expected common.Hash + }{ + { + name: "all zero", + sourceBlockchainID: id1, + destinationBlockchainID: id2, + originSenderAddress: AllAllowedAddress, + destinationAddress: AllAllowedAddress, + expected: common.HexToHash("0xf8a8467088fd6f8ad4577408ddda1607e2702ca9827d7fd556c46adae624b7a2"), + }, + { + name: "zero source address", + sourceBlockchainID: id1, + destinationBlockchainID: id2, + originSenderAddress: AllAllowedAddress, + destinationAddress: common.HexToAddress("0x0123456789abcdef0123456789abcdef01234567"), + expected: common.HexToHash("0xa20ede2231d43d072800ad436a4ca8844f9ddd9cb4174f4cc3046e0958e48320"), + }, + { + name: "zero destination address", + sourceBlockchainID: id1, + destinationBlockchainID: id2, + originSenderAddress: common.HexToAddress("0x0123456789abcdef0123456789abcdef01234567"), + destinationAddress: AllAllowedAddress, + expected: common.HexToHash("0xb205a049831478f55b768a4c875b2085339b6053831ecde8a3d406f9d13454a5"), + }, + { + name: "all non-zero", + sourceBlockchainID: id1, + destinationBlockchainID: id2, + originSenderAddress: common.HexToAddress("0x0123456789abcdef0123456789abcdef01234567"), + destinationAddress: common.HexToAddress("0x0123456789abcdef0123456789abcdef01234567"), + expected: common.HexToHash("0x6661512bc3b5689b28a4c2519425f725b5681b90fea937433103c846f742f918"), + }, + } + for _, testCase := range testCases { + result := CalculateRelayerID( + testCase.sourceBlockchainID, + testCase.destinationBlockchainID, + testCase.originSenderAddress, + 
testCase.destinationAddress, + ) + require.Equal(t, testCase.expected, result, testCase.name) + } +} + +func TestGetConfigRelayerKeys(t *testing.T) { + allowedAddress := common.HexToAddress("0x0123456789abcdef0123456789abcdef01234567") + dstCfg1 := config.TestValidDestinationBlockchainConfig + + // All destination chains and source and destination addresses are allowed + srcCfg1 := config.TestValidSourceBlockchainConfig + + // All destination chains, but only a single source address is allowed + srcCfg2 := config.TestValidSourceBlockchainConfig + srcCfg2.BlockchainID = ids.GenerateTestID().String() + srcCfg2.AllowedOriginSenderAddresses = []string{allowedAddress.String()} + + // Restricted to a single destination chain, but all source and destination addresses are allowed + srcCfg3 := config.TestValidSourceBlockchainConfig + srcCfg3.BlockchainID = ids.GenerateTestID().String() + srcCfg3.SupportedDestinations = []*config.SupportedDestination{ + { + BlockchainID: dstCfg1.BlockchainID, + }, + } + + // Restricted to a single destination chain, but only a single source address is allowed + srcCfg4 := config.TestValidSourceBlockchainConfig + srcCfg4.BlockchainID = ids.GenerateTestID().String() + srcCfg4.AllowedOriginSenderAddresses = []string{allowedAddress.String()} + srcCfg4.SupportedDestinations = []*config.SupportedDestination{ + { + BlockchainID: dstCfg1.BlockchainID, + }, + } + + // Restricted to a single destination chain, but only a single destination address is allowed + srcCfg5 := config.TestValidSourceBlockchainConfig + srcCfg5.BlockchainID = ids.GenerateTestID().String() + srcCfg5.SupportedDestinations = []*config.SupportedDestination{ + { + BlockchainID: dstCfg1.BlockchainID, + Addresses: []string{allowedAddress.String()}, + }, + } + + // Restricted to a single destination, but only a single source and destination address is allowed + srcCfg6 := config.TestValidSourceBlockchainConfig + srcCfg6.BlockchainID = ids.GenerateTestID().String() + srcCfg6.AllowedOriginSenderAddresses = []string{allowedAddress.String()} + srcCfg6.SupportedDestinations = []*config.SupportedDestination{ + { + BlockchainID: dstCfg1.BlockchainID, + Addresses: []string{allowedAddress.String()}, + }, + } + + // + + err := dstCfg1.Validate() + require.ErrorIs(t, err, nil) + + allowedDestinations := set.NewSet[string](1) + allowedDestinations.Add(dstCfg1.BlockchainID) + err = srcCfg1.Validate(&allowedDestinations) + require.ErrorIs(t, err, nil) + err = srcCfg2.Validate(&allowedDestinations) + require.ErrorIs(t, err, nil) + err = srcCfg3.Validate(&allowedDestinations) + require.ErrorIs(t, err, nil) + err = srcCfg4.Validate(&allowedDestinations) + require.ErrorIs(t, err, nil) + err = srcCfg5.Validate(&allowedDestinations) + require.ErrorIs(t, err, nil) + err = srcCfg6.Validate(&allowedDestinations) + require.ErrorIs(t, err, nil) + + cfg := &config.Config{ + SourceBlockchains: []*config.SourceBlockchain{&srcCfg1, &srcCfg2, &srcCfg3, &srcCfg4, &srcCfg5, &srcCfg6}, + DestinationBlockchains: []*config.DestinationBlockchain{&dstCfg1}, + } + + targetIDs := []RelayerID{ + NewRelayerID( + srcCfg1.GetBlockchainID(), + dstCfg1.GetBlockchainID(), + AllAllowedAddress, + AllAllowedAddress, + ), + NewRelayerID( + srcCfg2.GetBlockchainID(), + dstCfg1.GetBlockchainID(), + allowedAddress, + AllAllowedAddress, + ), + NewRelayerID( + srcCfg3.GetBlockchainID(), + dstCfg1.GetBlockchainID(), + AllAllowedAddress, + AllAllowedAddress, + ), + NewRelayerID( + srcCfg4.GetBlockchainID(), + dstCfg1.GetBlockchainID(), + allowedAddress, + 
AllAllowedAddress, + ), + NewRelayerID( + srcCfg5.GetBlockchainID(), + dstCfg1.GetBlockchainID(), + AllAllowedAddress, + allowedAddress, + ), + NewRelayerID( + srcCfg6.GetBlockchainID(), + dstCfg1.GetBlockchainID(), + allowedAddress, + allowedAddress, + ), + } + + relayerIDs := GetConfigRelayerIDs(cfg) + + // Test that all target IDs are present + for i, id := range targetIDs { + require.True(t, + func(ids []RelayerID, target RelayerID) bool { + for _, id := range ids { + if id.ID == target.ID { + return true + } + } + return false + }(relayerIDs, id), + fmt.Sprintf("target ID %d not found", i), + ) + } +} diff --git a/tests/allowed_addresses.go b/tests/allowed_addresses.go index e31086f7..5e56984d 100644 --- a/tests/allowed_addresses.go +++ b/tests/allowed_addresses.go @@ -4,11 +4,11 @@ import ( "context" "crypto/ecdsa" "fmt" - "strconv" "time" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/awm-relayer/database" + "github.com/ava-labs/awm-relayer/relayer" "github.com/ava-labs/awm-relayer/relayer/config" testUtils "github.com/ava-labs/awm-relayer/tests/utils" "github.com/ava-labs/subnet-evm/accounts/abi/bind" @@ -20,10 +20,12 @@ import ( . "github.com/onsi/gomega" ) -const relayerCfgFname1 = "relayer-config-1.json" -const relayerCfgFname2 = "relayer-config-2.json" -const relayerCfgFname3 = "relayer-config-3.json" -const relayerCfgFname4 = "relayer-config-4.json" +const ( + relayerCfgFname1 = "relayer-config-1.json" + relayerCfgFname2 = "relayer-config-2.json" + relayerCfgFname3 = "relayer-config-3.json" + relayerCfgFname4 = "relayer-config-4.json" +) const numKeys = 4 @@ -319,25 +321,25 @@ func AllowedAddresses(network interfaces.LocalNetwork) { // // Create relayer keys that allow all source and destination addresses - relayerID1 := database.NewRelayerID( + relayerID1 := relayer.NewRelayerID( subnetAInfo.BlockchainID, subnetBInfo.BlockchainID, - database.AllAllowedAddress, - database.AllAllowedAddress, + relayer.AllAllowedAddress, + relayer.AllAllowedAddress, ) - relayerID2 := database.NewRelayerID( + relayerID2 := relayer.NewRelayerID( subnetAInfo.BlockchainID, subnetBInfo.BlockchainID, allowedAddresses[relayer2AllowedSrcAddressIdx], - database.AllAllowedAddress, + relayer.AllAllowedAddress, ) - relayerID3 := database.NewRelayerID( + relayerID3 := relayer.NewRelayerID( subnetAInfo.BlockchainID, subnetBInfo.BlockchainID, - database.AllAllowedAddress, + relayer.AllAllowedAddress, allowedAddresses[relayer3AllowedDstAddressIdx], ) - relayerID4 := database.NewRelayerID( + relayerID4 := relayer.NewRelayerID( subnetAInfo.BlockchainID, subnetBInfo.BlockchainID, allowedAddresses[relayer4AllowedSrcAddressIdx], @@ -352,32 +354,26 @@ func AllowedAddresses(network interfaces.LocalNetwork) { relayerID4.ID.String(), ), ) - relayerKeys := []database.RelayerID{relayerID1, relayerID2, relayerID3, relayerID4} - jsonDB, err := database.NewJSONFileStorage(logging.NoLog{}, testUtils.StorageLocation, relayerKeys) - Expect(err).Should(BeNil()) + relayerKeys := []relayer.RelayerID{relayerID1, relayerID2, relayerID3, relayerID4} + // jsonDB, err := database.NewJSONFileStorage(logging.NoLog{}, testUtils.StorageLocation, relayerKeys) // Fetch the checkpointed heights from the shared database - data, err := jsonDB.Get(relayerID1.ID, database.LatestProcessedBlockKey) + jsonDB, err := database.NewJSONFileStorage(logging.NoLog{}, testUtils.StorageLocation, relayerKeys) Expect(err).Should(BeNil()) - storedHeight1, err := strconv.ParseUint(string(data), 10, 64) + db := 
database.NewRelayerDatabase(jsonDB) + storedHeight1, err := db.GetLatestProcessedBlockHeight(relayerID1) Expect(err).Should(BeNil()) Expect(storedHeight1).Should(Equal(height1)) - data, err = jsonDB.Get(relayerID2.ID, database.LatestProcessedBlockKey) - Expect(err).Should(BeNil()) - storedHeight2, err := strconv.ParseUint(string(data), 10, 64) + storedHeight2, err := db.GetLatestProcessedBlockHeight(relayerID2) Expect(err).Should(BeNil()) Expect(storedHeight2).Should(Equal(height2)) - data, err = jsonDB.Get(relayerID3.ID, database.LatestProcessedBlockKey) - Expect(err).Should(BeNil()) - storedHeight3, err := strconv.ParseUint(string(data), 10, 64) + storedHeight3, err := db.GetLatestProcessedBlockHeight(relayerID3) Expect(err).Should(BeNil()) Expect(storedHeight3).Should(Equal(height3)) - data, err = jsonDB.Get(relayerID4.ID, database.LatestProcessedBlockKey) - Expect(err).Should(BeNil()) - storedHeight4, err := strconv.ParseUint(string(data), 10, 64) + storedHeight4, err := db.GetLatestProcessedBlockHeight(relayerID4) Expect(err).Should(BeNil()) Expect(storedHeight4).Should(Equal(height4)) } diff --git a/tests/basic_relay.go b/tests/basic_relay.go index 713aef5c..b4064643 100644 --- a/tests/basic_relay.go +++ b/tests/basic_relay.go @@ -10,6 +10,7 @@ import ( "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/awm-relayer/database" + "github.com/ava-labs/awm-relayer/relayer" testUtils "github.com/ava-labs/awm-relayer/tests/utils" "github.com/ava-labs/subnet-evm/core/types" "github.com/ava-labs/teleporter/tests/interfaces" @@ -112,27 +113,28 @@ func BasicRelay(network interfaces.LocalNetwork) { jsonDB, err := database.NewJSONFileStorage( logger, relayerConfig.StorageLocation, - database.GetConfigRelayerIDs(&relayerConfig), + relayer.GetConfigRelayerIDs(&relayerConfig), ) Expect(err).Should(BeNil()) // Create relayer keys that allow all source and destination addresses - relayerIDA := database.CalculateRelayerID( + relayerIDA := relayer.NewRelayerID( subnetAInfo.BlockchainID, subnetBInfo.BlockchainID, - database.AllAllowedAddress, - database.AllAllowedAddress, + relayer.AllAllowedAddress, + relayer.AllAllowedAddress, ) - relayerIDB := database.CalculateRelayerID( + relayerIDB := relayer.NewRelayerID( subnetBInfo.BlockchainID, subnetAInfo.BlockchainID, - database.AllAllowedAddress, - database.AllAllowedAddress, + relayer.AllAllowedAddress, + relayer.AllAllowedAddress, ) - // Modify the JSON database to force the relayer to re-process old blocks - err = jsonDB.Put(relayerIDA, database.LatestProcessedBlockKey, []byte("0")) + // Modify the JSON database relayer to force the relayer to re-process old blocks + db := database.NewRelayerDatabase(jsonDB) + err = db.StoreLatestProcessedBlockHeight(relayerIDA, 0) Expect(err).Should(BeNil()) - err = jsonDB.Put(relayerIDB, database.LatestProcessedBlockKey, []byte("0")) + err = db.StoreLatestProcessedBlockHeight(relayerIDB, 0) Expect(err).Should(BeNil()) // Subscribe to the destination chain
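Usage note (not part of the diff): a minimal sketch of how the refactored layers compose after this change, using only the constructors and interfaces introduced above. The wireCheckpointing helper, its package name, and its parameters are illustrative; relayer/main/main.go performs the equivalent calls inline.

package example

import (
	"github.com/ava-labs/avalanchego/utils/logging"
	"github.com/ava-labs/awm-relayer/database"
	"github.com/ava-labs/awm-relayer/relayer"
	"github.com/ava-labs/awm-relayer/relayer/checkpoint"
	"github.com/ava-labs/awm-relayer/relayer/config"
)

// wireCheckpointing shows the call sequence introduced by this refactor:
// raw key-value backend -> height-oriented wrapper -> checkpoint manager.
func wireCheckpointing(
	logger logging.Logger,
	cfg config.Config,
	relayerID relayer.RelayerID,
	writeSignal chan struct{},
	processHistoricalBlocksFromHeight uint64,
	currentHeight uint64,
) (*checkpoint.CheckpointManager, error) {
	// Pick the raw key-value backend (JSON file storage or Redis) from the config.
	kvdb, err := database.NewKeyValueDatabase(logger, &cfg)
	if err != nil {
		return nil, err
	}

	// Wrap it so it satisfies checkpoint.RelayerDatabase, which deals in
	// uint64 block heights rather than raw []byte values.
	db := database.NewRelayerDatabase(kvdb)

	// The checkpoint package consumes only the interface: decide where to
	// resume, then persist committed heights as the relayer makes progress.
	startingHeight, err := checkpoint.CalculateStartingBlockHeight(
		logger, db, relayerID, processHistoricalBlocksFromHeight, currentHeight,
	)
	if err != nil {
		return nil, err
	}
	return checkpoint.NewCheckpointManager(logger, db, writeSignal, relayerID, startingHeight), nil
}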