Skip to content

Commit

Permalink
Merge pull request #408 from ava-labs/signature-aggregation-api-acp-118
Browse files Browse the repository at this point in the history
integrate ACP-118
  • Loading branch information
feuGeneA committed Sep 6, 2024
2 parents 9ed94e6 + 576a375 commit 1bd4dab
Show file tree
Hide file tree
Showing 21 changed files with 270 additions and 41 deletions.
3 changes: 3 additions & 0 deletions config/api_config.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package config

import (
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ module github.com/ava-labs/awm-relayer
go 1.22.6

require (
github.com/ava-labs/avalanchego v1.11.10
github.com/ava-labs/coreth v0.13.7
github.com/ava-labs/subnet-evm v0.6.8
github.com/ava-labs/avalanchego v1.11.11-0.20240813203340-ab83fb41528d
github.com/ava-labs/coreth v0.13.8-fixed-genesis-upgrade.0.20240813194342-7635a96aa180
github.com/ava-labs/subnet-evm v0.6.9-0.20240816202746-18633729a0cd
github.com/ava-labs/teleporter v1.0.6
github.com/aws/aws-sdk-go-v2 v1.30.5
github.com/aws/aws-sdk-go-v2/config v1.27.9
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ github.com/alexliesenfeld/health v0.8.0/go.mod h1:TfNP0f+9WQVWMQRzvMUjlws4ceXKEL
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/avalanchego v1.11.10 h1:QujciF5OEp5FwAoe/RciFF/i47rxU5rkEr6fVuUBS1Q=
github.com/ava-labs/avalanchego v1.11.10/go.mod h1:POgZPryqe80OeHCDNrXrPOKoFre736iFuMgmUBeKaLc=
github.com/ava-labs/coreth v0.13.7 h1:k8T9u/ROifl8f7oXjHRc1KvSISRl9txvy7gGVmHEz6g=
github.com/ava-labs/coreth v0.13.7/go.mod h1:tXDujonxXFOF6oK5HS2EmgtSXJK3Gy6RpZxb5WzR9rM=
github.com/ava-labs/subnet-evm v0.6.8 h1:IrHGajBYWs692YIYdd5J0oVWWt88Q/XAZQq/dOtkHFw=
github.com/ava-labs/subnet-evm v0.6.8/go.mod h1:qt8DXyGm40CY9yffNOe1+4yUyL9mD3v5RPWqAuGj5u4=
github.com/ava-labs/avalanchego v1.11.11-0.20240813203340-ab83fb41528d h1:LyrKJL9avIIxBY3uTcS2dFtUMBFmI2QpAgG6qYTdA6s=
github.com/ava-labs/avalanchego v1.11.11-0.20240813203340-ab83fb41528d/go.mod h1:UkyrRDXK2E15Lq2abyae2Pt+JsWvgsg1pe0/AtoMyAM=
github.com/ava-labs/coreth v0.13.8-fixed-genesis-upgrade.0.20240813194342-7635a96aa180 h1:6aIHp7wbyGVYdhHVQUbG7BEcbCMEQ5SYopPPJyipyvk=
github.com/ava-labs/coreth v0.13.8-fixed-genesis-upgrade.0.20240813194342-7635a96aa180/go.mod h1:/wNBVq7J7wlC2Kbov7kk6LV5xZvau7VF9zwTVOeyAjY=
github.com/ava-labs/subnet-evm v0.6.9-0.20240816202746-18633729a0cd h1:5kJTOhmIhIiobseQ+RYuLg4UyodN+CSAdW1c0hx1R2Y=
github.com/ava-labs/subnet-evm v0.6.9-0.20240816202746-18633729a0cd/go.mod h1:QfIzh7YxKj97jbendOHQbaAxM7SMj5MWdV13o1VLn70=
github.com/ava-labs/teleporter v1.0.6 h1:buZULenvJLUUMyPihiSvGMag5/rm6oF8zL8YUw7NXxE=
github.com/ava-labs/teleporter v1.0.6/go.mod h1:JRfVZzLrb4qFZz2M5/c8L7cdN4A4JWJd7GIEzVeC+sg=
github.com/aws/aws-sdk-go-v2 v1.30.5 h1:mWSRTwQAb0aLE17dSzztCVJWI9+cRMgqebndjwDyK0g=
Expand Down
3 changes: 3 additions & 0 deletions peers/external_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ func (h *RelayerExternalHandler) HandleInbound(_ context.Context, inboundMessage
zap.Stringer("from", inboundMessage.NodeID()),
)
if inboundMessage.Op() == message.AppResponseOp || inboundMessage.Op() == message.AppErrorOp {
if inboundMessage.Op() == message.AppErrorOp {
h.log.Debug("Received AppError message", zap.Stringer("message", inboundMessage.Message()))
}
h.registerAppResponse(inboundMessage)
} else {
h.log.Debug("Ignoring message", zap.Stringer("op", inboundMessage.Op()))
Expand Down
1 change: 1 addition & 0 deletions relayer/application_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func (r *ApplicationRelayer) ProcessMessage(handler messages.MessageHandler) (co
if r.sourceWarpSignatureClient == nil {
signedMessage, err = r.signatureAggregator.CreateSignedMessage(
unsignedMessage,
nil,
r.signingSubnetID,
r.warpQuorum.QuorumNumerator,
)
Expand Down
4 changes: 4 additions & 0 deletions relayer/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"net/url"
"time"

basecfg "github.com/ava-labs/awm-relayer/config"
"github.com/ava-labs/awm-relayer/peers"
Expand Down Expand Up @@ -67,6 +68,9 @@ type Config struct {
DeciderURL string `mapstructure:"decider-url" json:"decider-url"`
SignatureCacheSize uint64 `mapstructure:"signature-cache-size" json:"signature-cache-size"`

// mapstructure doesn't handle time.Time out of the box so handle it manually
EtnaTime time.Time `json:"etna-time"`

// convenience field to fetch a blockchain's subnet ID
blockchainIDToSubnetID map[ids.ID]ids.ID
overwrittenOptions []string
Expand Down
1 change: 1 addition & 0 deletions relayer/config/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ const (
ManualWarpMessagesKey = "manual-warp-messages"
DBWriteIntervalSecondsKey = "db-write-interval-seconds"
SignatureCacheSizeKey = "signature-cache-size"
EtnaTimeKey = "etna-time"
)
3 changes: 3 additions & 0 deletions relayer/config/viper.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func BuildConfig(v *viper.Viper) (Config, error) {
return cfg, fmt.Errorf("failed to unmarshal viper config: %w", err)
}

// Manually set EtnaTime field since it's not automatically parseable using mapstructure
cfg.EtnaTime = v.GetTime(EtnaTimeKey)

// Explicitly overwrite the configured account private key
// If account-private-key is set as a flag or environment variable,
// overwrite all destination subnet configurations to use that key
Expand Down
1 change: 1 addition & 0 deletions relayer/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func main() {
prometheus.DefaultRegisterer,
),
messageCreator,
cfg.EtnaTime,
)
if err != nil {
logger.Fatal("Failed to create signature aggregator", zap.Error(err))
Expand Down
6 changes: 4 additions & 2 deletions scripts/versions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ function getDepVersion() {
export GO_VERSION=${GO_VERSION:-$(getDepVersion go)}

# Don't export them as they're used in the context of other calls
AVALANCHEGO_VERSION=${AVALANCHEGO_VERSION:-$(getDepVersion github.com/ava-labs/avalanchego)}
# TODO: undo this hack once go.mod is referring to a tag rather than a commit
#AVALANCHEGO_VERSION=${AVALANCHEGO_VERSION:-$(getDepVersion github.com/ava-labs/avalanchego)}
AVALANCHEGO_VERSION=${AVALANCHEGO_VERSION:-ab83fb41528de93c1790301cdd67a07dda9299f0}
GINKGO_VERSION=${GINKGO_VERSION:-$(getDepVersion github.com/onsi/ginkgo/v2)}

# TODO: undo this hack once go.mod is referring to a tag rather than a commit
#SUBNET_EVM_VERSION=${SUBNET_EVM_VERSION:-$(getDepVersion github.com/ava-labs/subnet-evm)}
SUBNET_EVM_VERSION=${SUBNET_EVM_VERSION:-update-avago-teleporter}
SUBNET_EVM_VERSION=${SUBNET_EVM_VERSION:-18633729a0cde7d695616e14b77873957a2b59c2}

# Set golangci-lint version
GOLANGCI_LINT_VERSION=${GOLANGCI_LINT_VERSION:-'v1.60'}
Expand Down
4 changes: 2 additions & 2 deletions signature-aggregator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ curl --location 'https://api.avax-test.network/ext/bc/C/rpc' \
The topic of the message will be `0x56600c567728a800c0aa927500f831cb451df66a7af570eb4df4dfbf4674887d` which is the output of`cast keccak "SendWarpMessage(address,bytes32,bytes)"`
4. Use the data field of the log message found in step 2 and send it to the locally running service via curl.
```bash
curl --location 'http://localhost:8080/aggregate-signatures/by-raw-message' \
curl --location 'http://localhost:8080/aggregate-signatures' \
--header 'Content-Type: application/json' \
--data '{
"data": "<hex encoded unsigned message bytes retrieved from the logs>",
"message": "<hex encoded unsigned message bytes retrieved from the logs>"
}'
```
103 changes: 84 additions & 19 deletions signature-aggregator/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/message"
networkP2P "github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/proto/pb/p2p"
"github.com/ava-labs/avalanchego/proto/pb/sdk"
"github.com/ava-labs/avalanchego/subnets"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/crypto/bls"
Expand All @@ -26,9 +28,10 @@ import (
"github.com/ava-labs/awm-relayer/signature-aggregator/aggregator/cache"
"github.com/ava-labs/awm-relayer/signature-aggregator/metrics"
"github.com/ava-labs/awm-relayer/utils"
coreEthMsg "github.com/ava-labs/coreth/plugin/evm/message"
corethMsg "github.com/ava-labs/coreth/plugin/evm/message"
msg "github.com/ava-labs/subnet-evm/plugin/evm/message"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

type blsSignatureBuf [bls.SignatureLen]byte
Expand All @@ -42,8 +45,8 @@ const (
)

var (
codec = msg.Codec
coreEthCodec = coreEthMsg.Codec
codec = msg.Codec
corethCodec = corethMsg.Codec

// Errors
errNotEnoughSignatures = errors.New("failed to collect a threshold of signatures")
Expand All @@ -60,6 +63,7 @@ type SignatureAggregator struct {
subnetsMapLock sync.RWMutex
metrics *metrics.SignatureAggregatorMetrics
cache *cache.Cache
etnaTime time.Time
}

func NewSignatureAggregator(
Expand All @@ -68,6 +72,7 @@ func NewSignatureAggregator(
signatureCacheSize uint64,
metrics *metrics.SignatureAggregatorMetrics,
messageCreator message.Creator,
etnaTime time.Time,
) (*SignatureAggregator, error) {
cache, err := cache.NewCache(signatureCacheSize, logger)
if err != nil {
Expand All @@ -84,13 +89,15 @@ func NewSignatureAggregator(
messageCreator: messageCreator,
currentRequestID: atomic.Uint32{},
cache: cache,
etnaTime: etnaTime,
}
sa.currentRequestID.Store(rand.Uint32())
return &sa, nil
}

func (s *SignatureAggregator) CreateSignedMessage(
unsignedMessage *avalancheWarp.UnsignedMessage,
justification []byte,
inputSigningSubnet ids.ID,
quorumPercentage uint64,
) (*avalancheWarp.Message, error) {
Expand Down Expand Up @@ -176,19 +183,7 @@ func (s *SignatureAggregator) CreateSignedMessage(
))
}

// TODO: remove this special handling and replace with ACP-118 interface once available
var reqBytes []byte
if sourceSubnet == constants.PrimaryNetworkID {
req := coreEthMsg.MessageSignatureRequest{
MessageID: unsignedMessage.ID(),
}
reqBytes, err = coreEthMsg.RequestToBytes(coreEthCodec, req)
} else {
req := msg.MessageSignatureRequest{
MessageID: unsignedMessage.ID(),
}
reqBytes, err = msg.RequestToBytes(codec, req)
}
reqBytes, err := s.marshalRequest(unsignedMessage, justification, sourceSubnet)
if err != nil {
msg := "Failed to marshal request bytes"
s.logger.Error(
Expand Down Expand Up @@ -524,14 +519,13 @@ func (s *SignatureAggregator) isValidSignatureResponse(
return blsSignatureBuf{}, false
}

var sigResponse msg.SignatureResponse
if _, err := msg.Codec.Unmarshal(appResponse.AppBytes, &sigResponse); err != nil {
signature, err := s.unmarshalResponse(appResponse.AppBytes)
if err != nil {
s.logger.Error(
"Error unmarshaling signature response",
zap.Error(err),
)
}
signature := sigResponse.Signature

// If the node returned an empty signature, then it has not yet seen the warp message. Retry later.
emptySignature := blsSignatureBuf{}
Expand All @@ -543,6 +537,15 @@ func (s *SignatureAggregator) isValidSignatureResponse(
return blsSignatureBuf{}, false
}

if len(signature) != bls.SignatureLen {
s.logger.Debug(
"Response signature has incorrect length",
zap.Int("actual", len(signature)),
zap.Int("expected", bls.SignatureLen),
)
return blsSignatureBuf{}, false
}

sig, err := bls.SignatureFromBytes(signature[:])
if err != nil {
s.logger.Debug(
Expand Down Expand Up @@ -590,3 +593,65 @@ func (s *SignatureAggregator) aggregateSignatures(
}
return aggSig, vdrBitSet, nil
}

// TODO: refactor this to remove special handling based on etnaTime
// after Etna release, along with related config and testing code
func (s *SignatureAggregator) marshalRequest(
unsignedMessage *avalancheWarp.UnsignedMessage,
justification []byte,
sourceSubnet ids.ID,
) ([]byte, error) {
if s.etnaActivated() {
// Post-Etna case
messageBytes, err := proto.Marshal(
&sdk.SignatureRequest{
Message: unsignedMessage.Bytes(),
Justification: justification,
},
)
if err != nil {
return nil, err
}
return networkP2P.PrefixMessage(
networkP2P.ProtocolPrefix(networkP2P.SignatureRequestHandlerID),
messageBytes,
), nil
} else {
// Pre-Etna case
if sourceSubnet == constants.PrimaryNetworkID {
req := corethMsg.MessageSignatureRequest{
MessageID: unsignedMessage.ID(),
}
return corethMsg.RequestToBytes(corethCodec, req)
} else {
req := msg.MessageSignatureRequest{
MessageID: unsignedMessage.ID(),
}
return msg.RequestToBytes(codec, req)
}
}
}

func (s *SignatureAggregator) unmarshalResponse(responseBytes []byte) (blsSignatureBuf, error) {
if s.etnaActivated() {
// Post-Etna case
var sigResponse sdk.SignatureResponse
err := proto.Unmarshal(responseBytes, &sigResponse)
if err != nil {
return blsSignatureBuf{}, err
}
return blsSignatureBuf(sigResponse.Signature), nil
} else {
// Pre-Etna case
var sigResponse msg.SignatureResponse
_, err := msg.Codec.Unmarshal(responseBytes, &sigResponse)
if err != nil {
return blsSignatureBuf{}, err
}
return sigResponse.Signature, nil
}
}

func (s *SignatureAggregator) etnaActivated() bool {
return !s.etnaTime.IsZero() && s.etnaTime.Before(time.Now())
}
7 changes: 5 additions & 2 deletions signature-aggregator/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package aggregator

import (
"testing"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/message"
Expand Down Expand Up @@ -43,6 +44,8 @@ func instantiateAggregator(t *testing.T) (
1024,
sigAggMetrics,
messageCreator,
// Setting the etnaTime to a minute ago so that the post-etna code path is used in the test
time.Now().Add(-1*time.Minute),
)
require.Equal(t, err, nil)
return aggregator, mockNetwork
Expand All @@ -61,7 +64,7 @@ func TestCreateSignedMessageFailsWithNoValidators(t *testing.T) {
},
nil,
)
_, err = aggregator.CreateSignedMessage(msg, ids.Empty, 80)
_, err = aggregator.CreateSignedMessage(msg, nil, ids.Empty, 80)
require.ErrorContains(t, err, "no signatures")
}

Expand All @@ -78,7 +81,7 @@ func TestCreateSignedMessageFailsWithoutSufficientConnectedStake(t *testing.T) {
},
nil,
)
_, err = aggregator.CreateSignedMessage(msg, ids.Empty, 80)
_, err = aggregator.CreateSignedMessage(msg, nil, ids.Empty, 80)
require.ErrorContains(
t,
err,
Expand Down
Loading

0 comments on commit 1bd4dab

Please sign in to comment.