Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add prometheus extension #6

Merged
merged 3 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@
# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Dependency directories (remove the comment below to include it)
# vendor/
/go.work*
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Simple low-overhead circuit breaker library.
## Usage

```go
h := hoglet.NewCircuit(
h, err := hoglet.NewCircuit(
func(ctx context.Context, bar int) (Foo, error) {
if bar == 42 {
return Foo{Bar: bar}, nil
Expand All @@ -19,6 +19,8 @@ h := hoglet.NewCircuit(
hoglet.NewSlidingWindowBreaker(10, 0.1),
hoglet.WithFailureCondition(hoglet.IgnoreContextCancelation),
)
/* if err != nil ... */

f, _ := h.Call(context.Background(), 42)
fmt.Println(f.Bar) // 42

Expand Down
90 changes: 29 additions & 61 deletions breaker.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,35 @@
package hoglet

import (
"context"
"math"
"sync/atomic"
"time"
)

// untypedCircuit is used to avoid type annotations when implementing breakers.
type untypedCircuit interface {
stateForCall() State
setOpenedAt(int64)
}
// stateChange encodes what the circuit should do after observing a call.
type stateChange int

const (
// stateChangeNone means the circuit should keep its current state.
stateChangeNone stateChange = iota
// stateChangeOpen means the circuit should open.
stateChangeOpen
// stateChangeClose means the circuit should close.
stateChangeClose
)

// observer is used to observe the result of a single wrapped call through the circuit breaker.
type observer interface {
// observe is called after the wrapped function returns. If [Circuit.Do] returns a non-nil [observer], it will be
// Observer is used to observe the result of a single wrapped call through the circuit breaker.
// Calls in an open circuit cause no observer to be created.
type Observer interface {
// Observe is called after the wrapped function returns. If [Circuit.Do] returns a non-nil [Observer], it will be
// called exactly once.
observe(failure bool)
Observe(failure bool)
}

// observableCall tracks a single call through the breaker.
// It should be instantiated via [newObservableCall] to ensure the observer is only called once.
type observableCall func(bool)
// ObserverFunc is a helper to turn any function into an [Observer].
type ObserverFunc func(bool)

func (o observableCall) observe(failure bool) {
func (o ObserverFunc) Observe(failure bool) {
o(failure)
}

Expand All @@ -34,7 +39,6 @@ func (o observableCall) observe(failure bool) {
type EWMABreaker struct {
decay float64
threshold float64
circuit untypedCircuit

// State
failureRate atomic.Value
Expand Down Expand Up @@ -67,31 +71,14 @@ func NewEWMABreaker(sampleCount uint, failureThreshold float64) *EWMABreaker {
return e
}

func (e *EWMABreaker) connect(c untypedCircuit) {
e.circuit = c
}

func (e *EWMABreaker) observerForCall(_ context.Context) (observer, error) {
state := e.circuit.stateForCall()

if state == StateOpen {
return nil, ErrCircuitOpen
}

return observableCall(func(failure bool) {
e.observe(state == StateHalfOpen, failure)
}), nil
}

func (e *EWMABreaker) observe(halfOpen, failure bool) {
func (e *EWMABreaker) observe(halfOpen, failure bool) stateChange {
if e.threshold == 0 {
return
return stateChangeNone
}

if !failure && halfOpen {
e.circuit.setOpenedAt(0)
e.failureRate.Store(e.threshold)
return
return stateChangeClose
}

var value float64 = 0.0
Expand All @@ -101,25 +88,24 @@ func (e *EWMABreaker) observe(halfOpen, failure bool) {

// Unconditionally setting via swap and maybe overwriting is faster in the initial case.
failureRate, _ := e.failureRate.Swap(value).(float64)
if failureRate == 0 {
if failureRate == math.SmallestNonzeroFloat64 {
failureRate = value
} else {
failureRate = (value * e.decay) + (failureRate * (1 - e.decay))
e.failureRate.Store(failureRate)
}

if failureRate > e.threshold {
e.circuit.setOpenedAt(time.Now().UnixMicro())
return stateChangeOpen
} else {
e.circuit.setOpenedAt(0)
return stateChangeClose
}
}

// SlidingWindowBreaker is a [Breaker] that uses a sliding window to determine the error rate.
type SlidingWindowBreaker struct {
windowSize time.Duration
threshold float64
circuit untypedCircuit

// State

Expand Down Expand Up @@ -151,23 +137,7 @@ func NewSlidingWindowBreaker(windowSize time.Duration, failureThreshold float64)
return s
}

func (s *SlidingWindowBreaker) connect(c untypedCircuit) {
s.circuit = c
}

func (s *SlidingWindowBreaker) observerForCall(_ context.Context) (observer, error) {
state := s.circuit.stateForCall()

if state == StateOpen {
return nil, ErrCircuitOpen
}

return observableCall(func(failure bool) {
s.observe(state == StateHalfOpen, failure)
}), nil
}

func (s *SlidingWindowBreaker) observe(halfOpen, failure bool) {
func (s *SlidingWindowBreaker) observe(halfOpen, failure bool) stateChange {
var (
lastFailureCount int64
lastSuccessCount int64
Expand All @@ -176,16 +146,14 @@ func (s *SlidingWindowBreaker) observe(halfOpen, failure bool) {
)

if !failure && halfOpen {
s.circuit.setOpenedAt(0)
return
return stateChangeClose
}

// The second condition ensures only one goroutine can swap the windows. Necessary since multiple swaps would
// overwrite the last counts to some near zero value.
if currentStartMicros := s.currentStart.Load(); sinceMicros(currentStartMicros) > s.windowSize && s.currentStart.CompareAndSwap(currentStartMicros, time.Now().UnixMicro()) {
lastFailureCount = s.lastFailureCount.Swap(s.currentFailureCount.Swap(0))
lastSuccessCount = s.lastSuccessCount.Swap(s.currentSuccessCount.Swap(0))
s.circuit.setOpenedAt(0)
} else {
lastFailureCount = s.lastFailureCount.Load()
lastSuccessCount = s.lastSuccessCount.Load()
Expand All @@ -208,9 +176,9 @@ func (s *SlidingWindowBreaker) observe(halfOpen, failure bool) {
failureRate := weightedFailures / weightedTotal

if failureRate > s.threshold {
s.circuit.setOpenedAt(time.Now().UnixMicro())
return stateChangeOpen
} else {
s.circuit.setOpenedAt(0)
return stateChangeClose
}
}

Expand Down
Loading