-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WS URL can be optional when LogBroadcaster is disabled #14364
base: develop
Are you sure you want to change the base?
Changes from 28 commits
b05a1a7
71c84c6
e99aae0
6737f0c
8df1659
e1fa795
7f59b07
48c7f1a
515888c
9ecc624
01e9e08
4b860a1
fdc5c8c
2660903
b3e270d
ba73ef6
5bec775
9a841ce
670ebb8
799dc6c
427a8a0
7738ff0
795ee7b
3b27171
d3f523c
a71015d
7908e2a
ae09d35
fe19568
cc05a3f
d7a7719
3e92077
d7af044
4d83e3f
d5dbade
e8901e7
2713252
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
--- | ||
"chainlink": minor | ||
--- | ||
|
||
Make websocket URL flag `WSURL` for `EVM.Nodes`, and apply logic so that: | ||
* If WS URL was not provided, SubscribeFilterLogs should fail with an explicit error | ||
* If WS URL was not provided LogBroadcaster should be disabled | ||
#internal | ||
jmank88 marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -79,15 +79,21 @@ func NewClientConfigs( | |||||
func parseNodeConfigs(nodeCfgs []NodeConfig) ([]*toml.Node, error) { | ||||||
nodes := make([]*toml.Node, len(nodeCfgs)) | ||||||
for i, nodeCfg := range nodeCfgs { | ||||||
if nodeCfg.WSURL == nil || nodeCfg.HTTPURL == nil { | ||||||
return nil, fmt.Errorf("node config [%d]: missing WS or HTTP URL", i) | ||||||
var wsURL, httpURL *commonconfig.URL | ||||||
// wsUrl is optional, if LogBroadcaster is enabled, at least one wsURL is required and this will be checked in EVMConfig validation | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||
if nodeCfg.WSURL != nil { | ||||||
wsURL = commonconfig.MustParseURL(*nodeCfg.WSURL) | ||||||
} | ||||||
wsUrl := commonconfig.MustParseURL(*nodeCfg.WSURL) | ||||||
httpUrl := commonconfig.MustParseURL(*nodeCfg.HTTPURL) | ||||||
|
||||||
if nodeCfg.HTTPURL == nil { | ||||||
return nil, fmt.Errorf("node config [%d]: HTTP URL", i) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
|
||||||
httpURL = commonconfig.MustParseURL(*nodeCfg.HTTPURL) | ||||||
node := &toml.Node{ | ||||||
Name: nodeCfg.Name, | ||||||
WSURL: wsUrl, | ||||||
HTTPURL: httpUrl, | ||||||
WSURL: wsURL, | ||||||
HTTPURL: httpURL, | ||||||
SendOnly: nodeCfg.SendOnly, | ||||||
Order: nodeCfg.Order, | ||||||
} | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -75,6 +75,8 @@ var ( | |
float64(8 * time.Second), | ||
}, | ||
}, []string{"evmChainID", "nodeName", "rpcHost", "isSendOnly", "success", "rpcCallName"}) | ||
errSubscribeFilterLogsNotAllowedWithoutWS = errors.New("SubscribeFilterLogs is not allowed without ws url") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we reusing these variables across multiple functions? If not, it might make sense to define them inline or make them public and use in tests |
||
errWSAndHTTPBothMissing = errors.New("cannot dial rpc client when both ws and http info are missing") | ||
) | ||
|
||
// RPCClient includes all the necessary generalized RPC methods along with any additional chain-specific methods. | ||
|
@@ -195,30 +197,35 @@ func (r *rpcClient) Dial(callerCtx context.Context) error { | |
ctx, cancel := r.makeQueryCtx(callerCtx, r.rpcTimeout) | ||
defer cancel() | ||
|
||
promEVMPoolRPCNodeDials.WithLabelValues(r.chainID.String(), r.name).Inc() | ||
lggr := r.rpcLog.With("wsuri", r.ws.uri.Redacted()) | ||
if r.http != nil { | ||
lggr = lggr.With("httpuri", r.http.uri.Redacted()) | ||
if r.ws.uri.String() == "" && r.http == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. WS is optional. Can't we make it a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can, but it requires a lot refactor and adding nil ptr check and not sure if we want to do in this PR |
||
return errWSAndHTTPBothMissing | ||
} | ||
lggr.Debugw("RPC dial: evmclient.Client#dial") | ||
|
||
wsrpc, err := rpc.DialWebsocket(ctx, r.ws.uri.String(), "") | ||
if err != nil { | ||
promEVMPoolRPCNodeDialsFailed.WithLabelValues(r.chainID.String(), r.name).Inc() | ||
return r.wrapRPCClientError(pkgerrors.Wrapf(err, "error while dialing websocket: %v", r.ws.uri.Redacted())) | ||
} | ||
promEVMPoolRPCNodeDials.WithLabelValues(r.chainID.String(), r.name).Inc() | ||
lggr := r.rpcLog | ||
if r.ws.uri.String() != "" { | ||
lggr = lggr.With("wsuri", r.ws.uri.Redacted()) | ||
wsrpc, err := rpc.DialWebsocket(ctx, r.ws.uri.String(), "") | ||
if err != nil { | ||
promEVMPoolRPCNodeDialsFailed.WithLabelValues(r.chainID.String(), r.name).Inc() | ||
return r.wrapRPCClientError(pkgerrors.Wrapf(err, "error while dialing websocket: %v", r.ws.uri.Redacted())) | ||
} | ||
|
||
r.ws.rpc = wsrpc | ||
r.ws.geth = ethclient.NewClient(wsrpc) | ||
r.ws.rpc = wsrpc | ||
r.ws.geth = ethclient.NewClient(wsrpc) | ||
} else { | ||
lggr = lggr.With("wsuri", "empty WS URI") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no need to include |
||
} | ||
|
||
if r.http != nil { | ||
lggr = lggr.With("httpuri", r.http.uri.Redacted()) | ||
if err := r.DialHTTP(); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
lggr.Debugw("RPC dial: evmclient.Client#dial") | ||
promEVMPoolRPCNodeDialsSuccess.WithLabelValues(r.chainID.String(), r.name).Inc() | ||
|
||
return nil | ||
} | ||
|
||
|
@@ -1215,6 +1222,9 @@ func (r *rpcClient) ClientVersion(ctx context.Context) (version string, err erro | |
} | ||
|
||
func (r *rpcClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (_ ethereum.Subscription, err error) { | ||
if r.ws.uri.String() == "" { | ||
return nil, errSubscribeFilterLogsNotAllowedWithoutWS | ||
} | ||
ctx, cancel, chStopInFlight, ws, _ := r.acquireQueryCtx(ctx, r.rpcTimeout) | ||
defer cancel() | ||
lggr := r.newRqLggr().With("q", q) | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -209,6 +209,16 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) { | |||||||||||
lggr := logger.Test(t) | ||||||||||||
ctx, cancel := context.WithTimeout(tests.Context(t), tests.WaitTimeout(t)) | ||||||||||||
defer cancel() | ||||||||||||
t.Run("Failed SubscribeFilterLogs when WSURL is empty", func(t *testing.T) { | ||||||||||||
// ws is optional when LogBroadcaster is disabled, however SubscribeFilterLogs will return error if ws is missing | ||||||||||||
httpURL := url.URL{} | ||||||||||||
observedLggr, _ := logger.TestObserved(t, zap.DebugLevel) | ||||||||||||
rpcClient := client.NewRPCClient(observedLggr, url.URL{}, &httpURL, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For this specific test it can't, since Dial() will fail due to |
||||||||||||
require.Nil(t, rpcClient.Dial(ctx)) | ||||||||||||
|
||||||||||||
_, err := rpcClient.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log)) | ||||||||||||
require.Error(t, err) | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you ensure that we return proper error msg? |
||||||||||||
}) | ||||||||||||
t.Run("Failed SubscribeFilterLogs logs and returns proper error", func(t *testing.T) { | ||||||||||||
server := testutils.NewWSServer(t, chainId, func(reqMethod string, reqParams gjson.Result) (resp testutils.JSONRPCResponse) { | ||||||||||||
return resp | ||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -23,7 +23,10 @@ import ( | |||||
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" | ||||||
) | ||||||
|
||||||
var ErrNotFound = errors.New("not found") | ||||||
var ( | ||||||
ErrNotFound = errors.New("not found") | ||||||
ErrLogBroadcasterEnabledWithoutWSURL = errors.New("logbroadcaster cannot be enabled unless all nodes have WSURL provided") | ||||||
) | ||||||
|
||||||
type HasEVMConfigs interface { | ||||||
EVMConfigs() EVMConfigs | ||||||
|
@@ -310,6 +313,12 @@ func (c *EVMConfig) ValidateConfig() (err error) { | |||||
if len(c.Nodes) == 0 { | ||||||
err = multierr.Append(err, commonconfig.ErrMissing{Name: "Nodes", Msg: "must have at least one node"}) | ||||||
} else { | ||||||
if c.LogBroadcasterEnabled != nil { | ||||||
if err = verifyLogBroadcasterFlag(c.Nodes, *c.LogBroadcasterEnabled); err != nil { | ||||||
err = multierr.Append(err, commonconfig.ErrMissing{Name: "Nodes", Msg: "must have at least one ws uri when LogBroadcaster is enabled"}) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
} | ||||||
|
||||||
var hasPrimary bool | ||||||
for _, n := range c.Nodes { | ||||||
if n.SendOnly != nil && *n.SendOnly { | ||||||
|
@@ -965,19 +974,8 @@ func (n *Node) ValidateConfig() (err error) { | |||||
err = multierr.Append(err, commonconfig.ErrEmpty{Name: "Name", Msg: "required for all nodes"}) | ||||||
} | ||||||
|
||||||
var sendOnly bool | ||||||
if n.SendOnly != nil { | ||||||
sendOnly = *n.SendOnly | ||||||
} | ||||||
if n.WSURL == nil { | ||||||
if !sendOnly { | ||||||
err = multierr.Append(err, commonconfig.ErrMissing{Name: "WSURL", Msg: "required for primary nodes"}) | ||||||
} | ||||||
} else if n.WSURL.IsZero() { | ||||||
if !sendOnly { | ||||||
err = multierr.Append(err, commonconfig.ErrEmpty{Name: "WSURL", Msg: "required for primary nodes"}) | ||||||
} | ||||||
} else { | ||||||
// relax the check here as WSURL can potentially be empty if LogBroadcaster is disabled (checked in EVMConfig Validation) | ||||||
if n.WSURL != nil && !n.WSURL.IsZero() { | ||||||
switch n.WSURL.Scheme { | ||||||
case "ws", "wss": | ||||||
default: | ||||||
|
@@ -1028,3 +1026,16 @@ func (n *Node) SetFrom(f *Node) { | |||||
func ChainIDInt64(cid string) (int64, error) { | ||||||
return strconv.ParseInt(cid, 10, 64) | ||||||
} | ||||||
|
||||||
// verifyLogBroadcasterFlag checks node config and return error if LogBroadcaster enabled but some node missing WSURL | ||||||
func verifyLogBroadcasterFlag(nodes []*Node, LogBroadcasterEnabled bool) error { | ||||||
if !LogBroadcasterEnabled { | ||||||
return nil | ||||||
} | ||||||
for _, node := range nodes { | ||||||
if node.WSURL == nil || node.WSURL.IsZero() { | ||||||
return ErrLogBroadcasterEnabledWithoutWSURL | ||||||
} | ||||||
} | ||||||
return nil | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1331,7 +1331,9 @@ func TestConfig_Validate(t *testing.T) { | |
- 1.ChainID: invalid value (1): duplicate - must be unique | ||
- 0.Nodes.1.Name: invalid value (foo): duplicate - must be unique | ||
- 3.Nodes.4.WSURL: invalid value (ws://dupe.com): duplicate - must be unique | ||
- 0: 3 errors: | ||
- 0: 5 errors: | ||
- logbroadcaster cannot be enabled unless all nodes have WSURL provided | ||
- Nodes: missing: must have at least one ws uri when LogBroadcaster is enabled | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These seem contradictory. Must all nodes have WSURL, or just one? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for catching it. According to the requirement, I was intended to enforce There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated the error message. We going to require all primary nodes to provide valid WS URL when LogBroadcaster is enabled. |
||
- GasEstimator.BumpTxDepth: invalid value (11): must be less than or equal to Transactions.MaxInFlight | ||
- GasEstimator: 6 errors: | ||
- BumpPercent: invalid value (1): may not be less than Geth's default of 10 | ||
|
@@ -1341,9 +1343,7 @@ func TestConfig_Validate(t *testing.T) { | |
- PriceMax: invalid value (10 gwei): must be greater than or equal to PriceDefault | ||
- BlockHistory.BlockHistorySize: invalid value (0): must be greater than or equal to 1 with BlockHistory Mode | ||
- Nodes: 2 errors: | ||
- 0: 2 errors: | ||
- WSURL: missing: required for primary nodes | ||
- HTTPURL: missing: required for all nodes | ||
- 0.HTTPURL: missing: required for all nodes | ||
- 1.HTTPURL: missing: required for all nodes | ||
- 1: 10 errors: | ||
- ChainType: invalid value (Foo): must not be set with this chain id | ||
|
@@ -1365,17 +1365,15 @@ func TestConfig_Validate(t *testing.T) { | |
- FinalityDepth: invalid value (0): must be greater than or equal to 1 | ||
- MinIncomingConfirmations: invalid value (0): must be greater than or equal to 1 | ||
- 3.Nodes: 5 errors: | ||
- 0: 3 errors: | ||
- 0: 2 errors: | ||
- Name: missing: required for all nodes | ||
- WSURL: missing: required for primary nodes | ||
- HTTPURL: empty: required for all nodes | ||
- 1: 3 errors: | ||
- Name: missing: required for all nodes | ||
- WSURL: invalid value (http): must be ws or wss | ||
- HTTPURL: missing: required for all nodes | ||
- 2: 3 errors: | ||
- 2: 2 errors: | ||
- Name: empty: required for all nodes | ||
- WSURL: missing: required for primary nodes | ||
- HTTPURL: invalid value (ws): must be http or https | ||
- 3.HTTPURL: missing: required for all nodes | ||
- 4.HTTPURL: missing: required for all nodes | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.