-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WS URL can be optional when LogBroadcaster is disabled #14364
base: develop
Are you sure you want to change the base?
Changes from 21 commits
b05a1a7
71c84c6
e99aae0
6737f0c
8df1659
e1fa795
7f59b07
48c7f1a
515888c
9ecc624
01e9e08
4b860a1
fdc5c8c
2660903
b3e270d
ba73ef6
5bec775
9a841ce
670ebb8
799dc6c
427a8a0
7738ff0
795ee7b
3b27171
d3f523c
a71015d
7908e2a
ae09d35
fe19568
cc05a3f
d7a7719
3e92077
d7af044
4d83e3f
d5dbade
e8901e7
2713252
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
--- | ||
"chainlink": minor | ||
--- | ||
|
||
Make websocket URL flag `WSURL` for `EVM.Nodes`, and apply logic so that: | ||
* If WS URL was not provided, SubscribeFilterLogs should fail with an explicit error | ||
* If WS URL was not provided LogBroadcaster should be disabled | ||
#internal | ||
jmank88 marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -75,6 +75,9 @@ var ( | |
float64(8 * time.Second), | ||
}, | ||
}, []string{"evmChainID", "nodeName", "rpcHost", "isSendOnly", "success", "rpcCallName"}) | ||
errSubscribeFilterLogsNotAllowedWithoutWS = errors.New("SubscribeFilterLogs is not allowed without ws url") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we reusing these variables across multiple functions? If not, it might make sense to define them inline or make them public and use in tests |
||
errWSAndHTTPBothMissing = errors.New("cannot dial rpc client when both ws and http info are missing") | ||
errWSMissingWhenLogBroadcasterEnabled = errors.New("ws cannot be missing if LogBroadcaster is enabled") | ||
) | ||
|
||
// RPCClient includes all the necessary generalized RPC methods along with any additional chain-specific methods. | ||
|
@@ -123,6 +126,7 @@ type rpcClient struct { | |
largePayloadRpcTimeout time.Duration | ||
rpcTimeout time.Duration | ||
finalizedBlockPollInterval time.Duration | ||
logBroadcasterEnabled bool | ||
chainType chaintype.ChainType | ||
|
||
ws rawclient | ||
|
@@ -162,6 +166,7 @@ func NewRPCClient( | |
largePayloadRpcTimeout time.Duration, | ||
rpcTimeout time.Duration, | ||
chainType chaintype.ChainType, | ||
logBroadcasterEnabled bool, | ||
) RPCClient { | ||
r := &rpcClient{ | ||
largePayloadRpcTimeout: largePayloadRpcTimeout, | ||
|
@@ -174,6 +179,7 @@ func NewRPCClient( | |
r.tier = tier | ||
r.ws.uri = wsuri | ||
r.finalizedBlockPollInterval = finalizedBlockPollInterval | ||
r.logBroadcasterEnabled = logBroadcasterEnabled | ||
if httpuri != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. RPC Client does not need to know about LogBroadcaster, this breaks abstraction |
||
r.http = &rawclient{uri: *httpuri} | ||
} | ||
|
@@ -195,30 +201,41 @@ func (r *rpcClient) Dial(callerCtx context.Context) error { | |
ctx, cancel := r.makeQueryCtx(callerCtx, r.rpcTimeout) | ||
defer cancel() | ||
|
||
promEVMPoolRPCNodeDials.WithLabelValues(r.chainID.String(), r.name).Inc() | ||
lggr := r.rpcLog.With("wsuri", r.ws.uri.Redacted()) | ||
if r.http != nil { | ||
lggr = lggr.With("httpuri", r.http.uri.Redacted()) | ||
} | ||
lggr.Debugw("RPC dial: evmclient.Client#dial") | ||
if r.ws.uri.String() == "" { | ||
if r.http == nil { | ||
return errWSAndHTTPBothMissing | ||
} | ||
|
||
wsrpc, err := rpc.DialWebsocket(ctx, r.ws.uri.String(), "") | ||
if err != nil { | ||
promEVMPoolRPCNodeDialsFailed.WithLabelValues(r.chainID.String(), r.name).Inc() | ||
return r.wrapRPCClientError(pkgerrors.Wrapf(err, "error while dialing websocket: %v", r.ws.uri.Redacted())) | ||
if r.logBroadcasterEnabled { | ||
return errWSMissingWhenLogBroadcasterEnabled | ||
} | ||
} | ||
|
||
r.ws.rpc = wsrpc | ||
r.ws.geth = ethclient.NewClient(wsrpc) | ||
promEVMPoolRPCNodeDials.WithLabelValues(r.chainID.String(), r.name).Inc() | ||
lggr := r.rpcLog | ||
if r.ws.uri.String() != "" { | ||
lggr = lggr.With("wsuri", r.ws.uri.Redacted()) | ||
wsrpc, err := rpc.DialWebsocket(ctx, r.ws.uri.String(), "") | ||
if err != nil { | ||
promEVMPoolRPCNodeDialsFailed.WithLabelValues(r.chainID.String(), r.name).Inc() | ||
return r.wrapRPCClientError(pkgerrors.Wrapf(err, "error while dialing websocket: %v", r.ws.uri.Redacted())) | ||
} | ||
|
||
r.ws.rpc = wsrpc | ||
r.ws.geth = ethclient.NewClient(wsrpc) | ||
} else { | ||
lggr = lggr.With("wsuri", "empty WS URI") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no need to include |
||
} | ||
|
||
if r.http != nil { | ||
lggr = lggr.With("httpuri", r.http.uri.Redacted()) | ||
if err := r.DialHTTP(); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
lggr.Debugw("RPC dial: evmclient.Client#dial") | ||
promEVMPoolRPCNodeDialsSuccess.WithLabelValues(r.chainID.String(), r.name).Inc() | ||
|
||
return nil | ||
} | ||
|
||
|
@@ -1215,6 +1232,9 @@ func (r *rpcClient) ClientVersion(ctx context.Context) (version string, err erro | |
} | ||
|
||
func (r *rpcClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (_ ethereum.Subscription, err error) { | ||
if r.ws.uri.String() == "" { | ||
return nil, errSubscribeFilterLogsNotAllowedWithoutWS | ||
} | ||
ctx, cancel, chStopInFlight, ws, _ := r.acquireQueryCtx(ctx, r.rpcTimeout) | ||
defer cancel() | ||
lggr := r.newRqLggr().With("q", q) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,11 +57,34 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { | |
} | ||
return | ||
} | ||
t.Run("RPC client can't miss both WS and HTTP connection, if LogBroadcaster is disabled WS can be optional", func(t *testing.T) { | ||
// ws and http cannot be both missing | ||
emptyURL := url.URL{} | ||
clientRPC := client.NewRPCClient(lggr, emptyURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "", true) | ||
defer clientRPC.Close() | ||
require.Error(t, clientRPC.Dial(ctx)) | ||
|
||
// ws provided, okay | ||
server := testutils.NewWSServer(t, chainId, serverCallBack) | ||
notEmptyURL := server.WSURL() | ||
clientRPC = client.NewRPCClient(lggr, *notEmptyURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "", true) | ||
require.Nil(t, clientRPC.Dial(ctx)) | ||
|
||
// ws not provided when LogBroadcaster is enabled, error | ||
httpURL := url.URL{} | ||
clientRPC = client.NewRPCClient(lggr, emptyURL, &httpURL, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "", true) | ||
require.Error(t, clientRPC.Dial(ctx)) | ||
|
||
// ws not provided when LogBroadcaster is disabled, ok | ||
clientRPC = client.NewRPCClient(lggr, emptyURL, &httpURL, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "", false) | ||
require.Nil(t, clientRPC.Dial(ctx)) | ||
}) | ||
|
||
t.Run("Updates chain info on new blocks", func(t *testing.T) { | ||
server := testutils.NewWSServer(t, chainId, serverCallBack) | ||
wsURL := server.WSURL() | ||
|
||
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") | ||
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "", true) | ||
defer rpc.Close() | ||
require.NoError(t, rpc.Dial(ctx)) | ||
// set to default values | ||
|
@@ -111,7 +134,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { | |
server := testutils.NewWSServer(t, chainId, serverCallBack) | ||
wsURL := server.WSURL() | ||
|
||
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") | ||
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "", true) | ||
defer rpc.Close() | ||
require.NoError(t, rpc.Dial(ctx)) | ||
ch := make(chan *evmtypes.Head) | ||
|
@@ -136,7 +159,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { | |
server := testutils.NewWSServer(t, chainId, serverCallBack) | ||
wsURL := server.WSURL() | ||
|
||
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") | ||
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "", true) | ||
defer rpc.Close() | ||
require.NoError(t, rpc.Dial(ctx)) | ||
var wg sync.WaitGroup | ||
|
@@ -160,7 +183,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { | |
t.Run("Block's chain ID matched configured", func(t *testing.T) { | ||
server := testutils.NewWSServer(t, chainId, serverCallBack) | ||
wsURL := server.WSURL() | ||
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") | ||
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "", true) | ||
defer rpc.Close() | ||
require.NoError(t, rpc.Dial(ctx)) | ||
ch := make(chan *evmtypes.Head) | ||
|
@@ -177,7 +200,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { | |
}) | ||
wsURL := server.WSURL() | ||
observedLggr, observed := logger.TestObserved(t, zap.DebugLevel) | ||
rpc := client.NewRPCClient(observedLggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") | ||
rpc := client.NewRPCClient(observedLggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "", true) | ||
require.NoError(t, rpc.Dial(ctx)) | ||
server.Close() | ||
_, err := rpc.SubscribeNewHead(ctx, make(chan *evmtypes.Head)) | ||
|
@@ -187,7 +210,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { | |
t.Run("Subscription error is properly wrapper", func(t *testing.T) { | ||
server := testutils.NewWSServer(t, chainId, serverCallBack) | ||
wsURL := server.WSURL() | ||
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") | ||
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "", true) | ||
defer rpc.Close() | ||
require.NoError(t, rpc.Dial(ctx)) | ||
sub, err := rpc.SubscribeNewHead(ctx, make(chan *evmtypes.Head)) | ||
|
@@ -209,13 +232,23 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) { | |
lggr := logger.Test(t) | ||
ctx, cancel := context.WithTimeout(tests.Context(t), tests.WaitTimeout(t)) | ||
defer cancel() | ||
t.Run("Failed SubscribeFilterLogs when WSURL is empty", func(t *testing.T) { | ||
// ws is optional when LogBroadcaster is disabled, however SubscribeFilterLogs will return error if ws is missing | ||
httpURL := url.URL{} | ||
observedLggr, _ := logger.TestObserved(t, zap.DebugLevel) | ||
rpcClient := client.NewRPCClient(observedLggr, url.URL{}, &httpURL, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "", false) | ||
require.Nil(t, rpcClient.Dial(ctx)) | ||
|
||
_, err := rpcClient.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log)) | ||
require.Error(t, err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you ensure that we return proper error msg? |
||
}) | ||
t.Run("Failed SubscribeFilterLogs logs and returns proper error", func(t *testing.T) { | ||
server := testutils.NewWSServer(t, chainId, func(reqMethod string, reqParams gjson.Result) (resp testutils.JSONRPCResponse) { | ||
return resp | ||
}) | ||
wsURL := server.WSURL() | ||
observedLggr, observed := logger.TestObserved(t, zap.DebugLevel) | ||
rpc := client.NewRPCClient(observedLggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") | ||
rpc := client.NewRPCClient(observedLggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "", true) | ||
require.NoError(t, rpc.Dial(ctx)) | ||
server.Close() | ||
_, err := rpc.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log)) | ||
|
@@ -232,7 +265,7 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) { | |
return resp | ||
}) | ||
wsURL := server.WSURL() | ||
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") | ||
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "", true) | ||
defer rpc.Close() | ||
require.NoError(t, rpc.Dial(ctx)) | ||
sub, err := rpc.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log)) | ||
|
@@ -281,7 +314,7 @@ func TestRPCClient_LatestFinalizedBlock(t *testing.T) { | |
} | ||
|
||
server := createRPCServer() | ||
rpc := client.NewRPCClient(lggr, *server.URL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") | ||
rpc := client.NewRPCClient(lggr, *server.URL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "", true) | ||
require.NoError(t, rpc.Dial(ctx)) | ||
defer rpc.Close() | ||
server.Head = &evmtypes.Head{Number: 128} | ||
|
@@ -391,7 +424,7 @@ func TestRpcClientLargePayloadTimeout(t *testing.T) { | |
// use something unreasonably large for RPC timeout to ensure that we use largePayloadRPCTimeout | ||
const rpcTimeout = time.Hour | ||
const largePayloadRPCTimeout = tests.TestInterval | ||
rpc := client.NewRPCClient(logger.Test(t), *rpcURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, largePayloadRPCTimeout, rpcTimeout, "") | ||
rpc := client.NewRPCClient(logger.Test(t), *rpcURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, largePayloadRPCTimeout, rpcTimeout, "", true) | ||
require.NoError(t, rpc.Dial(ctx)) | ||
defer rpc.Close() | ||
err := testCase.Fn(ctx, rpc) | ||
|
@@ -431,7 +464,7 @@ func TestAstarCustomFinality(t *testing.T) { | |
|
||
const expectedFinalizedBlockNumber = int64(4) | ||
const expectedFinalizedBlockHash = "0x7441e97acf83f555e0deefef86db636bc8a37eb84747603412884e4df4d22804" | ||
rpcClient := client.NewRPCClient(logger.Test(t), *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, chaintype.ChainAstar) | ||
rpcClient := client.NewRPCClient(logger.Test(t), *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, chaintype.ChainAstar, true) | ||
defer rpcClient.Close() | ||
err := rpcClient.Dial(tests.Context(t)) | ||
require.NoError(t, err) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.