Skip to content

Commit

Permalink
Merge branch 'develop' into hedera_int
Browse files Browse the repository at this point in the history
  • Loading branch information
simsonraj committed Sep 20, 2024
2 parents e811ae0 + 7b324ca commit 92d0ef4
Show file tree
Hide file tree
Showing 38 changed files with 1,050 additions and 225 deletions.
5 changes: 5 additions & 0 deletions .changeset/sixty-cougars-mix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Use tx in insertLogsWithinTx #internal
5 changes: 5 additions & 0 deletions .changeset/sour-ears-wink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#internal KMS client for deployment
2 changes: 0 additions & 2 deletions core/capabilities/ccip/ccip_integration_tests/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,6 @@ func (h *homeChain) AddNodes(
p2pIDs [][32]byte,
capabilityIDs [][32]byte,
) {
// Need to sort, otherwise _checkIsValidUniqueSubset onChain will fail
sortP2PIDS(p2pIDs)
var nodeParams []kcr.CapabilitiesRegistryNodeParams
for _, p2pID := range p2pIDs {
Expand All @@ -430,7 +429,6 @@ func AddChainConfig(
p2pIDs [][32]byte,
f uint8,
) ccip_config.CCIPConfigTypesChainConfigInfo {
// Need to sort, otherwise _checkIsValidUniqueSubset onChain will fail
sortP2PIDS(p2pIDs)
// First Add ChainConfig that includes all p2pIDs as readers
encodedExtraChainConfig, err := chainconfig.EncodeChainConfig(chainconfig.ChainConfig{
Expand Down
32 changes: 18 additions & 14 deletions core/chains/evm/logpoller/log_poller_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/big"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -287,12 +288,14 @@ func TestLogPoller_Replay(t *testing.T) {
db := pgtest.NewSqlxDB(t)
orm := NewORM(chainID, db, lggr)

head := evmtypes.Head{Number: 4}
var head atomic.Pointer[evmtypes.Head]
head.Store(&evmtypes.Head{Number: 4})

events := []common.Hash{EmitterABI.Events["Log1"].ID}
log1 := types.Log{
Index: 0,
BlockHash: common.Hash{},
BlockNumber: uint64(head.Number),
BlockNumber: uint64(head.Load().Number),
Topics: events,
Address: addr,
TxHash: common.HexToHash("0x1234"),
Expand All @@ -301,8 +304,7 @@ func TestLogPoller_Replay(t *testing.T) {

ec := evmclimocks.NewClient(t)
ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(func(context.Context, *big.Int) (*evmtypes.Head, error) {
headCopy := head
return &headCopy, nil
return head.Load(), nil
})
ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Once()
ec.On("ConfiguredChainID").Return(chainID, nil)
Expand All @@ -318,9 +320,9 @@ func TestLogPoller_Replay(t *testing.T) {
headTracker := htMocks.NewHeadTracker[*evmtypes.Head, common.Hash](t)

headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(func(ctx context.Context) (*evmtypes.Head, *evmtypes.Head, error) {
headCopy := head
finalized := &evmtypes.Head{Number: headCopy.Number - lpOpts.FinalityDepth}
return &headCopy, finalized, nil
h := head.Load()
finalized := &evmtypes.Head{Number: h.Number - lpOpts.FinalityDepth}
return h, finalized, nil
})
lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)

Expand Down Expand Up @@ -394,7 +396,7 @@ func TestLogPoller_Replay(t *testing.T) {
var wg sync.WaitGroup
defer func() { wg.Wait() }()
ec.On("FilterLogs", mock.Anything, mock.Anything).Once().Return([]types.Log{log1}, nil).Run(func(args mock.Arguments) {
head = evmtypes.Head{Number: 4}
head.Store(&evmtypes.Head{Number: 4})
wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -421,7 +423,7 @@ func TestLogPoller_Replay(t *testing.T) {

ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Maybe() // in case task gets delayed by >= 100ms

head = evmtypes.Head{Number: 5}
head.Store(&evmtypes.Head{Number: 5})
t.Cleanup(lp.reset)
servicetest.Run(t, lp)

Expand All @@ -448,7 +450,7 @@ func TestLogPoller_Replay(t *testing.T) {
go func() {
defer close(done)

head = evmtypes.Head{Number: 4} // Restore latest block to 4, so this matches the fromBlock requested
head.Store(&evmtypes.Head{Number: 4}) // Restore latest block to 4, so this matches the fromBlock requested
select {
case lp.replayStart <- 4:
case <-ctx.Done():
Expand All @@ -469,7 +471,7 @@ func TestLogPoller_Replay(t *testing.T) {
ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil)

t.Cleanup(lp.reset)
head = evmtypes.Head{Number: 5} // Latest block must be > lastProcessed in order for SaveAndPollLogs() to call FilterLogs()
head.Store(&evmtypes.Head{Number: 5}) // Latest block must be > lastProcessed in order for SaveAndPollLogs() to call FilterLogs()
servicetest.Run(t, lp)

select {
Expand All @@ -482,7 +484,8 @@ func TestLogPoller_Replay(t *testing.T) {
// ReplayAsync should return as soon as replayStart is received
t.Run("ReplayAsync success", func(t *testing.T) {
t.Cleanup(lp.reset)
head = evmtypes.Head{Number: 5}

head.Store(&evmtypes.Head{Number: 5})
ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil)
mockBatchCallContext(t, ec)
servicetest.Run(t, lp)
Expand All @@ -496,7 +499,7 @@ func TestLogPoller_Replay(t *testing.T) {
ctx := testutils.Context(t)
t.Cleanup(lp.reset)
servicetest.Run(t, lp)
head = evmtypes.Head{Number: 4}
head.Store(&evmtypes.Head{Number: 4})

anyErr := pkgerrors.New("async error")
observedLogs.TakeAll()
Expand Down Expand Up @@ -528,7 +531,8 @@ func TestLogPoller_Replay(t *testing.T) {
err := lp.orm.DeleteLogsAndBlocksAfter(ctx, 0)
require.NoError(t, err)

err = lp.orm.InsertBlock(ctx, head.Hash, head.Number, head.Timestamp, head.Number)
h := head.Load()
err = lp.orm.InsertBlock(ctx, h.Hash, h.Number, h.Timestamp, h.Number)
require.NoError(t, err)

ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil)
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (o *DSORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.D
(:evm_chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :topics, :tx_hash, :data, NOW())
ON CONFLICT DO NOTHING`

_, err := o.ds.NamedExecContext(ctx, query, logs[start:end])
_, err := tx.NamedExecContext(ctx, query, logs[start:end])
if err != nil {
if pkgerrors.Is(err, context.DeadlineExceeded) && batchInsertSize > 500 {
// In case of DB timeouts, try to insert again with a smaller batch upto a limit
Expand Down
112 changes: 76 additions & 36 deletions integration-tests/actions/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/smartcontractkit/chainlink-testing-framework/lib/utils/testcontext"

"github.com/smartcontractkit/chainlink/integration-tests/client"
"github.com/smartcontractkit/chainlink/integration-tests/testconfig/ocr"
"github.com/smartcontractkit/chainlink/integration-tests/types/config/node"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/link_token_interface"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/operator_factory"
Expand Down Expand Up @@ -610,29 +611,48 @@ func TrackForwarder(
Msg("Forwarder tracked")
}

// DeployOCRv2Contracts deploys a number of OCRv2 contracts and configures them with defaults
func DeployOCRv2Contracts(
// SetupOCRv2Contracts deploys a number of OCRv2 contracts and configures them with defaults
func SetupOCRv2Contracts(
l zerolog.Logger,
seth *seth.Client,
numberOfContracts int,
ocrContractsConfig ocr.OffChainAggregatorsConfig,
linkTokenAddress common.Address,
transmitters []string,
ocrOptions contracts.OffchainOptions,
) ([]contracts.OffchainAggregatorV2, error) {
var ocrInstances []contracts.OffchainAggregatorV2
for contractCount := 0; contractCount < numberOfContracts; contractCount++ {
ocrInstance, err := contracts.DeployOffchainAggregatorV2(
l,
seth,
linkTokenAddress,
ocrOptions,
)
if err != nil {
return nil, fmt.Errorf("OCRv2 instance deployment have failed: %w", err)

if ocrContractsConfig == nil {
return nil, fmt.Errorf("you need to pass non-nil OffChainAggregatorsConfig to setup OCR contracts")
}

if !ocrContractsConfig.UseExistingOffChainAggregatorsContracts() {
for contractCount := 0; contractCount < ocrContractsConfig.NumberOfContractsToDeploy(); contractCount++ {
ocrInstance, err := contracts.DeployOffchainAggregatorV2(
l,
seth,
linkTokenAddress,
ocrOptions,
)
if err != nil {
return nil, fmt.Errorf("OCRv2 instance deployment have failed: %w", err)
}
ocrInstances = append(ocrInstances, &ocrInstance)
if (contractCount+1)%ContractDeploymentInterval == 0 { // For large amounts of contract deployments, space things out some
time.Sleep(2 * time.Second)
}
}
ocrInstances = append(ocrInstances, &ocrInstance)
if (contractCount+1)%ContractDeploymentInterval == 0 { // For large amounts of contract deployments, space things out some
time.Sleep(2 * time.Second)
} else {
for _, address := range ocrContractsConfig.OffChainAggregatorsContractsAddresses() {
ocrInstance, err := contracts.LoadOffchainAggregatorV2(l, seth, address)
if err != nil {
return nil, fmt.Errorf("OCRv2 instance loading have failed: %w", err)
}
ocrInstances = append(ocrInstances, &ocrInstance)
}

if !ocrContractsConfig.ConfigureExistingOffChainAggregatorsContracts() {
return ocrInstances, nil
}
}

Expand Down Expand Up @@ -694,7 +714,7 @@ func TeardownSuite(
l.Warn().Msgf("Error deleting jobs %+v", err)
}

if chainlinkNodes != nil {
if chainlinkNodes != nil && chainClient != nil {
if err := ReturnFundsFromNodes(l, chainClient, contracts.ChainlinkK8sClientToChainlinkNodeWithKeysAndAddress(chainlinkNodes)); err != nil {
// This printed line is required for tests that use real funds to propagate the failure
// out to the system running the test. Do not remove
Expand Down Expand Up @@ -781,7 +801,7 @@ func StartNewRound(
func DeployOCRContractsForwarderFlow(
logger zerolog.Logger,
seth *seth.Client,
numberOfContracts int,
ocrContractsConfig ocr.OffChainAggregatorsConfig,
linkTokenContractAddress common.Address,
workerNodes []contracts.ChainlinkNodeWithKeysAndAddress,
forwarderAddresses []common.Address,
Expand All @@ -802,23 +822,23 @@ func DeployOCRContractsForwarderFlow(
return forwarderAddresses, nil
}

return deployAnyOCRv1Contracts(logger, seth, numberOfContracts, linkTokenContractAddress, workerNodes, transmitterPayeesFn, transmitterAddressesFn)
return setupAnyOCRv1Contracts(logger, seth, ocrContractsConfig, linkTokenContractAddress, workerNodes, transmitterPayeesFn, transmitterAddressesFn)
}

// DeployOCRv1Contracts deploys and funds a certain number of offchain aggregator contracts
func DeployOCRv1Contracts(
// SetupOCRv1Contracts deploys and funds a certain number of offchain aggregator contracts or uses existing ones and returns a slice of contract wrappers.
func SetupOCRv1Contracts(
logger zerolog.Logger,
seth *seth.Client,
numberOfContracts int,
ocrContractsConfig ocr.OffChainAggregatorsConfig,
linkTokenContractAddress common.Address,
workerNodes []contracts.ChainlinkNodeWithKeysAndAddress,
) ([]contracts.OffchainAggregator, error) {
transmitterPayeesFn := func() (transmitters []string, payees []string, err error) {
transmitters = make([]string, 0)
payees = make([]string, 0)
for _, node := range workerNodes {
for _, n := range workerNodes {
var addr string
addr, err = node.PrimaryEthAddress()
addr, err = n.PrimaryEthAddress()
if err != nil {
err = fmt.Errorf("error getting node's primary ETH address: %w", err)
return
Expand All @@ -832,8 +852,8 @@ func DeployOCRv1Contracts(

transmitterAddressesFn := func() ([]common.Address, error) {
transmitterAddresses := make([]common.Address, 0)
for _, node := range workerNodes {
primaryAddress, err := node.PrimaryEthAddress()
for _, n := range workerNodes {
primaryAddress, err := n.PrimaryEthAddress()
if err != nil {
return nil, err
}
Expand All @@ -843,28 +863,48 @@ func DeployOCRv1Contracts(
return transmitterAddresses, nil
}

return deployAnyOCRv1Contracts(logger, seth, numberOfContracts, linkTokenContractAddress, workerNodes, transmitterPayeesFn, transmitterAddressesFn)
return setupAnyOCRv1Contracts(logger, seth, ocrContractsConfig, linkTokenContractAddress, workerNodes, transmitterPayeesFn, transmitterAddressesFn)
}

func deployAnyOCRv1Contracts(
func setupAnyOCRv1Contracts(
logger zerolog.Logger,
seth *seth.Client,
numberOfContracts int,
ocrContractsConfig ocr.OffChainAggregatorsConfig,
linkTokenContractAddress common.Address,
workerNodes []contracts.ChainlinkNodeWithKeysAndAddress,
getTransmitterAndPayeesFn func() ([]string, []string, error),
getTransmitterAddressesFn func() ([]common.Address, error),
) ([]contracts.OffchainAggregator, error) {
// Deploy contracts
var ocrInstances []contracts.OffchainAggregator
for contractCount := 0; contractCount < numberOfContracts; contractCount++ {
ocrInstance, err := contracts.DeployOffchainAggregator(logger, seth, linkTokenContractAddress, contracts.DefaultOffChainAggregatorOptions())
if err != nil {
return nil, fmt.Errorf("OCR instance deployment have failed: %w", err)

if ocrContractsConfig == nil {
return nil, fmt.Errorf("you need to pass non-nil OffChainAggregatorsConfig to setup OCR contracts")
}

if !ocrContractsConfig.UseExistingOffChainAggregatorsContracts() {
// Deploy contracts
for contractCount := 0; contractCount < ocrContractsConfig.NumberOfContractsToDeploy(); contractCount++ {
ocrInstance, err := contracts.DeployOffchainAggregator(logger, seth, linkTokenContractAddress, contracts.DefaultOffChainAggregatorOptions())
if err != nil {
return nil, fmt.Errorf("OCR instance deployment have failed: %w", err)
}
ocrInstances = append(ocrInstances, &ocrInstance)
if (contractCount+1)%ContractDeploymentInterval == 0 { // For large amounts of contract deployments, space things out some
time.Sleep(2 * time.Second)
}
}
ocrInstances = append(ocrInstances, &ocrInstance)
if (contractCount+1)%ContractDeploymentInterval == 0 { // For large amounts of contract deployments, space things out some
time.Sleep(2 * time.Second)
} else {
// Load contract wrappers
for _, address := range ocrContractsConfig.OffChainAggregatorsContractsAddresses() {
ocrInstance, err := contracts.LoadOffChainAggregator(logger, seth, address)
if err != nil {
return nil, fmt.Errorf("OCR instance loading have failed: %w", err)
}
ocrInstances = append(ocrInstances, &ocrInstance)
}

if !ocrContractsConfig.ConfigureExistingOffChainAggregatorsContracts() {
return ocrInstances, nil
}
}

Expand Down
23 changes: 23 additions & 0 deletions integration-tests/actions/contracts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package actions

import (
"github.com/rs/zerolog"

"github.com/smartcontractkit/chainlink-testing-framework/seth"

"github.com/smartcontractkit/chainlink/integration-tests/contracts"
tc "github.com/smartcontractkit/chainlink/integration-tests/testconfig"
)

// LinkTokenContract returns a link token contract instance. Depending on test configuration, it either deploys a new one or uses an existing one.
func LinkTokenContract(l zerolog.Logger, sethClient *seth.Client, configWithLinkToken tc.LinkTokenContractConfig) (*contracts.EthereumLinkToken, error) {
if configWithLinkToken != nil && configWithLinkToken.UseExistingLinkTokenContract() {
linkAddress, err := configWithLinkToken.LinkTokenContractAddress()
if err != nil {
return nil, err
}

return contracts.LoadLinkTokenContract(l, sethClient, linkAddress)
}
return contracts.DeployLinkTokenContract(l, sethClient)
}
Loading

0 comments on commit 92d0ef4

Please sign in to comment.