Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: database and checkpoint dependencies #421

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 36 additions & 45 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,60 +6,51 @@
package database

import (
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/awm-relayer/relayer/config"
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"go.uber.org/zap"
)

var (
ErrKeyNotFound = errors.New("key not found")
ErrRelayerIDNotFound = errors.New("no database entry for relayer id")
ErrDatabaseMisconfiguration = errors.New("database misconfiguration")
)
"strconv"

const (
LatestProcessedBlockKey DataKey = iota
"github.com/ava-labs/awm-relayer/relayer"
"github.com/ava-labs/awm-relayer/relayer/checkpoint"
"github.com/pkg/errors"
)

type DataKey int
var _ checkpoint.RelayerDatabase = &relayerDatabase{}

func (k DataKey) String() string {
switch k {
case LatestProcessedBlockKey:
return "latestProcessedBlock"
// NewRelayerDatabase instantiate and return a relayerDatabase
func NewRelayerDatabase(db keyValueDatabase) *relayerDatabase {
return &relayerDatabase{
keyValueDatabase: db,
}
return "unknown"
}

// RelayerDatabase is a key-value store for relayer state, with each relayerID maintaining its own state.
// Implementations should be thread-safe.
type RelayerDatabase interface {
Get(relayerID common.Hash, key DataKey) ([]byte, error)
Put(relayerID common.Hash, key DataKey, value []byte) error
// relayerDatabase implements the checkpoint RelayerDatabase interface
type relayerDatabase struct {
keyValueDatabase
}

func NewDatabase(logger logging.Logger, cfg *config.Config) (RelayerDatabase, error) {
if cfg.RedisURL != "" {
db, err := NewRedisDatabase(logger, cfg.RedisURL, GetConfigRelayerIDs(cfg))
if err != nil {
logger.Error(
"Failed to create Redis database",
zap.Error(err),
)
return nil, err
}
return db, nil
} else {
db, err := NewJSONFileStorage(logger, cfg.StorageLocation, GetConfigRelayerIDs(cfg))
if err != nil {
logger.Error(
"Failed to create JSON database",
zap.Error(err),
)
return nil, err
func (x *relayerDatabase) GetLatestProcessedBlockHeight(relayerID relayer.RelayerID) (uint64, error) {
latestProcessedBlockData, err := x.Get(relayerID.ID, latestProcessedBlockKey)
if err != nil {
if isKeyNotFoundError(err) {
return 0, checkpoint.ErrNotFound
}
return db, nil
return 0, err
}
latestProcessedBlock, err := strconv.ParseUint(string(latestProcessedBlockData), 10, 64)
if err != nil {
return 0, err
}
return latestProcessedBlock, nil
}

func (x *relayerDatabase) StoreLatestProcessedBlockHeight(relayerID relayer.RelayerID, height uint64) error {
return x.Put(
relayerID.ID,
latestProcessedBlockKey,
[]byte(strconv.FormatUint(height, 10)),
)
}

// Returns true if an error returned by a RelayerDatabase indicates the requested key was not found.
func isKeyNotFoundError(err error) bool {
return errors.Is(err, errRelayerIDNotFound) || errors.Is(err, errKeyNotFound)
}
200 changes: 3 additions & 197 deletions database/database_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
package database

import (
"fmt"
"testing"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/awm-relayer/relayer/config"
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
)
Expand All @@ -20,12 +15,12 @@ func TestIsKeyNotFoundError(t *testing.T) {
}{
{
name: "key not found error",
err: ErrKeyNotFound,
err: errKeyNotFound,
expected: true,
},
{
name: "relayer key not found error",
err: ErrRelayerIDNotFound,
err: errRelayerIDNotFound,
expected: true,
},
{
Expand All @@ -35,196 +30,7 @@ func TestIsKeyNotFoundError(t *testing.T) {
},
}
for _, testCase := range testCases {
result := IsKeyNotFoundError(testCase.err)
result := isKeyNotFoundError(testCase.err)
require.Equal(t, testCase.expected, result, testCase.name)
}
}

func TestCalculateRelayerID(t *testing.T) {
id1, _ := ids.FromString("S4mMqUXe7vHsGiRAma6bv3CKnyaLssyAxmQ2KvFpX1KEvfFCD")
id2, _ := ids.FromString("2TGBXcnwx5PqiXWiqxAKUaNSqDguXNh1mxnp82jui68hxJSZAx")
testCases := []struct {
name string
sourceBlockchainID ids.ID
destinationBlockchainID ids.ID
originSenderAddress common.Address
destinationAddress common.Address
expected common.Hash
}{
{
name: "all zero",
sourceBlockchainID: id1,
destinationBlockchainID: id2,
originSenderAddress: AllAllowedAddress,
destinationAddress: AllAllowedAddress,
expected: common.HexToHash("0xf8a8467088fd6f8ad4577408ddda1607e2702ca9827d7fd556c46adae624b7a2"),
},
{
name: "zero source address",
sourceBlockchainID: id1,
destinationBlockchainID: id2,
originSenderAddress: AllAllowedAddress,
destinationAddress: common.HexToAddress("0x0123456789abcdef0123456789abcdef01234567"),
expected: common.HexToHash("0xa20ede2231d43d072800ad436a4ca8844f9ddd9cb4174f4cc3046e0958e48320"),
},
{
name: "zero destination address",
sourceBlockchainID: id1,
destinationBlockchainID: id2,
originSenderAddress: common.HexToAddress("0x0123456789abcdef0123456789abcdef01234567"),
destinationAddress: AllAllowedAddress,
expected: common.HexToHash("0xb205a049831478f55b768a4c875b2085339b6053831ecde8a3d406f9d13454a5"),
},
{
name: "all non-zero",
sourceBlockchainID: id1,
destinationBlockchainID: id2,
originSenderAddress: common.HexToAddress("0x0123456789abcdef0123456789abcdef01234567"),
destinationAddress: common.HexToAddress("0x0123456789abcdef0123456789abcdef01234567"),
expected: common.HexToHash("0x6661512bc3b5689b28a4c2519425f725b5681b90fea937433103c846f742f918"),
},
}
for _, testCase := range testCases {
result := CalculateRelayerID(
testCase.sourceBlockchainID,
testCase.destinationBlockchainID,
testCase.originSenderAddress,
testCase.destinationAddress,
)
require.Equal(t, testCase.expected, result, testCase.name)
}
}

func TestGetConfigRelayerKeys(t *testing.T) {
allowedAddress := common.HexToAddress("0x0123456789abcdef0123456789abcdef01234567")
dstCfg1 := config.TestValidDestinationBlockchainConfig

// All destination chains and source and destination addresses are allowed
srcCfg1 := config.TestValidSourceBlockchainConfig

// All destination chains, but only a single source address is allowed
srcCfg2 := config.TestValidSourceBlockchainConfig
srcCfg2.BlockchainID = ids.GenerateTestID().String()
srcCfg2.AllowedOriginSenderAddresses = []string{allowedAddress.String()}

// Restricted to a single destination chain, but all source and destination addresses are allowed
srcCfg3 := config.TestValidSourceBlockchainConfig
srcCfg3.BlockchainID = ids.GenerateTestID().String()
srcCfg3.SupportedDestinations = []*config.SupportedDestination{
{
BlockchainID: dstCfg1.BlockchainID,
},
}

// Restricted to a single destination chain, but only a single source address is allowed
srcCfg4 := config.TestValidSourceBlockchainConfig
srcCfg4.BlockchainID = ids.GenerateTestID().String()
srcCfg4.AllowedOriginSenderAddresses = []string{allowedAddress.String()}
srcCfg4.SupportedDestinations = []*config.SupportedDestination{
{
BlockchainID: dstCfg1.BlockchainID,
},
}

// Restricted to a single destination chain, but only a single destination address is allowed
srcCfg5 := config.TestValidSourceBlockchainConfig
srcCfg5.BlockchainID = ids.GenerateTestID().String()
srcCfg5.SupportedDestinations = []*config.SupportedDestination{
{
BlockchainID: dstCfg1.BlockchainID,
Addresses: []string{allowedAddress.String()},
},
}

// Restricted to a single destination, but only a single source and destination address is allowed
srcCfg6 := config.TestValidSourceBlockchainConfig
srcCfg6.BlockchainID = ids.GenerateTestID().String()
srcCfg6.AllowedOriginSenderAddresses = []string{allowedAddress.String()}
srcCfg6.SupportedDestinations = []*config.SupportedDestination{
{
BlockchainID: dstCfg1.BlockchainID,
Addresses: []string{allowedAddress.String()},
},
}

//

err := dstCfg1.Validate()
require.ErrorIs(t, err, nil)

allowedDestinations := set.NewSet[string](1)
allowedDestinations.Add(dstCfg1.BlockchainID)
err = srcCfg1.Validate(&allowedDestinations)
require.ErrorIs(t, err, nil)
err = srcCfg2.Validate(&allowedDestinations)
require.ErrorIs(t, err, nil)
err = srcCfg3.Validate(&allowedDestinations)
require.ErrorIs(t, err, nil)
err = srcCfg4.Validate(&allowedDestinations)
require.ErrorIs(t, err, nil)
err = srcCfg5.Validate(&allowedDestinations)
require.ErrorIs(t, err, nil)
err = srcCfg6.Validate(&allowedDestinations)
require.ErrorIs(t, err, nil)

cfg := &config.Config{
SourceBlockchains: []*config.SourceBlockchain{&srcCfg1, &srcCfg2, &srcCfg3, &srcCfg4, &srcCfg5, &srcCfg6},
DestinationBlockchains: []*config.DestinationBlockchain{&dstCfg1},
}

targetIDs := []RelayerID{
NewRelayerID(
srcCfg1.GetBlockchainID(),
dstCfg1.GetBlockchainID(),
AllAllowedAddress,
AllAllowedAddress,
),
NewRelayerID(
srcCfg2.GetBlockchainID(),
dstCfg1.GetBlockchainID(),
allowedAddress,
AllAllowedAddress,
),
NewRelayerID(
srcCfg3.GetBlockchainID(),
dstCfg1.GetBlockchainID(),
AllAllowedAddress,
AllAllowedAddress,
),
NewRelayerID(
srcCfg4.GetBlockchainID(),
dstCfg1.GetBlockchainID(),
allowedAddress,
AllAllowedAddress,
),
NewRelayerID(
srcCfg5.GetBlockchainID(),
dstCfg1.GetBlockchainID(),
AllAllowedAddress,
allowedAddress,
),
NewRelayerID(
srcCfg6.GetBlockchainID(),
dstCfg1.GetBlockchainID(),
allowedAddress,
allowedAddress,
),
}

relayerIDs := GetConfigRelayerIDs(cfg)

// Test that all target IDs are present
for i, id := range targetIDs {
require.True(t,
func(ids []RelayerID, target RelayerID) bool {
for _, id := range ids {
if id.ID == target.ID {
return true
}
}
return false
}(relayerIDs, id),
fmt.Sprintf("target ID %d not found", i),
)
}
}
18 changes: 10 additions & 8 deletions database/json_file_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import (
"sync"

"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/awm-relayer/relayer"
"github.com/ava-labs/awm-relayer/relayer/checkpoint"
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"go.uber.org/zap"
)

var _ RelayerDatabase = &JSONFileStorage{}
var _ keyValueDatabase = &JSONFileStorage{}

type chainState map[string]string

Expand All @@ -34,7 +36,7 @@ type JSONFileStorage struct {
}

// NewJSONFileStorage creates a new JSONFileStorage instance
func NewJSONFileStorage(logger logging.Logger, dir string, relayerIDs []RelayerID) (*JSONFileStorage, error) {
func NewJSONFileStorage(logger logging.Logger, dir string, relayerIDs []relayer.RelayerID) (*JSONFileStorage, error) {
storage := &JSONFileStorage{
dir: filepath.Clean(dir),
mutexes: make(map[common.Hash]*sync.RWMutex),
Expand Down Expand Up @@ -68,7 +70,7 @@ func NewJSONFileStorage(logger logging.Logger, dir string, relayerIDs []RelayerI

// 0755: The owner can read, write, execute.
// Everyone else can read and execute but not modify the file.
err = os.MkdirAll(dir, 0755)
err = os.MkdirAll(dir, 0o755)
if err != nil {
storage.logger.Error("failed to create directory",
zap.String("dir", dir),
Expand All @@ -84,7 +86,7 @@ func (s *JSONFileStorage) Get(relayerID common.Hash, dataKey DataKey) ([]byte, e
mutex, ok := s.mutexes[relayerID]
if !ok {
return nil, errors.Wrap(
ErrDatabaseMisconfiguration,
checkpoint.ErrDatabaseMisconfiguration,
fmt.Sprintf("database not configured for key %s", relayerID.String()),
)
}
Expand All @@ -96,12 +98,12 @@ func (s *JSONFileStorage) Get(relayerID common.Hash, dataKey DataKey) ([]byte, e
return nil, err
}
if !fileExists {
return nil, ErrRelayerIDNotFound
return nil, errRelayerIDNotFound
}

var val string
if val, ok = currentState[dataKey.String()]; !ok {
return nil, ErrKeyNotFound
return nil, errKeyNotFound
}

return []byte(val), nil
Expand All @@ -128,7 +130,7 @@ func (s *JSONFileStorage) Put(relayerID common.Hash, dataKey DataKey, value []by
mutex, ok := s.mutexes[relayerID]
if !ok {
return errors.Wrap(
ErrDatabaseMisconfiguration,
checkpoint.ErrDatabaseMisconfiguration,
fmt.Sprintf("database not configured for key %s", relayerID.String()),
)
}
Expand Down Expand Up @@ -159,7 +161,7 @@ func (s *JSONFileStorage) write(relayerID common.Hash, v interface{}) error {
// If the write fails, the original file is not affected.
// Set file permissions to 0644 so only the owner can read and write.
// Everyone else can only read. No one can execute the file.
if err := os.WriteFile(tmpPath, b, 0644); err != nil {
if err := os.WriteFile(tmpPath, b, 0o644); err != nil {
return errors.Wrap(err, "failed to write file")
}

Expand Down
Loading