diff --git a/.changeset/thirty-emus-enjoy.md b/.changeset/thirty-emus-enjoy.md new file mode 100644 index 00000000000..3edd62de680 --- /dev/null +++ b/.changeset/thirty-emus-enjoy.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +Update Streams PluginConfig checks and add parsing for new Feed ID specs diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 9d4dbb85982..b51765f06f7 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -889,10 +889,10 @@ func (d *Delegate) newServicesMercury( } var telemetryType synchronization.TelemetryType - if relayConfig.EnableTriggerCapability && jb.OCR2OracleSpec.PluginConfig == nil { + if relayConfig.EnableTriggerCapability && len(jb.OCR2OracleSpec.PluginConfig) == 0 { telemetryType = synchronization.OCR3DataFeeds // First use case for TriggerCapability transmission is Data Feeds, so telemetry should be routed accordingly. - // This is only true if TriggerCapability is the *only* transmission method (PluginConfig == nil). + // This is only true if TriggerCapability is the *only* transmission method (PluginConfig is empty). } else { telemetryType = synchronization.OCR3Mercury } diff --git a/core/services/ocr2/plugins/mercury/plugin.go b/core/services/ocr2/plugins/mercury/plugin.go index f78531d6b07..dc93ed337eb 100644 --- a/core/services/ocr2/plugins/mercury/plugin.go +++ b/core/services/ocr2/plugins/mercury/plugin.go @@ -81,7 +81,7 @@ func NewServices( var err error var pluginConfig config.PluginConfig - if jb.OCR2OracleSpec.PluginConfig == nil { + if len(jb.OCR2OracleSpec.PluginConfig) == 0 { if !enableTriggerCapability { return nil, fmt.Errorf("at least one transmission option must be configured") } @@ -184,6 +184,15 @@ func newv4factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. var factory ocr3types.MercuryPluginFactory srvs := make([]job.ServiceCtx, 0) + var linkFeedID utils.FeedID + if factoryCfg.reportingPluginConfig.LinkFeedID != nil { + linkFeedID = *factoryCfg.reportingPluginConfig.LinkFeedID + } + var nativeFeedID utils.FeedID + if factoryCfg.reportingPluginConfig.NativeFeedID != nil { + nativeFeedID = *factoryCfg.reportingPluginConfig.NativeFeedID + } + ds := mercuryv4.NewDataSource( factoryCfg.orm, factoryCfg.pipelineRunner, @@ -194,8 +203,8 @@ func newv4factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. factoryCfg.saver, factoryCfg.chEnhancedTelem, factoryCfg.ocr2Provider.MercuryServerFetcher(), - *factoryCfg.reportingPluginConfig.LinkFeedID, - *factoryCfg.reportingPluginConfig.NativeFeedID, + linkFeedID, + nativeFeedID, ) loopCmd := env.MercuryPlugin.Cmd.Get() @@ -221,6 +230,15 @@ func newv3factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. var factory ocr3types.MercuryPluginFactory srvs := make([]job.ServiceCtx, 0) + var linkFeedID utils.FeedID + if factoryCfg.reportingPluginConfig.LinkFeedID != nil { + linkFeedID = *factoryCfg.reportingPluginConfig.LinkFeedID + } + var nativeFeedID utils.FeedID + if factoryCfg.reportingPluginConfig.NativeFeedID != nil { + nativeFeedID = *factoryCfg.reportingPluginConfig.NativeFeedID + } + ds := mercuryv3.NewDataSource( factoryCfg.orm, factoryCfg.pipelineRunner, @@ -231,8 +249,8 @@ func newv3factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. factoryCfg.saver, factoryCfg.chEnhancedTelem, factoryCfg.ocr2Provider.MercuryServerFetcher(), - *factoryCfg.reportingPluginConfig.LinkFeedID, - *factoryCfg.reportingPluginConfig.NativeFeedID, + linkFeedID, + nativeFeedID, ) loopCmd := env.MercuryPlugin.Cmd.Get() @@ -258,6 +276,15 @@ func newv2factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. var factory ocr3types.MercuryPluginFactory srvs := make([]job.ServiceCtx, 0) + var linkFeedID utils.FeedID + if factoryCfg.reportingPluginConfig.LinkFeedID != nil { + linkFeedID = *factoryCfg.reportingPluginConfig.LinkFeedID + } + var nativeFeedID utils.FeedID + if factoryCfg.reportingPluginConfig.NativeFeedID != nil { + nativeFeedID = *factoryCfg.reportingPluginConfig.NativeFeedID + } + ds := mercuryv2.NewDataSource( factoryCfg.orm, factoryCfg.pipelineRunner, @@ -268,8 +295,8 @@ func newv2factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. factoryCfg.saver, factoryCfg.chEnhancedTelem, factoryCfg.ocr2Provider.MercuryServerFetcher(), - *factoryCfg.reportingPluginConfig.LinkFeedID, - *factoryCfg.reportingPluginConfig.NativeFeedID, + linkFeedID, + nativeFeedID, ) loopCmd := env.MercuryPlugin.Cmd.Get() diff --git a/core/services/ocr2/validate/validate.go b/core/services/ocr2/validate/validate.go index 09b974d4e30..7ea34e5ac2d 100644 --- a/core/services/ocr2/validate/validate.go +++ b/core/services/ocr2/validate/validate.go @@ -304,7 +304,7 @@ func validateOCR2MercurySpec(spec *job.OCR2OracleSpec, feedID [32]byte) error { return pkgerrors.Wrap(err, "error while unmarshalling relay config") } - if spec.PluginConfig == nil { + if len(spec.PluginConfig) == 0 { if !relayConfig.EnableTriggerCapability { return pkgerrors.Wrap(err, "at least one transmission option must be configured") } diff --git a/core/services/relay/evm/mercury/utils/feeds.go b/core/services/relay/evm/mercury/utils/feeds.go index 36d6bc60f58..fe223bd1e75 100644 --- a/core/services/relay/evm/mercury/utils/feeds.go +++ b/core/services/relay/evm/mercury/utils/feeds.go @@ -104,8 +104,11 @@ func (f *FeedID) UnmarshalText(input []byte) error { func (f FeedID) Version() FeedVersion { if _, exists := legacyV1FeedIDM[f]; exists { return REPORT_V1 + } else if f[0] == 0x01 { // Keystone Feed IDs + return FeedVersion(binary.BigEndian.Uint16(f[5:7])) + } else { // Current Mercury Feed IDs + return FeedVersion(binary.BigEndian.Uint16(f[:2])) } - return FeedVersion(binary.BigEndian.Uint16(f[:2])) } func (f FeedID) IsV1() bool { return f.Version() == REPORT_V1 } diff --git a/core/services/relay/evm/mercury/utils/feeds_test.go b/core/services/relay/evm/mercury/utils/feeds_test.go index 37b9b47de76..1f39d0af6bf 100644 --- a/core/services/relay/evm/mercury/utils/feeds_test.go +++ b/core/services/relay/evm/mercury/utils/feeds_test.go @@ -7,9 +7,12 @@ import ( ) var ( - v1FeedId = (FeedID)([32]uint8{00, 01, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114}) - v2FeedId = (FeedID)([32]uint8{00, 02, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114}) - v3FeedId = (FeedID)([32]uint8{00, 03, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114}) + v1FeedId = (FeedID)([32]uint8{00, 01, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114}) + v2FeedId = (FeedID)([32]uint8{00, 02, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114}) + v3FeedId = (FeedID)([32]uint8{00, 03, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114}) + keystonev2Feed = (FeedID)([32]uint8{01, 12, 34, 56, 78, 00, 02, 04, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00}) + keystonev3Feed = (FeedID)([32]uint8{01, 12, 34, 56, 78, 00, 03, 04, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00}) + keystonev4Feed = (FeedID)([32]uint8{01, 12, 34, 56, 78, 00, 04, 04, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00, 00}) ) func Test_FeedID_Version(t *testing.T) { @@ -28,6 +31,24 @@ func Test_FeedID_Version(t *testing.T) { assert.False(t, v3FeedId.IsV1()) assert.False(t, v3FeedId.IsV2()) assert.True(t, v3FeedId.IsV3()) + + assert.Equal(t, REPORT_V2, keystonev2Feed.Version()) + assert.False(t, keystonev2Feed.IsV1()) + assert.True(t, keystonev2Feed.IsV2()) + assert.False(t, keystonev2Feed.IsV3()) + assert.False(t, keystonev2Feed.IsV4()) + + assert.Equal(t, REPORT_V3, keystonev3Feed.Version()) + assert.False(t, keystonev3Feed.IsV1()) + assert.False(t, keystonev3Feed.IsV2()) + assert.True(t, keystonev3Feed.IsV3()) + assert.False(t, keystonev3Feed.IsV4()) + + assert.Equal(t, REPORT_V4, keystonev4Feed.Version()) + assert.False(t, keystonev4Feed.IsV1()) + assert.False(t, keystonev4Feed.IsV2()) + assert.False(t, keystonev4Feed.IsV3()) + assert.True(t, keystonev4Feed.IsV4()) }) t.Run("legacy special cases", func(t *testing.T) { for _, feedID := range legacyV1FeedIDs { diff --git a/core/services/relay/evm/mercury/v2/data_source.go b/core/services/relay/evm/mercury/v2/data_source.go index d05bd00e25a..fed748ac937 100644 --- a/core/services/relay/evm/mercury/v2/data_source.go +++ b/core/services/relay/evm/mercury/v2/data_source.go @@ -2,7 +2,6 @@ package v2 import ( "context" - "encoding/json" "fmt" "math/big" "sync" @@ -23,7 +22,6 @@ import ( mercurytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/types" mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v2/reportcodec" - relayTypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -63,12 +61,6 @@ func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedTimestamp bool) (obs v2types.Observation, pipelineExecutionErr error) { var wg sync.WaitGroup - var relayConfig relayTypes.RelayConfig - err := json.Unmarshal(ds.jb.OCR2OracleSpec.RelayConfig.Bytes(), &relayConfig) - if err != nil { - pipelineExecutionErr = fmt.Errorf("failed to deserialize relay config: %w", err) - return - } ctx, cancel := context.WithCancel(ctx) if fetchMaxFinalizedTimestamp { @@ -116,7 +108,7 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam }() var isLink, isNative bool - if ds.jb.OCR2OracleSpec.PluginConfig == nil { + if len(ds.jb.OCR2OracleSpec.PluginConfig) == 0 { obs.LinkPrice.Val = v2.MissingPrice } else if ds.feedID == ds.linkFeedID { isLink = true @@ -136,7 +128,7 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam }() } - if ds.jb.OCR2OracleSpec.PluginConfig == nil { + if len(ds.jb.OCR2OracleSpec.PluginConfig) == 0 { obs.NativePrice.Val = v2.MissingPrice } else if ds.feedID == ds.nativeFeedID { isNative = true diff --git a/core/services/relay/evm/mercury/v2/data_source_test.go b/core/services/relay/evm/mercury/v2/data_source_test.go index 7392ceb9866..25716521d86 100644 --- a/core/services/relay/evm/mercury/v2/data_source_test.go +++ b/core/services/relay/evm/mercury/v2/data_source_test.go @@ -284,7 +284,7 @@ func Test_Datasource(t *testing.T) { assert.EqualError(t, obs.NativePrice.Err, "some error fetching native price") }) - t.Run("when PluginConfig=nil skips fetching link and native prices", func(t *testing.T) { + t.Run("when PluginConfig is empty", func(t *testing.T) { t.Cleanup(func() { ds.jb = jb }) @@ -292,7 +292,7 @@ func Test_Datasource(t *testing.T) { fetcher.linkPriceErr = errors.New("some error fetching link price") fetcher.nativePriceErr = errors.New("some error fetching native price") - ds.jb.OCR2OracleSpec.PluginConfig = nil + ds.jb.OCR2OracleSpec.PluginConfig = job.JSONConfig{} obs, err := ds.Observe(ctx, repts, false) assert.NoError(t, err) diff --git a/core/services/relay/evm/mercury/v3/data_source.go b/core/services/relay/evm/mercury/v3/data_source.go index 48db946517f..9744ec45d80 100644 --- a/core/services/relay/evm/mercury/v3/data_source.go +++ b/core/services/relay/evm/mercury/v3/data_source.go @@ -2,7 +2,6 @@ package v3 import ( "context" - "encoding/json" "errors" "fmt" "math/big" @@ -23,7 +22,6 @@ import ( mercurytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/types" mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/reportcodec" - relayTypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -65,12 +63,6 @@ func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedTimestamp bool) (obs v3types.Observation, pipelineExecutionErr error) { var wg sync.WaitGroup - var relayConfig relayTypes.RelayConfig - err := json.Unmarshal(ds.jb.OCR2OracleSpec.RelayConfig.Bytes(), &relayConfig) - if err != nil { - pipelineExecutionErr = fmt.Errorf("failed to deserialize relay config: %w", err) - return - } ctx, cancel := context.WithCancel(ctx) if fetchMaxFinalizedTimestamp { @@ -120,7 +112,7 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam }() var isLink, isNative bool - if ds.jb.OCR2OracleSpec.PluginConfig == nil { + if len(ds.jb.OCR2OracleSpec.PluginConfig) == 0 { obs.LinkPrice.Val = v3.MissingPrice } else if ds.feedID == ds.linkFeedID { isLink = true @@ -140,7 +132,7 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam }() } - if ds.jb.OCR2OracleSpec.PluginConfig == nil { + if len(ds.jb.OCR2OracleSpec.PluginConfig) == 0 { obs.NativePrice.Val = v3.MissingPrice } else if ds.feedID == ds.nativeFeedID { isNative = true diff --git a/core/services/relay/evm/mercury/v3/data_source_test.go b/core/services/relay/evm/mercury/v3/data_source_test.go index 3d01d7472e5..518fabb12c9 100644 --- a/core/services/relay/evm/mercury/v3/data_source_test.go +++ b/core/services/relay/evm/mercury/v3/data_source_test.go @@ -364,7 +364,7 @@ func Test_Datasource(t *testing.T) { assert.EqualError(t, obs.NativePrice.Err, "some error fetching native price") }) - t.Run("when PluginConfig=nil skips fetching link and native prices", func(t *testing.T) { + t.Run("when PluginConfig is empty", func(t *testing.T) { t.Cleanup(func() { ds.jb = jb }) @@ -372,7 +372,7 @@ func Test_Datasource(t *testing.T) { fetcher.linkPriceErr = errors.New("some error fetching link price") fetcher.nativePriceErr = errors.New("some error fetching native price") - ds.jb.OCR2OracleSpec.PluginConfig = nil + ds.jb.OCR2OracleSpec.PluginConfig = job.JSONConfig{} obs, err := ds.Observe(ctx, repts, false) assert.NoError(t, err) diff --git a/core/services/relay/evm/mercury/v4/data_source.go b/core/services/relay/evm/mercury/v4/data_source.go index 05ec44dc78c..bf45ccc87fd 100644 --- a/core/services/relay/evm/mercury/v4/data_source.go +++ b/core/services/relay/evm/mercury/v4/data_source.go @@ -108,7 +108,9 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam }() var isLink, isNative bool - if ds.feedID == ds.linkFeedID { + if len(ds.jb.OCR2OracleSpec.PluginConfig) == 0 { + obs.LinkPrice.Val = v4.MissingPrice + } else if ds.feedID == ds.linkFeedID { isLink = true } else { wg.Add(1) @@ -126,7 +128,9 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam }() } - if ds.feedID == ds.nativeFeedID { + if len(ds.jb.OCR2OracleSpec.PluginConfig) == 0 { + obs.NativePrice.Val = v4.MissingPrice + } else if ds.feedID == ds.nativeFeedID { isNative = true } else { wg.Add(1) diff --git a/core/services/relay/evm/mercury/v4/data_source_test.go b/core/services/relay/evm/mercury/v4/data_source_test.go index 7b50a922c8d..ba6148903ec 100644 --- a/core/services/relay/evm/mercury/v4/data_source_test.go +++ b/core/services/relay/evm/mercury/v4/data_source_test.go @@ -13,6 +13,7 @@ import ( relaymercuryv4 "github.com/smartcontractkit/chainlink-data-streams/mercury/v4" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" mercurymocks "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/mocks" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils" @@ -70,6 +71,15 @@ func (ms *mockSaver) Save(r *pipeline.Run) { func Test_Datasource(t *testing.T) { orm := &mockORM{} + jb := job.Job{ + Type: job.Type(pipeline.OffchainReporting2JobType), + OCR2OracleSpec: &job.OCR2OracleSpec{ + CaptureEATelemetry: true, + PluginConfig: map[string]interface{}{ + "serverURL": "a", + }, + }, + } ds := &datasource{orm: orm, lggr: logger.TestLogger(t)} ctx := testutils.Context(t) repts := ocrtypes.ReportTimestamp{} @@ -286,6 +296,25 @@ func Test_Datasource(t *testing.T) { assert.EqualError(t, obs.NativePrice.Err, "some error fetching native price") }) + t.Run("when PluginConfig is empty", func(t *testing.T) { + t.Cleanup(func() { + ds.jb = jb + }) + + fetcher.linkPriceErr = errors.New("some error fetching link price") + fetcher.nativePriceErr = errors.New("some error fetching native price") + + ds.jb.OCR2OracleSpec.PluginConfig = job.JSONConfig{} + + obs, err := ds.Observe(ctx, repts, false) + assert.NoError(t, err) + assert.Nil(t, obs.LinkPrice.Err) + assert.Equal(t, obs.LinkPrice.Val, relaymercuryv4.MissingPrice) + assert.Nil(t, obs.NativePrice.Err) + assert.Equal(t, obs.NativePrice.Val, relaymercuryv4.MissingPrice) + assert.Equal(t, big.NewInt(122), obs.BenchmarkPrice.Val) + }) + t.Run("when succeeds to fetch linkPrice or nativePrice but got nil (new feed)", func(t *testing.T) { obs, err := ds.Observe(ctx, repts, false) assert.NoError(t, err)