Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(wallet): Ignore chain down notifications when connection is lost #5820

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 41 additions & 11 deletions circuitbreaker/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"context"
"fmt"
"time"

"github.com/afex/hystrix-go/hystrix"

Expand All @@ -12,8 +13,16 @@
type FallbackFunc func() ([]any, error)

type CommandResult struct {
res []any
err error
res []any
err error
functorCallStatuses []FunctorCallStatus
cancelled bool
}

type FunctorCallStatus struct {
Name string
Timestamp time.Time
Err error
}

func (cr CommandResult) Result() []any {
Expand All @@ -23,6 +32,21 @@
func (cr CommandResult) Error() error {
return cr.err
}
func (cr CommandResult) Cancelled() bool {
return cr.cancelled

Check warning on line 36 in circuitbreaker/circuit_breaker.go

View check run for this annotation

Codecov / codecov/patch

circuitbreaker/circuit_breaker.go#L35-L36

Added lines #L35 - L36 were not covered by tests
}

func (cr CommandResult) FunctorCallStatuses() []FunctorCallStatus {
return cr.functorCallStatuses
}

func (cr *CommandResult) addCallStatus(circuitName string, err error) {
cr.functorCallStatuses = append(cr.functorCallStatuses, FunctorCallStatus{
Name: circuitName,
Timestamp: time.Now(),
Err: err,
})
}

type Command struct {
ctx context.Context
Expand Down Expand Up @@ -106,23 +130,26 @@

for i, f := range cmd.functors {
if cmd.cancel {
result.cancelled = true

Check warning on line 133 in circuitbreaker/circuit_breaker.go

View check run for this annotation

Codecov / codecov/patch

circuitbreaker/circuit_breaker.go#L133

Added line #L133 was not covered by tests
break
}

var err error
circuitName := f.circuitName
if cb.circuitNameHandler != nil {
circuitName = cb.circuitNameHandler(circuitName)

Check warning on line 140 in circuitbreaker/circuit_breaker.go

View check run for this annotation

Codecov / codecov/patch

circuitbreaker/circuit_breaker.go#L140

Added line #L140 was not covered by tests
}

// if last command, execute without circuit
if i == len(cmd.functors)-1 {
res, execErr := f.exec()
err = execErr
if err == nil {
result = CommandResult{res: res}
result.res = res
result.err = nil
}
result.addCallStatus(circuitName, err)
} else {
circuitName := f.circuitName
if cb.circuitNameHandler != nil {
circuitName = cb.circuitNameHandler(circuitName)
}

if hystrix.GetCircuitSettings()[circuitName] == nil {
hystrix.ConfigureCommand(circuitName, hystrix.CommandConfig{
Timeout: cb.config.Timeout,
Expand All @@ -137,13 +164,16 @@
res, err := f.exec()
// Write to result only if success
if err == nil {
result = CommandResult{res: res}
result.res = res
result.err = nil
}
result.addCallStatus(circuitName, err)

// If the command has been cancelled, we don't count
// the error towars breaking the circuit, and then we break
if cmd.cancel {
result = accumulateCommandError(result, f.circuitName, err)
result = accumulateCommandError(result, circuitName, err)
result.cancelled = true

Check warning on line 176 in circuitbreaker/circuit_breaker.go

View check run for this annotation

Codecov / codecov/patch

circuitbreaker/circuit_breaker.go#L175-L176

Added lines #L175 - L176 were not covered by tests
return nil
}
if err != nil {
Expand All @@ -156,7 +186,7 @@
break
}

result = accumulateCommandError(result, f.circuitName, err)
result = accumulateCommandError(result, circuitName, err)

// Lets abuse every provider with the same amount of MaxConcurrentRequests,
// keep iterating even in case of ErrMaxConcurrency error
Expand Down
149 changes: 149 additions & 0 deletions circuitbreaker/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func TestCircuitBreaker_ExecuteSuccessSingle(t *testing.T) {
result := cb.Execute(cmd)
require.NoError(t, result.Error())
require.Equal(t, expectedResult, result.Result()[0].(string))
require.False(t, result.Cancelled())
}

func TestCircuitBreaker_ExecuteMultipleFallbacksFail(t *testing.T) {
Expand Down Expand Up @@ -219,9 +220,11 @@ func TestCircuitBreaker_CommandCancel(t *testing.T) {

result := cb.Execute(cmd)
require.True(t, errors.Is(result.Error(), expectedErr))
require.True(t, result.Cancelled())

assert.Equal(t, 1, prov1Called)
assert.Equal(t, 0, prov2Called)

}

func TestCircuitBreaker_EmptyOrNilCommand(t *testing.T) {
Expand Down Expand Up @@ -301,3 +304,149 @@ func TestCircuitBreaker_Fallback(t *testing.T) {

assert.Equal(t, 1, prov1Called)
}

func TestCircuitBreaker_SuccessCallStatus(t *testing.T) {
cb := NewCircuitBreaker(Config{})

functor := NewFunctor(func() ([]any, error) {
return []any{"success"}, nil
}, "successCircuit")

cmd := NewCommand(context.Background(), []*Functor{functor})

result := cb.Execute(cmd)

require.Nil(t, result.Error())
require.False(t, result.Cancelled())
assert.Len(t, result.Result(), 1)
require.Equal(t, "success", result.Result()[0])
assert.Len(t, result.FunctorCallStatuses(), 1)

status := result.FunctorCallStatuses()[0]
if status.Name != "successCircuit" {
t.Errorf("Expected functor name to be 'successCircuit', got %s", status.Name)
}
if status.Err != nil {
t.Errorf("Expected no error in functor status, got %v", status.Err)
}
}

func TestCircuitBreaker_ErrorCallStatus(t *testing.T) {
cb := NewCircuitBreaker(Config{})

expectedError := errors.New("functor error")
functor := NewFunctor(func() ([]any, error) {
return nil, expectedError
}, "errorCircuit")

cmd := NewCommand(context.Background(), []*Functor{functor})

result := cb.Execute(cmd)

require.NotNil(t, result.Error())
require.True(t, errors.Is(result.Error(), expectedError))

assert.Len(t, result.Result(), 0)
assert.Len(t, result.FunctorCallStatuses(), 1)

status := result.FunctorCallStatuses()[0]
if status.Name != "errorCircuit" {
t.Errorf("Expected functor name to be 'errorCircuit', got %s", status.Name)
}
if !errors.Is(status.Err, expectedError) {
t.Errorf("Expected functor error to be '%v', got '%v'", expectedError, status.Err)
}
}

func TestCircuitBreaker_CancelledResult(t *testing.T) {
cb := NewCircuitBreaker(Config{Timeout: 1000})

functor := NewFunctor(func() ([]any, error) {
time.Sleep(500 * time.Millisecond)
return []any{"should not be returned"}, nil
}, "cancelCircuit")

cmd := NewCommand(context.Background(), []*Functor{functor})
cmd.Cancel()

result := cb.Execute(cmd)

assert.True(t, result.Cancelled())
require.Nil(t, result.Error())
require.Empty(t, result.Result())
require.Empty(t, result.FunctorCallStatuses())
}

func TestCircuitBreaker_MultipleFunctorsResult(t *testing.T) {
cb := NewCircuitBreaker(Config{
Timeout: 1000,
MaxConcurrentRequests: 100,
RequestVolumeThreshold: 20,
SleepWindow: 5000,
ErrorPercentThreshold: 50,
})

functor1 := NewFunctor(func() ([]any, error) {
return nil, errors.New("functor1 error")
}, "circuit1")

functor2 := NewFunctor(func() ([]any, error) {
return []any{"success from functor2"}, nil
}, "circuit2")

cmd := NewCommand(context.Background(), []*Functor{functor1, functor2})

result := cb.Execute(cmd)

require.Nil(t, result.Error())

require.Len(t, result.Result(), 1)
require.Equal(t, result.Result()[0], "success from functor2")
statuses := result.FunctorCallStatuses()
require.Len(t, statuses, 2)

require.Equal(t, statuses[0].Name, "circuit1")
require.NotNil(t, statuses[0].Err)

require.Equal(t, statuses[1].Name, "circuit2")
require.Nil(t, statuses[1].Err)
}

func TestCircuitBreaker_LastFunctorDirectExecution(t *testing.T) {
cb := NewCircuitBreaker(Config{
Timeout: 10, // short timeout to open circuit
MaxConcurrentRequests: 1,
RequestVolumeThreshold: 1,
SleepWindow: 1000,
ErrorPercentThreshold: 1,
})

failingFunctor := NewFunctor(func() ([]any, error) {
time.Sleep(20 * time.Millisecond)
return nil, errors.New("should time out")
}, "circuitName")

successFunctor := NewFunctor(func() ([]any, error) {
return []any{"success without circuit"}, nil
}, "circuitName")

cmd := NewCommand(context.Background(), []*Functor{failingFunctor, successFunctor})

require.False(t, IsCircuitOpen("circuitName"))
result := cb.Execute(cmd)

require.True(t, CircuitExists("circuitName"))
require.Nil(t, result.Error())

require.Len(t, result.Result(), 1)
require.Equal(t, result.Result()[0], "success without circuit")

statuses := result.FunctorCallStatuses()
require.Len(t, statuses, 2)

require.Equal(t, statuses[0].Name, "circuitName")
require.NotNil(t, statuses[0].Err)

require.Equal(t, statuses[1].Name, "circuitName")
require.Nil(t, statuses[1].Err)
}
Loading
Loading