Skip to content

Commit

Permalink
chore_: send blockchain status event from the rpc/client
Browse files Browse the repository at this point in the history
  • Loading branch information
friofry committed Oct 6, 2024
1 parent adc4e7b commit a88901a
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 126 deletions.
2 changes: 1 addition & 1 deletion node/get_status_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (n *StatusNode) setupRPCClient() (err error) {
},
}

n.rpcClient, err = rpc.NewClient(gethNodeClient, n.config.NetworkID, n.config.UpstreamConfig, n.config.Networks, n.appDB, providerConfigs)
n.rpcClient, err = rpc.NewClient(gethNodeClient, n.config.NetworkID, n.config.UpstreamConfig, n.config.Networks, n.appDB, n.walletFeed, providerConfigs)
if err != nil {
return
}
Expand Down
8 changes: 4 additions & 4 deletions rpc/chain/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"context"
"errors"
"fmt"
"github.com/status-im/status-go/health-manager"
"github.com/status-im/status-go/health-manager/rpcstatus"
"github.com/status-im/status-go/healthmanager"
"github.com/status-im/status-go/healthmanager/rpcstatus"
"math/big"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -69,7 +69,7 @@ type ClientWithFallback struct {
ethClients []ethclient.RPSLimitedEthClientInterface
commonLimiter rpclimiter.RequestLimiter
circuitbreaker *circuitbreaker.CircuitBreaker
providersHealthManager *health_manager.ProvidersHealthManager
providersHealthManager *healthmanager.ProvidersHealthManager

WalletNotifier func(chainId uint64, message string)

Expand Down Expand Up @@ -114,7 +114,7 @@ var propagateErrors = []error{
bind.ErrNoCode,
}

func NewClient(ethClients []ethclient.RPSLimitedEthClientInterface, chainID uint64, providersHealthManager *health_manager.ProvidersHealthManager) *ClientWithFallback {
func NewClient(ethClients []ethclient.RPSLimitedEthClientInterface, chainID uint64, providersHealthManager *healthmanager.ProvidersHealthManager) *ClientWithFallback {
cbConfig := circuitbreaker.Config{
Timeout: 20000,
MaxConcurrentRequests: 100,
Expand Down
122 changes: 91 additions & 31 deletions rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/status-im/status-go/health-manager"
"net/http"
"net/url"
"reflect"
Expand All @@ -20,14 +19,17 @@ import (
"github.com/ethereum/go-ethereum/log"
gethrpc "github.com/ethereum/go-ethereum/rpc"

"github.com/ethereum/go-ethereum/event"
appCommon "github.com/status-im/status-go/common"
"github.com/status-im/status-go/healthmanager"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/rpc/chain/ethclient"
"github.com/status-im/status-go/rpc/chain/rpclimiter"
"github.com/status-im/status-go/rpc/network"
"github.com/status-im/status-go/services/rpcstats"
"github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/walletevent"
)

const (
Expand All @@ -49,6 +51,9 @@ const (
// rpcUserAgentUpstreamFormat a separate user agent format for upstream, because we should not be using upstream
// if we see this user agent in the logs that means parts of the application are using a malconfigured http client
rpcUserAgentUpstreamFormat = "procuratee-%s-upstream/%s"

EventBlockchainStatusChanged walletevent.EventType = "wallet-blockchain-status-changed" // Deprecated event
EventBlockchainHealthChanged walletevent.EventType = "wallet-blockchain-health-changed" // Full status of the blockchain (including provider statuses)
)

// List of RPC client errors.
Expand Down Expand Up @@ -102,15 +107,16 @@ type Client struct {
rpsLimiterMutex sync.RWMutex
limiterPerProvider map[string]*rpclimiter.RPCRpsLimiter

router *router
NetworkManager *network.Manager
BlockchainHealthManager *health_manager.BlockchainHealthManager
router *router
NetworkManager *network.Manager

healthMgr *healthmanager.BlockchainHealthManager
walletFeed *event.Feed

handlersMx sync.RWMutex // mx guards handlers
handlers map[string]Handler // locally registered handlers
log log.Logger

walletNotifier func(chainID uint64, message string)
providerConfigs []params.ProviderConfig
}

Expand All @@ -122,7 +128,7 @@ var verifProxyInitFn func(c *Client)
//
// Client is safe for concurrent use and will automatically
// reconnect to the server if connection is lost.
func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.UpstreamRPCConfig, networks []params.Network, db *sql.DB, providerConfigs []params.ProviderConfig) (*Client, error) {
func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.UpstreamRPCConfig, networks []params.Network, db *sql.DB, walletFeed *event.Feed, providerConfigs []params.ProviderConfig) (*Client, error) {
var err error

log := log.New("package", "status-go/rpc.Client")
Expand All @@ -137,14 +143,15 @@ func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.U
}

c := Client{
local: client,
NetworkManager: networkManager,
handlers: make(map[string]Handler),
rpcClients: make(map[uint64]chain.ClientInterface),
limiterPerProvider: make(map[string]*rpclimiter.RPCRpsLimiter),
log: log,
providerConfigs: providerConfigs,
BlockchainHealthManager: health_manager.NewBlockchainHealthManager(),
local: client,
NetworkManager: networkManager,
handlers: make(map[string]Handler),
rpcClients: make(map[uint64]chain.ClientInterface),
limiterPerProvider: make(map[string]*rpclimiter.RPCRpsLimiter),
log: log,
providerConfigs: providerConfigs,
healthMgr: healthmanager.NewBlockchainHealthManager(),
walletFeed: walletFeed,
}

var opts []gethrpc.ClientOption
Expand Down Expand Up @@ -177,9 +184,9 @@ func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.U
ethClients := []ethclient.RPSLimitedEthClientInterface{
ethclient.NewRPSLimitedEthClient(upstreamClient, limiter, rpcName),
}
providersHealthManager := health_manager.NewProvidersHealthManager(upstreamChainID)
c.BlockchainHealthManager.RegisterProvidersHealthManager(upstreamChainID, providersHealthManager)
c.upstream = chain.NewClient(ethClients, upstreamChainID, providersHealthManager)
phm := healthmanager.NewProvidersHealthManager(upstreamChainID)
c.blockchainHealthManager.RegisterProvidersHealthManager(phm)
c.upstream = chain.NewClient(ethClients, upstreamChainID, phm)
}

c.router = newRouter(c.upstreamEnabled)
Expand All @@ -191,16 +198,73 @@ func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.U
return &c, nil
}

func (c *Client) GetNetworkManager() *network.Manager {
return c.NetworkManager
func (c *Client) Start(context context.Context) {
if c.stopHealthMonitor != nil {
c.log.Warn("Blockchain health manager already started")
return
}
ctx, cancel := context.WithCancel(context)
c.stopHealthMonitor = cancel
statusCh := c.healthMgr.Subscribe()
c.healthMgr.Start(ctx)
go c.monitorHealth(ctx, statusCh)
}

func (c *Client) SetWalletNotifier(notifier func(chainID uint64, message string)) {
c.walletNotifier = notifier
func (c *Client) Stop() {
if c.stopHealthMonitor == nil {
return
}
c.stopHealthMonitor()
c.stopHealthMonitor = nil
}

func (c *Client) monitorHealth(ctx context.Context, statusCh chan struct{}) {
sendFullStatusEventFunc := func(blockchainStatus healthmanager.BlockchainFullStatus) {
encodedMessage, err := json.Marshal(blockchainStatus)
if err != nil {
c.log.Warn("could not marshal full blockchain status", "error", err)
return
}
c.walletFeed.Send(walletevent.Event{
Type: EventBlockchainHealthChanged,
Message: string(encodedMessage),
At: time.Now().Unix(),
})
}

sendShortEventFunc := func(blockchainStatus healthmanager.BlockchainFullStatus) {
blockchainStatusOld := make(map[uint64]string)
for chainID, chainStatus := range blockchainStatusOld.StatusPerChainPerProvider {
statusStr := string(chainStatus.Status)
blockchainStatusOld[chainID] = statusStr
}
encodedMessage, err := json.Marshal(blockchainStatusOld)
if err != nil {
c.log.Warn("could not marshal short blockchain status", "error", err)
return
}
c.walletFeed.Send(walletevent.Event{
Type: EventBlockchainStatusChanged,
Message: string(encodedMessage),
At: time.Now().Unix(),
})
}

for {
select {
case <-ctx.Done():
return
case <-statusCh:
blockchainStatus := c.healthMgr.GetRpcFullStatus()
sendFullStatusEventFunc(blockchainStatus)
// TODO: remove when it's not needed anymore
sendShortEventFunc(blockchainStatus)
}
}
}

func (c *Client) SubscribeHealthStatus() chan health_manager.BlockchainHealthStatus {
return c.BlockchainHealthManager.Subscribe()
func (c *Client) GetNetworkManager() *network.Manager {
return c.NetworkManager
}

func extractHostFromURL(inputURL string) (string, error) {
Expand Down Expand Up @@ -236,9 +300,6 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err
c.rpcClientsMutex.Lock()
defer c.rpcClientsMutex.Unlock()
if rpcClient, ok := c.rpcClients[chainID]; ok {
if rpcClient.GetWalletNotifier() == nil {
rpcClient.SetWalletNotifier(c.walletNotifier)
}
return rpcClient, nil
}

Expand All @@ -255,11 +316,10 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err
return nil, fmt.Errorf("could not find any RPC URL for chain: %d", chainID)
}

providersHealthManager := health_manager.NewProvidersHealthManager(chainID)
c.BlockchainHealthManager.RegisterProvidersHealthManager(chainID, providersHealthManager)
phm := healthmanager.NewProvidersHealthManager(chainID)
c.blockchainHealthManager.RegisterProvidersHealthManager(phm)

client := chain.NewClient(ethClients, chainID, providersHealthManager)
client.SetWalletNotifier(c.walletNotifier)
client := chain.NewClient(ethClients, chainID, phm)
c.rpcClients[chainID] = client
return client, nil
}
Expand Down Expand Up @@ -403,7 +463,7 @@ func (c *Client) UpdateUpstreamURL(url string) error {
ethClients := []ethclient.RPSLimitedEthClientInterface{
ethclient.NewRPSLimitedEthClient(rpcClient, rpsLimiter, hostPortUpstream),
}
providersHealthManager := health_manager.NewProvidersHealthManager(c.UpstreamChainID)
providersHealthManager := healthmanager.NewProvidersHealthManager(c.UpstreamChainID)
c.BlockchainHealthManager.RegisterProvidersHealthManager(c.UpstreamChainID, providersHealthManager)
c.upstream = chain.NewClient(ethClients, c.UpstreamChainID, providersHealthManager)
c.upstreamURL = url
Expand Down
4 changes: 2 additions & 2 deletions services/wallet/market/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package market

import (
"context"
"github.com/status-im/status-go/health-manager/network_errors"
"github.com/status-im/status-go/healthmanager/networkerrors"
"sync"
"time"

Expand Down Expand Up @@ -99,7 +99,7 @@ func (pm *Manager) makeCall(providers []thirdparty.MarketDataProvider, f func(pr
}

result := pm.circuitbreaker.Execute(cmd)
skipNotification := network_errors.IsConnectionError(result.Error())
skipNotification := networkerrors.IsConnectionError(result.Error())
pm.setIsConnected(result.Error() == nil, skipNotification)

if result.Error() != nil {
Expand Down
76 changes: 1 addition & 75 deletions services/wallet/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ package wallet

import (
"database/sql"
"encoding/json"
"fmt"
healthManager "github.com/status-im/status-go/health-manager"

"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
Expand Down Expand Up @@ -43,11 +41,6 @@ import (
"github.com/status-im/status-go/transactions"
)

const (
EventBlockchainStatusChanged walletevent.EventType = "wallet-blockchain-status-changed"
EventBlockchainHealthChanged walletevent.EventType = "wallet-blockchain-health-changed"
)

// NewService initializes service instance.
func NewService(
db *sql.DB,
Expand All @@ -71,35 +64,6 @@ func NewService(
}
blockchainStatus := make(map[uint64]string)
mutex := sync.Mutex{}
rpcClient.SetWalletNotifier(func(chainID uint64, message string) {
mutex.Lock()
defer mutex.Unlock()

if len(blockchainStatus) == 0 {
networks, err := rpcClient.NetworkManager.Get(false)
if err != nil {
return
}

for _, network := range networks {
blockchainStatus[network.ChainID] = "up"
}
}

blockchainStatus[chainID] = message
encodedmessage, err := json.Marshal(blockchainStatus)
if err != nil {
return
}

feed.Send(walletevent.Event{
Type: EventBlockchainStatusChanged,
Accounts: []common.Address{},
Message: string(encodedmessage),
At: time.Now().Unix(),
ChainID: chainID,
})
})

communityManager := community.NewManager(db, mediaServer, feed)
balanceCacher := balance.NewCacherWithTTL(5 * time.Minute)
Expand Down Expand Up @@ -253,7 +217,6 @@ type Service struct {
keycardPairings *KeycardPairings
config *params.NodeConfig
featureFlags *protocolCommon.FeatureFlags
blockchainHealthCh chan healthManager.BlockchainHealthStatus
}

// Start signals transmitter.
Expand All @@ -264,45 +227,9 @@ func (s *Service) Start() error {
s.history.Start()
s.collectibles.Start()
s.started = true
s.blockchainHealthCh = s.rpcClient.SubscribeHealthStatus()
go s.handleBlockchainHealthStatus(s.blockchainHealthCh)
return err
}

func (s *Service) handleBlockchainHealthStatus(blockchainHealthCh chan healthManager.BlockchainHealthStatus) {
for range blockchainHealthCh {
bhm := s.rpcClient.BlockchainHealthManager
blockchainStatus := bhm.GetFullStatus()
jsonData, err := json.Marshal(blockchainStatus)
if err != nil {
continue
}
s.feed.Send(walletevent.Event{
Type: EventBlockchainHealthChanged,
Message: string(jsonData),
At: time.Now().Unix(),
})
//
//// send old event
//// TODO: remove and use the new event only
//blockchainStatusOld := make(map[uint64]string)
//for chainID, chainStatus := range blockchainStatus.Status.Chains {
// statusStr := string(chainStatus.Status)
// blockchainStatusOld[chainID] = statusStr
//}
//encodedMessage, err := json.Marshal(blockchainStatus)
//if err != nil {
// continue
//}
//s.feed.Send(walletevent.Event{
// Type: EventBlockchainStatusChanged,
// Accounts: []common.Address{},
// Message: string(encodedMessage),
// At: time.Now().Unix(),
//})
}
}

// Set external Collectibles community info provider
func (s *Service) SetWalletCommunityInfoProvider(provider thirdparty.CommunityInfoProvider) {
s.communityManager.SetCommunityInfoProvider(provider)
Expand All @@ -319,7 +246,6 @@ func (s *Service) Stop() error {
s.activity.Stop()
s.collectibles.Stop()
s.tokenManager.Stop()
s.rpcClient.BlockchainHealthManager.Unsubscribe(s.blockchainHealthCh)

s.started = false
log.Info("wallet stopped")
Expand Down
Loading

0 comments on commit a88901a

Please sign in to comment.