Skip to content

Commit

Permalink
fix_: address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
friofry committed Oct 9, 2024
1 parent 137920c commit eb10b20
Show file tree
Hide file tree
Showing 25 changed files with 385 additions and 140 deletions.
32 changes: 16 additions & 16 deletions circuitbreaker/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,11 @@ func TestCircuitBreaker_SuccessCallStatus(t *testing.T) {
assert.Len(t, result.FunctorCallStatuses(), 1)

status := result.FunctorCallStatuses()[0]
if status.name != "successCircuit" {
t.Errorf("Expected functor name to be 'successCircuit', got %s", status.name)
if status.Name != "successCircuit" {
t.Errorf("Expected functor name to be 'successCircuit', got %s", status.Name)
}
if status.err != nil {
t.Errorf("Expected no error in functor status, got %v", status.err)
if status.Err != nil {
t.Errorf("Expected no error in functor status, got %v", status.Err)
}
}

Expand All @@ -350,11 +350,11 @@ func TestCircuitBreaker_ErrorCallStatus(t *testing.T) {
assert.Len(t, result.FunctorCallStatuses(), 1)

status := result.FunctorCallStatuses()[0]
if status.name != "errorCircuit" {
t.Errorf("Expected functor name to be 'errorCircuit', got %s", status.name)
if status.Name != "errorCircuit" {
t.Errorf("Expected functor name to be 'errorCircuit', got %s", status.Name)
}
if !errors.Is(status.err, expectedError) {
t.Errorf("Expected functor error to be '%v', got '%v'", expectedError, status.err)
if !errors.Is(status.Err, expectedError) {
t.Errorf("Expected functor error to be '%v', got '%v'", expectedError, status.Err)
}
}

Expand Down Expand Up @@ -405,11 +405,11 @@ func TestCircuitBreaker_MultipleFunctorsResult(t *testing.T) {
statuses := result.FunctorCallStatuses()
require.Len(t, statuses, 2)

require.Equal(t, statuses[0].name, "circuit1")
require.NotNil(t, statuses[0].err)
require.Equal(t, statuses[0].Name, "circuit1")
require.NotNil(t, statuses[0].Err)

require.Equal(t, statuses[1].name, "circuit2")
require.Nil(t, statuses[1].err)
require.Equal(t, statuses[1].Name, "circuit2")
require.Nil(t, statuses[1].Err)
}

func TestCircuitBreaker_LastFunctorDirectExecution(t *testing.T) {
Expand Down Expand Up @@ -444,9 +444,9 @@ func TestCircuitBreaker_LastFunctorDirectExecution(t *testing.T) {
statuses := result.FunctorCallStatuses()
require.Len(t, statuses, 2)

require.Equal(t, statuses[0].name, "circuitName")
require.NotNil(t, statuses[0].err)
require.Equal(t, statuses[0].Name, "circuitName")
require.NotNil(t, statuses[0].Err)

require.Equal(t, statuses[1].name, "circuitName")
require.Nil(t, statuses[1].err)
require.Equal(t, statuses[1].Name, "circuitName")
require.Nil(t, statuses[1].Err)
}
6 changes: 3 additions & 3 deletions healthmanager/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (
// Aggregator manages and aggregates the statuses of multiple providers.
type Aggregator struct {
mu sync.RWMutex
Name string
name string
providerStatuses map[string]*rpcstatus.ProviderStatus
}

// NewAggregator creates a new instance of Aggregator with the given name.
func NewAggregator(name string) *Aggregator {
return &Aggregator{
Name: name,
name: name,
providerStatuses: make(map[string]*rpcstatus.ProviderStatus),
}
}
Expand Down Expand Up @@ -100,7 +100,7 @@ func (a *Aggregator) ComputeAggregatedStatus() rpcstatus.ProviderStatus {
}

aggregatedStatus := rpcstatus.ProviderStatus{
Name: a.Name,
Name: a.name,
LastSuccessAt: lastSuccessAt,
LastErrorAt: lastErrorAt,
LastError: lastError,
Expand Down
7 changes: 4 additions & 3 deletions healthmanager/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"testing"
"time"

"github.com/status-im/status-go/healthmanager/rpcstatus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"

"github.com/status-im/status-go/healthmanager/rpcstatus"
)

// StatusAggregatorTestSuite defines the test suite for Aggregator.
Expand All @@ -23,7 +24,7 @@ func (suite *StatusAggregatorTestSuite) SetupTest() {

// TestNewAggregator verifies that a new Aggregator is initialized correctly.
func (suite *StatusAggregatorTestSuite) TestNewAggregator() {
assert.Equal(suite.T(), "TestAggregator", suite.aggregator.Name, "Aggregator name should be set correctly")
assert.Equal(suite.T(), "TestAggregator", suite.aggregator.name, "Aggregator name should be set correctly")
assert.Empty(suite.T(), suite.aggregator.providerStatuses, "Aggregator should have no providers initially")
}

Expand Down Expand Up @@ -94,7 +95,7 @@ func (suite *StatusAggregatorTestSuite) TestUpdate() {
func (suite *StatusAggregatorTestSuite) TestComputeAggregatedStatus_NoProviders() {
aggStatus := suite.aggregator.ComputeAggregatedStatus()

assert.Equal(suite.T(), rpcstatus.StatusUnknown, aggStatus.Status, "Aggregated status should be 'unknown' when no providers are registered")
assert.Equal(suite.T(), rpcstatus.StatusDown, aggStatus.Status, "Aggregated status should be 'down' when no providers are registered")
assert.True(suite.T(), aggStatus.LastSuccessAt.IsZero(), "LastSuccessAt should be zero when no providers are registered")
assert.True(suite.T(), aggStatus.LastErrorAt.IsZero(), "LastErrorAt should be zero when no providers are registered")
}
Expand Down
48 changes: 29 additions & 19 deletions healthmanager/blockchain_health_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package healthmanager

import (
"context"
"fmt"
"sync"

"github.com/status-im/status-go/healthmanager/aggregator"
"github.com/status-im/status-go/healthmanager/rpcstatus"
"sync"
)

// BlockchainFullStatus contains the full status of the blockchain, including provider statuses.
type BlockchainFullStatus struct {
Status rpcstatus.ProviderStatus `json:"status"`
StatusPerChain map[uint64]rpcstatus.ProviderStatus `json:"statusPerChain"`
StatusPerChainPerProvider map[uint64]map[string]rpcstatus.ProviderStatus `json:"statusPerChainPerProvider"`
}

Expand All @@ -28,7 +29,7 @@ type BlockchainHealthManager struct {

providers map[uint64]*ProvidersHealthManager
cancelFuncs map[uint64]context.CancelFunc // Map chainID to cancel functions
lastStatus BlockchainStatus
lastStatus *BlockchainStatus
wg sync.WaitGroup
}

Expand All @@ -43,30 +44,37 @@ func NewBlockchainHealthManager() *BlockchainHealthManager {
}

// RegisterProvidersHealthManager registers the provider health manager.
// It prevents registering the same provider twice for the same chain.
// It removes any existing provider for the same chain before registering the new one.
func (b *BlockchainHealthManager) RegisterProvidersHealthManager(ctx context.Context, phm *ProvidersHealthManager) error {
b.mu.Lock()
defer b.mu.Unlock()

// Check if the provider for the given chainID is already registered
if _, exists := b.providers[phm.ChainID()]; exists {
// Log a warning or return an error to indicate that the provider is already registered
return fmt.Errorf("provider for chainID %d is already registered", phm.ChainID())
chainID := phm.ChainID()

// Check if a provider for the given chainID is already registered and remove it
if _, exists := b.providers[chainID]; exists {
// Cancel the existing context
if cancel, cancelExists := b.cancelFuncs[chainID]; cancelExists {
cancel()
}
// Remove the old registration
delete(b.providers, chainID)
delete(b.cancelFuncs, chainID)
}

// Proceed with the registration
b.providers[phm.ChainID()] = phm
b.providers[chainID] = phm

// Create a new context for the provider
providerCtx, cancel := context.WithCancel(ctx)
b.cancelFuncs[phm.ChainID()] = cancel
b.cancelFuncs[chainID] = cancel

statusCh := phm.Subscribe()
b.wg.Add(1)
go func(phm *ProvidersHealthManager, statusCh chan struct{}, providerCtx context.Context) {
defer func() {
b.wg.Done()
phm.Unsubscribe(statusCh)
b.wg.Done()
}()
for {
select {
Expand All @@ -91,6 +99,7 @@ func (b *BlockchainHealthManager) Stop() {
cancel()
}
clear(b.cancelFuncs)
clear(b.providers)

b.mu.Unlock()
b.wg.Wait()
Expand Down Expand Up @@ -134,15 +143,15 @@ func (b *BlockchainHealthManager) aggregateAndUpdateStatus(ctx context.Context)
b.aggregator.UpdateBatch(providerStatuses)

// Get the new aggregated full and short status
newShortStatus := b.getShortStatus()
newShortStatus := b.getStatusPerChain()
b.mu.Unlock()

// Compare full and short statuses and emit if changed
if !compareShortStatus(newShortStatus, b.lastStatus) {
if b.lastStatus == nil || !compareShortStatus(newShortStatus, *b.lastStatus) {
b.emitBlockchainHealthStatus(ctx)
b.mu.Lock()
defer b.mu.Unlock()
b.lastStatus = newShortStatus
b.lastStatus = &newShortStatus
}
}

Expand Down Expand Up @@ -192,15 +201,16 @@ func (b *BlockchainHealthManager) GetFullStatus() BlockchainFullStatus {
statusPerChainPerProvider[chainID] = providerStatuses
}

blockchainStatus := b.aggregator.GetAggregatedStatus()
statusPerChain := b.getStatusPerChain()

return BlockchainFullStatus{
Status: blockchainStatus,
Status: statusPerChain.Status,
StatusPerChain: statusPerChain.StatusPerChain,
StatusPerChainPerProvider: statusPerChainPerProvider,
}
}

func (b *BlockchainHealthManager) getShortStatus() BlockchainStatus {
func (b *BlockchainHealthManager) getStatusPerChain() BlockchainStatus {
statusPerChain := make(map[uint64]rpcstatus.ProviderStatus)

for chainID, phm := range b.providers {
Expand All @@ -216,10 +226,10 @@ func (b *BlockchainHealthManager) getShortStatus() BlockchainStatus {
}
}

func (b *BlockchainHealthManager) GetShortStatus() BlockchainStatus {
func (b *BlockchainHealthManager) GetStatusPerChain() BlockchainStatus {
b.mu.RLock()
defer b.mu.RUnlock()
return b.getShortStatus()
return b.getStatusPerChain()
}

// Status returns the current aggregated status.
Expand Down
26 changes: 17 additions & 9 deletions healthmanager/blockchain_health_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
"testing"
"time"

"github.com/status-im/status-go/healthmanager/rpcstatus"
"github.com/stretchr/testify/suite"

"github.com/status-im/status-go/healthmanager/rpcstatus"
)

type BlockchainHealthManagerSuite struct {
Expand Down Expand Up @@ -50,7 +51,8 @@ func (s *BlockchainHealthManagerSuite) assertBlockChainStatus(expected rpcstatus
// Test registering a provider health manager
func (s *BlockchainHealthManagerSuite) TestRegisterProvidersHealthManager() {
phm := NewProvidersHealthManager(1) // Create a real ProvidersHealthManager
s.manager.RegisterProvidersHealthManager(context.Background(), phm)
err := s.manager.RegisterProvidersHealthManager(context.Background(), phm)
s.Require().NoError(err)

// Verify that the provider is registered
s.Require().NotNil(s.manager.providers[1])
Expand All @@ -59,7 +61,8 @@ func (s *BlockchainHealthManagerSuite) TestRegisterProvidersHealthManager() {
// Test status updates and notifications
func (s *BlockchainHealthManagerSuite) TestStatusUpdateNotification() {
phm := NewProvidersHealthManager(1)
s.manager.RegisterProvidersHealthManager(context.Background(), phm)
err := s.manager.RegisterProvidersHealthManager(context.Background(), phm)
s.Require().NoError(err)
ch := s.manager.Subscribe()

// Update the provider status
Expand All @@ -75,8 +78,10 @@ func (s *BlockchainHealthManagerSuite) TestGetFullStatus() {
phm1 := NewProvidersHealthManager(1)
phm2 := NewProvidersHealthManager(2)
ctx := context.Background()
s.manager.RegisterProvidersHealthManager(ctx, phm1)
s.manager.RegisterProvidersHealthManager(ctx, phm2)
err := s.manager.RegisterProvidersHealthManager(ctx, phm1)
s.Require().NoError(err)
err = s.manager.RegisterProvidersHealthManager(ctx, phm2)
s.Require().NoError(err)
ch := s.manager.Subscribe()

// Update the provider status
Expand Down Expand Up @@ -120,7 +125,8 @@ func (s *BlockchainHealthManagerSuite) TestConcurrency() {
defer cancel()
for i := 1; i <= chainsCount; i++ {
phm := NewProvidersHealthManager(uint64(i))
s.manager.RegisterProvidersHealthManager(ctx, phm)
err := s.manager.RegisterProvidersHealthManager(ctx, phm)
s.Require().NoError(err)
}

ch := s.manager.Subscribe()
Expand Down Expand Up @@ -161,7 +167,8 @@ func (s *BlockchainHealthManagerSuite) TestUnsubscribeOneOfMultipleSubscribers()
// Create an instance of BlockchainHealthManager and register a provider manager
phm := NewProvidersHealthManager(1)
ctx, cancel := context.WithCancel(s.ctx)
s.manager.RegisterProvidersHealthManager(ctx, phm)
err := s.manager.RegisterProvidersHealthManager(ctx, phm)
s.Require().NoError(err)

defer cancel()

Expand Down Expand Up @@ -196,7 +203,8 @@ func (s *BlockchainHealthManagerSuite) TestUnsubscribeOneOfMultipleSubscribers()
func (s *BlockchainHealthManagerSuite) TestMixedProviderStatusInSingleChain() {
// Register a provider for chain 1
phm := NewProvidersHealthManager(1)
s.manager.RegisterProvidersHealthManager(s.ctx, phm)
err := s.manager.RegisterProvidersHealthManager(s.ctx, phm)
s.Require().NoError(err)

// Subscribe to status updates
ch := s.manager.Subscribe()
Expand All @@ -212,7 +220,7 @@ func (s *BlockchainHealthManagerSuite) TestMixedProviderStatusInSingleChain() {
s.waitForUpdate(ch, rpcstatus.StatusUp, 100*time.Millisecond)

// Verify that the short status reflects the chain as down, since one provider is down
shortStatus := s.manager.GetShortStatus()
shortStatus := s.manager.GetStatusPerChain()
s.Equal(rpcstatus.StatusUp, shortStatus.Status.Status)
s.Equal(rpcstatus.StatusUp, shortStatus.StatusPerChain[1].Status) // Chain 1 should be marked as down
}
Expand Down
3 changes: 2 additions & 1 deletion healthmanager/provider_errors/rpc_provider_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package provider_errors

import (
"errors"
"strings"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/rpc"
"strings"
)

type RpcProviderErrorType string
Expand Down
Loading

0 comments on commit eb10b20

Please sign in to comment.