diff --git a/pkg/recws/recws.go b/pkg/recws/recws.go index 5df76d8..19dbf25 100644 --- a/pkg/recws/recws.go +++ b/pkg/recws/recws.go @@ -48,6 +48,10 @@ type RecConn struct { ReadBufferSize int NonVerbose bool + NodeName string + EndPoint string + MaxPoolCap int + isConnected bool mu sync.RWMutex url string diff --git a/rpc/query.go b/rpc/query.go index 7be2d35..6bb98db 100644 --- a/rpc/query.go +++ b/rpc/query.go @@ -11,10 +11,10 @@ import ( ) // Read substrate storage -func ReadStorage(p websocket.WsConn, module, prefix string, hash string, arg ...string) (r storage.StateStorage, err error) { +func ReadStorage(nodeName websocket.NodeName, p websocket.WsConn, module, prefix string, hash string, arg ...string) (r storage.StateStorage, err error) { key := storageKey.EncodeStorageKey(module, prefix, arg...) v := &JsonRpcResult{} - if err = websocket.SendWsRequest(p, v, StateGetStorage(rand.Intn(10000), util.AddHex(key.EncodeKey), hash)); err != nil { + if err = websocket.SendWsRequest(nodeName, p, v, StateGetStorage(rand.Intn(10000), util.AddHex(key.EncodeKey), hash)); err != nil { return } if dataHex, err := v.ToString(); err == nil { @@ -27,10 +27,10 @@ func ReadStorage(p websocket.WsConn, module, prefix string, hash string, arg ... } -func ReadKeysPaged(p websocket.WsConn, module, prefix string) (r []string, scale string, err error) { +func ReadKeysPaged(nodeName websocket.NodeName, p websocket.WsConn, module, prefix string) (r []string, scale string, err error) { key := storageKey.EncodeStorageKey(module, prefix) v := &JsonRpcResult{} - if err = websocket.SendWsRequest(p, v, StateGetKeysPaged(rand.Intn(10000), util.AddHex(key.EncodeKey))); err != nil { + if err = websocket.SendWsRequest(nodeName, p, v, StateGetKeysPaged(rand.Intn(10000), util.AddHex(key.EncodeKey))); err != nil { return } if keys, err := v.ToInterfaces(); err == nil { @@ -41,9 +41,9 @@ func ReadKeysPaged(p websocket.WsConn, module, prefix string) (r []string, scale return r, key.ScaleType, err } -func GetPaymentQueryInfo(p websocket.WsConn, encodedExtrinsic string) (paymentInfo *PaymentQueryInfo, err error) { +func GetPaymentQueryInfo(nodeName websocket.NodeName, p websocket.WsConn, encodedExtrinsic string) (paymentInfo *PaymentQueryInfo, err error) { v := &JsonRpcResult{} - if err = websocket.SendWsRequest(p, v, SystemPaymentQueryInfo(rand.Intn(10000), util.AddHex(encodedExtrinsic))); err != nil { + if err = websocket.SendWsRequest(nodeName, p, v, SystemPaymentQueryInfo(rand.Intn(10000), util.AddHex(encodedExtrinsic))); err != nil { return } paymentInfo = v.ToPaymentQueryInfo() @@ -53,9 +53,9 @@ func GetPaymentQueryInfo(p websocket.WsConn, encodedExtrinsic string) (paymentIn return } -func ReadStorageByKey(p websocket.WsConn, key storageKey.StorageKey, hash string) (r storage.StateStorage, err error) { +func ReadStorageByKey(nodeName websocket.NodeName, p websocket.WsConn, key storageKey.StorageKey, hash string) (r storage.StateStorage, err error) { v := &JsonRpcResult{} - if err = websocket.SendWsRequest(p, v, StateGetStorage(rand.Intn(10000), key.EncodeKey, hash)); err != nil { + if err = websocket.SendWsRequest(nodeName, p, v, StateGetStorage(rand.Intn(10000), key.EncodeKey, hash)); err != nil { return } if dataHex, err := v.ToString(); err == nil { @@ -67,18 +67,18 @@ func ReadStorageByKey(p websocket.WsConn, key storageKey.StorageKey, hash string return } -func GetMetadataByHash(p websocket.WsConn, hash ...string) (string, error) { +func GetMetadataByHash(nodeName websocket.NodeName, p websocket.WsConn, hash ...string) (string, error) { v := &JsonRpcResult{} - if err := websocket.SendWsRequest(p, v, StateGetMetadata(rand.Intn(10), hash...)); err != nil { + if err := websocket.SendWsRequest(nodeName, p, v, StateGetMetadata(rand.Intn(10), hash...)); err != nil { return "", err } return v.ToString() } -func GetSystemProperties(p websocket.WsConn) (*Properties, error) { +func GetSystemProperties(nodeName websocket.NodeName, p websocket.WsConn) (*Properties, error) { var t Properties v := &JsonRpcResult{} - if err := websocket.SendWsRequest(p, v, SystemProperties(rand.Intn(1000))); err != nil { + if err := websocket.SendWsRequest(nodeName, p, v, SystemProperties(rand.Intn(1000))); err != nil { return nil, err } err := v.ToAnyThing(&t) diff --git a/websocket/pool.go b/websocket/pool.go index 1086a2b..1384da8 100644 --- a/websocket/pool.go +++ b/websocket/pool.go @@ -159,12 +159,18 @@ func (c *channelPool) wrapConn(conn *recws.RecConn) *PoolConn { return p } -var wsPool Pool +type NodeName string -func SendWsRequest(c WsConn, v interface{}, action []byte) (err error) { +const ( + DefaultNodeName NodeName = "default" +) + +var wsPool = make(map[NodeName]Pool) + +func SendWsRequest(nodeName NodeName, c WsConn, v interface{}, action []byte) (err error) { var p *PoolConn if c == nil { - if p, err = Init(); err != nil { + if p, err = Init(nodeName); err != nil { return } defer p.Close() diff --git a/websocket/test/websocket_test.go b/websocket/test/websocket_test.go new file mode 100644 index 0000000..39b67d5 --- /dev/null +++ b/websocket/test/websocket_test.go @@ -0,0 +1,69 @@ +package test + +import ( + "testing" + "time" + + "github.com/itering/substrate-api-rpc/rpc" + "github.com/itering/substrate-api-rpc/websocket" +) + +func TestInit(t *testing.T) { + websocket.Init( + "", + websocket.WithEndPoint("wss://rpc.polkadot.io"), + websocket.WithMaxPoolCap(100), + websocket.WithHandshakeTimeout(5*time.Second), + websocket.WithWriteTimeoutTimeout(60*time.Second), + websocket.WithReadTimeoutTimeout(60*time.Second), + websocket.WithWriteBufferSize(5242880), + ) + + v := &rpc.JsonRpcResult{} + websocket.SendWsRequest("", nil, v, rpc.ChainGetBlockHash(1, 1)) + t.Log(v) +} + +func TestMultiInit(t *testing.T) { + // 1.default client + websocket.Init( + "", + websocket.WithEndPoint("wss://rpc.polkadot.io"), + websocket.WithMaxPoolCap(100), + websocket.WithHandshakeTimeout(5*time.Second), + websocket.WithWriteTimeoutTimeout(60*time.Second), + websocket.WithReadTimeoutTimeout(60*time.Second), + websocket.WithWriteBufferSize(5242880), + ) + + v := &rpc.JsonRpcResult{} + websocket.SendWsRequest("", nil, v, rpc.ChainGetBlockHash(1, 1)) + if v.Result.(string) != "0xc0096358534ec8d21d01d34b836eed476a1c343f8724fa2153dc0725ad797a90" { + t.Fatal(v) + } else { + t.Log(v) + } + + // 2.westend client + const ( + Westend websocket.NodeName = "westend" + WestendEndPoint string = "wss://westend-rpc.polkadot.io" + ) + websocket.Init( + Westend, + websocket.WithEndPoint(WestendEndPoint), + websocket.WithMaxPoolCap(100), + websocket.WithHandshakeTimeout(5*time.Second), + websocket.WithWriteTimeoutTimeout(60*time.Second), + websocket.WithReadTimeoutTimeout(60*time.Second), + websocket.WithWriteBufferSize(5242880), + ) + + v2 := &rpc.JsonRpcResult{} + websocket.SendWsRequest(Westend, nil, v2, rpc.ChainGetBlockHash(1, 1)) + if v2.Result.(string) != "0x44ef51c86927a1e2da55754dba9684dd6ff9bac8c61624ffe958be656c42e036" { + t.Fatal(v2) + } else { + t.Log(v2) + } +} diff --git a/websocket/websocket.go b/websocket/websocket.go index fa7618a..ff09f57 100644 --- a/websocket/websocket.go +++ b/websocket/websocket.go @@ -8,11 +8,6 @@ import ( "github.com/itering/substrate-api-rpc/pkg/recws" ) -var ( - wsEndPoint = "" - maxCap = 25 -) - type WsConn interface { Dial(urlStr string, reqHeader http.Header) IsConnected() bool @@ -25,9 +20,16 @@ type WsConn interface { CloseAndReconnect() } -func Init(options ...Option) (*PoolConn, error) { - var err error - if wsPool == nil { +func Init(nodeName NodeName, options ...Option) (*PoolConn, error) { + if nodeName == "" { + nodeName = DefaultNodeName + } + + var ( + err error + maxCap = 25 + ) + if _, ok := wsPool[nodeName]; !ok { factory := func() (*recws.RecConn, error) { SubscribeConn := &recws.RecConn{ KeepAliveTimeout: 10 * time.Second, @@ -38,32 +40,29 @@ func Init(options ...Option) (*PoolConn, error) { for _, o := range options { o.Apply(SubscribeConn) } - SubscribeConn.Dial(wsEndPoint, nil) + + SubscribeConn.Dial(SubscribeConn.EndPoint, nil) + + if SubscribeConn.MaxPoolCap > 0 { + maxCap = SubscribeConn.MaxPoolCap + } + return SubscribeConn, err } - if wsPool, err = NewChannelPool(1, maxCap, factory); err != nil { + if wsPool[nodeName], err = NewChannelPool(1, maxCap, factory); err != nil { fmt.Println("NewChannelPool", err) } } if err != nil { return nil, err } - conn, err := wsPool.Get() + conn, err := wsPool[nodeName].Get() return conn, err } -func SetEndpoint(endpoint string) { - wsEndPoint = endpoint -} - -// SetChannelPoolMaxCap set connection pool max cap -func SetChannelPoolMaxCap(max int) { - maxCap = max -} - -func Close() { - if wsPool != nil { - wsPool.Close() +func Close(nodeName NodeName) { + if pool, ok := wsPool[nodeName]; ok && pool != nil { + pool.Close() } } @@ -106,3 +105,15 @@ func WithWriteBufferSize(size int) Option { m.WriteBufferSize = size }) } + +func WithEndPoint(endPoint string) Option { + return OptionFunc(func(m *recws.RecConn) { + m.EndPoint = endPoint + }) +} + +func WithMaxPoolCap(cap int) Option { + return OptionFunc(func(m *recws.RecConn) { + m.MaxPoolCap = cap + }) +}