Skip to content

Commit

Permalink
refactor: move observe once logic to breaker
Browse files Browse the repository at this point in the history
  • Loading branch information
costela committed Oct 9, 2023
1 parent 58f1824 commit 3ea98b5
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go: [ '1.20' ]
go: [ '1.21' ]
name: go ${{ matrix.go }}
steps:
- uses: actions/checkout@v3
Expand Down
55 changes: 29 additions & 26 deletions breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package hoglet

import (
"math"
"sync"
"sync/atomic"
"time"
)
Expand Down Expand Up @@ -67,9 +68,21 @@ func (c *callable) state() state {
return stateHalfOpen
}

type observableFunc func(failure bool)
// newObservableCall creates a new [Observable] that ensure it can only be observed a single time.
func newObservableCall(f func(bool)) Observable {
o := sync.Once{}
return observableCall(func(failure bool) {
o.Do(func() {
f(failure)
})
})
}

// observableCall tracks a single call through the breaker.
// It should be instantiated via [newObservableCall] to ensure the observer is only called once.
type observableCall func(bool)

func (o observableFunc) Observe(failure bool) {
func (o observableCall) Observe(failure bool) {
o(failure)
}

Expand Down Expand Up @@ -116,20 +129,15 @@ func NewEWMABreaker(sampleCount int, threshold float64, halfOpenDelay time.Durat
}

func (e *EWMABreaker) Call() Observable {
switch e.callable.state() {
case stateClosed:
return observableFunc(func(failure bool) {
e.observe(false, failure)
})
case stateHalfOpen:
return observableFunc(func(failure bool) {
e.observe(true, failure)
})
case stateOpen:
state := e.callable.state()

if state == stateOpen {
return nil
default:
panic("hoglet: unknown state") // should not happen; see callable.state()
}

return newObservableCall(func(failure bool) {
e.observe(state == stateHalfOpen, failure)
})
}

func (e *EWMABreaker) observe(halfOpen, failure bool) {
Expand Down Expand Up @@ -201,20 +209,15 @@ func NewSlidingWindowBreaker(windowSize time.Duration, threshold float64, halfOp
}

func (s *SlidingWindowBreaker) Call() Observable {
switch s.callable.state() {
case stateClosed:
return observableFunc(func(failure bool) {
s.observe(false, failure)
})
case stateHalfOpen:
return observableFunc(func(failure bool) {
s.observe(true, failure)
})
case stateOpen:
state := s.callable.state()

if state == stateOpen {
return nil
default:
panic("hoglet: unknown state") // should not happen; see callable.state()
}

return newObservableCall(func(failure bool) {
s.observe(state == stateHalfOpen, failure)
})
}

func (s *SlidingWindowBreaker) observe(halfOpen, failure bool) {
Expand Down
27 changes: 7 additions & 20 deletions hoglet.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package hoglet
import (
"context"
"errors"
"sync"
)

// Circuit wraps a function and behaves like a simple circuit and breaker: it opens when the wrapped function fails and
Expand Down Expand Up @@ -72,19 +71,17 @@ func (b *Circuit[IN, OUT]) Do(ctx context.Context, in IN) (out OUT, err error) {
return out, ErrCircuitOpen
}

once := &sync.Once{}

ctx, cancel := context.WithCancelCause(ctx)
defer cancel(internalCancellation)
go b.observeCtxOnce(once, obs, ctx)
go b.observeCtx(obs, ctx)

defer func() {
// ensure we also open the breaker on panics
if err := recover(); err != nil {
b.observeOnce(once, obs, true)
obs.Observe(true)
panic(err) // let the caller deal with panics
}
b.observeOnce(once, obs, b.options.isFailure(err))
obs.Observe(b.options.isFailure(err))
}()

return b.f(ctx, in)
Expand All @@ -97,22 +94,12 @@ func (b *Circuit[IN, OUT]) observable() Observable {
return b.breaker.Call()
}

// observeOnce is a helper to work around the inherent racyness of the "context watch" goroutine and ensure we only
// observe one result.
func (b *Circuit[IN, OUT]) observeOnce(once *sync.Once, obs Observable, failure bool) {
once.Do(func() {
if b.breaker == nil {
return
}

obs.Observe(failure)
})
}

// internalCancellation is used to distinguish between internal and external (to the lib) context cancellations.
var internalCancellation = errors.New("internal cancellation")

func (b *Circuit[IN, OUT]) observeCtxOnce(once *sync.Once, obs Observable, ctx context.Context) {
// observeCtx observes the given context for cancellation and records it as a failure.
// It assumes [Observable.Observe] is idempotent and deduplicates calls itself.
func (b *Circuit[IN, OUT]) observeCtx(obs Observable, ctx context.Context) {
// We want to observe a context error as soon as possible to open the breaker, but at the same time we want to
// keep the call to the wrapped function synchronous to avoid all pitfalls that come with asynchronicity.
<-ctx.Done()
Expand All @@ -121,7 +108,7 @@ func (b *Circuit[IN, OUT]) observeCtxOnce(once *sync.Once, obs Observable, ctx c
if context.Cause(ctx) == internalCancellation {
err = nil // ignore internal cancellations; the wrapped function returned already
}
b.observeOnce(once, obs, b.options.isFailure(err))
obs.Observe(b.options.isFailure(err))
}

// defaultFailureCondition is the default failure condition used by [NewCircuit].
Expand Down
6 changes: 5 additions & 1 deletion hoglet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package hoglet_test
import (
"context"
"errors"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -99,10 +100,13 @@ func (mt *mockBreaker) Call() hoglet.Observable {

type mockObservable struct {
breaker *mockBreaker
once sync.Once
}

func (mo *mockObservable) Observe(failure bool) {
mo.breaker.open = failure
mo.once.Do(func() {
mo.breaker.open = failure
})
}

func TestHoglet_Do(t *testing.T) {
Expand Down

0 comments on commit 3ea98b5

Please sign in to comment.