Skip to content

Commit

Permalink
Update Streams PluginConfig checks and FeedID parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
austinborn committed Sep 19, 2024
1 parent 7b324ca commit 1fdf63a
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 40 deletions.
5 changes: 5 additions & 0 deletions .changeset/thirty-emus-enjoy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Update Streams PluginConfig checks and add parsing for new Feed ID specs
4 changes: 2 additions & 2 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,10 +889,10 @@ func (d *Delegate) newServicesMercury(
}

var telemetryType synchronization.TelemetryType
if relayConfig.EnableTriggerCapability && jb.OCR2OracleSpec.PluginConfig == nil {
if relayConfig.EnableTriggerCapability && len(jb.OCR2OracleSpec.PluginConfig) == 0 {
telemetryType = synchronization.OCR3DataFeeds
// First use case for TriggerCapability transmission is Data Feeds, so telemetry should be routed accordingly.
// This is only true if TriggerCapability is the *only* transmission method (PluginConfig == nil).
// This is only true if TriggerCapability is the *only* transmission method (PluginConfig is empty).
} else {
telemetryType = synchronization.OCR3Mercury
}
Expand Down
41 changes: 34 additions & 7 deletions core/services/ocr2/plugins/mercury/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func NewServices(

var err error
var pluginConfig config.PluginConfig
if jb.OCR2OracleSpec.PluginConfig == nil {
if len(jb.OCR2OracleSpec.PluginConfig) == 0 {
if !enableTriggerCapability {
return nil, fmt.Errorf("at least one transmission option must be configured")
}
Expand Down Expand Up @@ -184,6 +184,15 @@ func newv4factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
var factory ocr3types.MercuryPluginFactory
srvs := make([]job.ServiceCtx, 0)

var linkFeedID utils.FeedID
if factoryCfg.reportingPluginConfig.LinkFeedID != nil {
linkFeedID = *factoryCfg.reportingPluginConfig.LinkFeedID
}
var nativeFeedID utils.FeedID
if factoryCfg.reportingPluginConfig.NativeFeedID != nil {
nativeFeedID = *factoryCfg.reportingPluginConfig.NativeFeedID
}

ds := mercuryv4.NewDataSource(
factoryCfg.orm,
factoryCfg.pipelineRunner,
Expand All @@ -194,8 +203,8 @@ func newv4factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
factoryCfg.saver,
factoryCfg.chEnhancedTelem,
factoryCfg.ocr2Provider.MercuryServerFetcher(),
*factoryCfg.reportingPluginConfig.LinkFeedID,
*factoryCfg.reportingPluginConfig.NativeFeedID,
linkFeedID,
nativeFeedID,
)

loopCmd := env.MercuryPlugin.Cmd.Get()
Expand All @@ -221,6 +230,15 @@ func newv3factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
var factory ocr3types.MercuryPluginFactory
srvs := make([]job.ServiceCtx, 0)

var linkFeedID utils.FeedID
if factoryCfg.reportingPluginConfig.LinkFeedID != nil {
linkFeedID = *factoryCfg.reportingPluginConfig.LinkFeedID
}
var nativeFeedID utils.FeedID
if factoryCfg.reportingPluginConfig.NativeFeedID != nil {
nativeFeedID = *factoryCfg.reportingPluginConfig.NativeFeedID
}

ds := mercuryv3.NewDataSource(
factoryCfg.orm,
factoryCfg.pipelineRunner,
Expand All @@ -231,8 +249,8 @@ func newv3factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
factoryCfg.saver,
factoryCfg.chEnhancedTelem,
factoryCfg.ocr2Provider.MercuryServerFetcher(),
*factoryCfg.reportingPluginConfig.LinkFeedID,
*factoryCfg.reportingPluginConfig.NativeFeedID,
linkFeedID,
nativeFeedID,
)

loopCmd := env.MercuryPlugin.Cmd.Get()
Expand All @@ -258,6 +276,15 @@ func newv2factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
var factory ocr3types.MercuryPluginFactory
srvs := make([]job.ServiceCtx, 0)

var linkFeedID utils.FeedID
if factoryCfg.reportingPluginConfig.LinkFeedID != nil {
linkFeedID = *factoryCfg.reportingPluginConfig.LinkFeedID
}
var nativeFeedID utils.FeedID
if factoryCfg.reportingPluginConfig.NativeFeedID != nil {
nativeFeedID = *factoryCfg.reportingPluginConfig.NativeFeedID
}

ds := mercuryv2.NewDataSource(
factoryCfg.orm,
factoryCfg.pipelineRunner,
Expand All @@ -268,8 +295,8 @@ func newv2factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
factoryCfg.saver,
factoryCfg.chEnhancedTelem,
factoryCfg.ocr2Provider.MercuryServerFetcher(),
*factoryCfg.reportingPluginConfig.LinkFeedID,
*factoryCfg.reportingPluginConfig.NativeFeedID,
linkFeedID,
nativeFeedID,
)

loopCmd := env.MercuryPlugin.Cmd.Get()
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocr2/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func validateOCR2MercurySpec(spec *job.OCR2OracleSpec, feedID [32]byte) error {
return pkgerrors.Wrap(err, "error while unmarshalling relay config")
}

if spec.PluginConfig == nil {
if len(spec.PluginConfig) == 0 {
if !relayConfig.EnableTriggerCapability {
return pkgerrors.Wrap(err, "at least one transmission option must be configured")
}
Expand Down
5 changes: 4 additions & 1 deletion core/services/relay/evm/mercury/utils/feeds.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,11 @@ func (f *FeedID) UnmarshalText(input []byte) error {
func (f FeedID) Version() FeedVersion {
if _, exists := legacyV1FeedIDM[f]; exists {
return REPORT_V1
} else if f[0] == 0x01 { // Keystone Feed IDs
return FeedVersion(binary.BigEndian.Uint16(f[5:7]))
} else { // Current Mercury Feed IDs

Check failure on line 109 in core/services/relay/evm/mercury/utils/feeds.go

View workflow job for this annotation

GitHub Actions / lint

indent-error-flow: if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary) (revive)
return FeedVersion(binary.BigEndian.Uint16(f[:2]))
}
return FeedVersion(binary.BigEndian.Uint16(f[:2]))
}

func (f FeedID) IsV1() bool { return f.Version() == REPORT_V1 }
Expand Down
27 changes: 24 additions & 3 deletions core/services/relay/evm/mercury/utils/feeds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
)

var (
v1FeedId = (FeedID)([32]uint8{00, 01, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114})
v2FeedId = (FeedID)([32]uint8{00, 02, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114})
v3FeedId = (FeedID)([32]uint8{00, 03, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114})
v1FeedId = (FeedID)([32]uint8{00, 01, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114})

Check failure on line 10 in core/services/relay/evm/mercury/utils/feeds_test.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: var v1FeedId should be v1FeedID (revive)
v2FeedId = (FeedID)([32]uint8{00, 02, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114})

Check failure on line 11 in core/services/relay/evm/mercury/utils/feeds_test.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: var v2FeedId should be v2FeedID (revive)
v3FeedId = (FeedID)([32]uint8{00, 03, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114})

Check failure on line 12 in core/services/relay/evm/mercury/utils/feeds_test.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: var v3FeedId should be v3FeedID (revive)
keystonev2Feed = (FeedID)([32]uint8{01, 12, 34, 56, 78, 00, 02, 04, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00})
keystonev3Feed = (FeedID)([32]uint8{01, 12, 34, 56, 78, 00, 03, 04, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00})
keystonev4Feed = (FeedID)([32]uint8{01, 12, 34, 56, 78, 00, 04, 04, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00})
)

func Test_FeedID_Version(t *testing.T) {
Expand All @@ -28,6 +31,24 @@ func Test_FeedID_Version(t *testing.T) {
assert.False(t, v3FeedId.IsV1())
assert.False(t, v3FeedId.IsV2())
assert.True(t, v3FeedId.IsV3())

assert.Equal(t, REPORT_V2, keystonev2Feed.Version())
assert.False(t, keystonev2Feed.IsV1())
assert.True(t, keystonev2Feed.IsV2())
assert.False(t, keystonev2Feed.IsV3())
assert.False(t, keystonev2Feed.IsV4())

assert.Equal(t, REPORT_V3, keystonev3Feed.Version())
assert.False(t, keystonev3Feed.IsV1())
assert.False(t, keystonev3Feed.IsV2())
assert.True(t, keystonev3Feed.IsV3())
assert.False(t, keystonev3Feed.IsV4())

assert.Equal(t, REPORT_V4, keystonev4Feed.Version())
assert.False(t, keystonev4Feed.IsV1())
assert.False(t, keystonev4Feed.IsV2())
assert.False(t, keystonev4Feed.IsV3())
assert.True(t, keystonev4Feed.IsV4())
})
t.Run("legacy special cases", func(t *testing.T) {
for _, feedID := range legacyV1FeedIDs {
Expand Down
12 changes: 2 additions & 10 deletions core/services/relay/evm/mercury/v2/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package v2

import (
"context"
"encoding/json"
"fmt"
"math/big"
"sync"
Expand All @@ -23,7 +22,6 @@ import (
mercurytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/types"
mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v2/reportcodec"
relayTypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

Expand Down Expand Up @@ -63,12 +61,6 @@ func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec

func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedTimestamp bool) (obs v2types.Observation, pipelineExecutionErr error) {
var wg sync.WaitGroup
var relayConfig relayTypes.RelayConfig
err := json.Unmarshal(ds.jb.OCR2OracleSpec.RelayConfig.Bytes(), &relayConfig)
if err != nil {
pipelineExecutionErr = fmt.Errorf("failed to deserialize relay config: %w", err)
return
}
ctx, cancel := context.WithCancel(ctx)

if fetchMaxFinalizedTimestamp {
Expand Down Expand Up @@ -116,7 +108,7 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam
}()

var isLink, isNative bool
if ds.jb.OCR2OracleSpec.PluginConfig == nil {
if len(ds.jb.OCR2OracleSpec.PluginConfig) == 0 {
obs.LinkPrice.Val = v2.MissingPrice
} else if ds.feedID == ds.linkFeedID {
isLink = true
Expand All @@ -136,7 +128,7 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam
}()
}

if ds.jb.OCR2OracleSpec.PluginConfig == nil {
if len(ds.jb.OCR2OracleSpec.PluginConfig) == 0 {
obs.NativePrice.Val = v2.MissingPrice
} else if ds.feedID == ds.nativeFeedID {
isNative = true
Expand Down
4 changes: 2 additions & 2 deletions core/services/relay/evm/mercury/v2/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,15 +284,15 @@ func Test_Datasource(t *testing.T) {
assert.EqualError(t, obs.NativePrice.Err, "some error fetching native price")
})

t.Run("when PluginConfig=nil skips fetching link and native prices", func(t *testing.T) {
t.Run("when PluginConfig is empty", func(t *testing.T) {
t.Cleanup(func() {
ds.jb = jb
})

fetcher.linkPriceErr = errors.New("some error fetching link price")
fetcher.nativePriceErr = errors.New("some error fetching native price")

ds.jb.OCR2OracleSpec.PluginConfig = nil
ds.jb.OCR2OracleSpec.PluginConfig = job.JSONConfig{}

obs, err := ds.Observe(ctx, repts, false)
assert.NoError(t, err)
Expand Down
12 changes: 2 additions & 10 deletions core/services/relay/evm/mercury/v3/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package v3

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
Expand All @@ -23,7 +22,6 @@ import (
mercurytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/types"
mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/reportcodec"
relayTypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

Expand Down Expand Up @@ -65,12 +63,6 @@ func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec

func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedTimestamp bool) (obs v3types.Observation, pipelineExecutionErr error) {
var wg sync.WaitGroup
var relayConfig relayTypes.RelayConfig
err := json.Unmarshal(ds.jb.OCR2OracleSpec.RelayConfig.Bytes(), &relayConfig)
if err != nil {
pipelineExecutionErr = fmt.Errorf("failed to deserialize relay config: %w", err)
return
}
ctx, cancel := context.WithCancel(ctx)

if fetchMaxFinalizedTimestamp {
Expand Down Expand Up @@ -120,7 +112,7 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam
}()

var isLink, isNative bool
if ds.jb.OCR2OracleSpec.PluginConfig == nil {
if len(ds.jb.OCR2OracleSpec.PluginConfig) == 0 {
obs.LinkPrice.Val = v3.MissingPrice
} else if ds.feedID == ds.linkFeedID {
isLink = true
Expand All @@ -140,7 +132,7 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam
}()
}

if ds.jb.OCR2OracleSpec.PluginConfig == nil {
if len(ds.jb.OCR2OracleSpec.PluginConfig) == 0 {
obs.NativePrice.Val = v3.MissingPrice
} else if ds.feedID == ds.nativeFeedID {
isNative = true
Expand Down
4 changes: 2 additions & 2 deletions core/services/relay/evm/mercury/v3/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,15 +364,15 @@ func Test_Datasource(t *testing.T) {
assert.EqualError(t, obs.NativePrice.Err, "some error fetching native price")
})

t.Run("when PluginConfig=nil skips fetching link and native prices", func(t *testing.T) {
t.Run("when PluginConfig is empty", func(t *testing.T) {
t.Cleanup(func() {
ds.jb = jb
})

fetcher.linkPriceErr = errors.New("some error fetching link price")
fetcher.nativePriceErr = errors.New("some error fetching native price")

ds.jb.OCR2OracleSpec.PluginConfig = nil
ds.jb.OCR2OracleSpec.PluginConfig = job.JSONConfig{}

obs, err := ds.Observe(ctx, repts, false)
assert.NoError(t, err)
Expand Down
8 changes: 6 additions & 2 deletions core/services/relay/evm/mercury/v4/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam
}()

var isLink, isNative bool
if ds.feedID == ds.linkFeedID {
if len(ds.jb.OCR2OracleSpec.PluginConfig) == 0 {
obs.LinkPrice.Val = v4.MissingPrice
} else if ds.feedID == ds.linkFeedID {
isLink = true
} else {
wg.Add(1)
Expand All @@ -126,7 +128,9 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam
}()
}

if ds.feedID == ds.nativeFeedID {
if len(ds.jb.OCR2OracleSpec.PluginConfig) == 0 {
obs.NativePrice.Val = v4.MissingPrice
} else if ds.feedID == ds.nativeFeedID {
isNative = true
} else {
wg.Add(1)
Expand Down
29 changes: 29 additions & 0 deletions core/services/relay/evm/mercury/v4/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
relaymercuryv4 "github.com/smartcontractkit/chainlink-data-streams/mercury/v4"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
mercurymocks "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
Expand Down Expand Up @@ -70,6 +71,15 @@ func (ms *mockSaver) Save(r *pipeline.Run) {

func Test_Datasource(t *testing.T) {
orm := &mockORM{}
jb := job.Job{
Type: job.Type(pipeline.OffchainReporting2JobType),
OCR2OracleSpec: &job.OCR2OracleSpec{
CaptureEATelemetry: true,
PluginConfig: map[string]interface{}{
"serverURL": "a",
},
},
}
ds := &datasource{orm: orm, lggr: logger.TestLogger(t)}
ctx := testutils.Context(t)
repts := ocrtypes.ReportTimestamp{}
Expand Down Expand Up @@ -286,6 +296,25 @@ func Test_Datasource(t *testing.T) {
assert.EqualError(t, obs.NativePrice.Err, "some error fetching native price")
})

t.Run("when PluginConfig is empty", func(t *testing.T) {
t.Cleanup(func() {
ds.jb = jb
})

fetcher.linkPriceErr = errors.New("some error fetching link price")
fetcher.nativePriceErr = errors.New("some error fetching native price")

ds.jb.OCR2OracleSpec.PluginConfig = job.JSONConfig{}

obs, err := ds.Observe(ctx, repts, false)
assert.NoError(t, err)
assert.Nil(t, obs.LinkPrice.Err)
assert.Equal(t, obs.LinkPrice.Val, relaymercuryv4.MissingPrice)
assert.Nil(t, obs.NativePrice.Err)
assert.Equal(t, obs.NativePrice.Val, relaymercuryv4.MissingPrice)
assert.Equal(t, big.NewInt(122), obs.BenchmarkPrice.Val)
})

t.Run("when succeeds to fetch linkPrice or nativePrice but got nil (new feed)", func(t *testing.T) {
obs, err := ds.Observe(ctx, repts, false)
assert.NoError(t, err)
Expand Down

0 comments on commit 1fdf63a

Please sign in to comment.