diff --git a/cuttle/limitcontrol.go b/cuttle/limitcontrol.go index 1674bea..c0c4a57 100644 --- a/cuttle/limitcontrol.go +++ b/cuttle/limitcontrol.go @@ -125,3 +125,64 @@ func (c *RPSControl) Acquire() bool { return true } + +// RPMControl provides requests per minute rate limit control. +type RPMControl struct { + // Label of this control. + Label string + // Rate holds the number of requests per minute. + Rate int + + pendingChan chan uint + readyChan chan uint + seen *list.List +} + +// NewRPMControl return a new RPMControl with the given label and rate. +func NewRPMControl(label string, rate int) *RPMControl { + return &RPMControl{label, rate, make(chan uint), make(chan uint), list.New()} +} + +// Start running RPMControl. +// A goroutine is launched to govern the rate limit of Acquire(). +func (c *RPMControl) Start() { + go func() { + log.Debugf("RPMControl[%s]: Activated.", c.Label) + + for { + <-c.pendingChan + + log.Debugf("RPMControl[%s]: Limited at %dreq/m.", c.Label, c.Rate) + if c.seen.Len() == c.Rate { + front := c.seen.Front() + nanoElapsed := time.Now().UnixNano() - front.Value.(int64) + milliElapsed := nanoElapsed / int64(time.Millisecond) + secondElapsed := milliElapsed / 1000 + log.Debugf("RPMControl[%s]: Elapsed %ds since first request.", c.Label, secondElapsed) + + if waitTime := 60 - secondElapsed; waitTime > 0 { + log.Infof("RPMControl[%s]: Waiting for %ds.", c.Label, waitTime) + time.Sleep(time.Duration(waitTime) * time.Second) + } + + c.seen.Remove(front) + } + c.seen.PushBack(time.Now().UnixNano()) + + c.readyChan <- 1 + } + + log.Debugf("RPMControl[%s]: Deactivated.", c.Label) + }() +} + +// Acquire permission from RPMControl. +// Permission is granted at a rate of N requests per minute. +func (c *RPMControl) Acquire() bool { + log.Debugf("RPMControl[%s]: Seeking permission.", c.Label) + c.pendingChan <- 1 + <-c.readyChan + log.Debugf("RPMControl[%s]: Granted permission.", c.Label) + + return true +} diff --git a/cuttle/limitcontrol_test.go b/cuttle/limitcontrol_test.go index 849e4f5..b164343 100644 --- a/cuttle/limitcontrol_test.go +++ b/cuttle/limitcontrol_test.go @@ -116,3 +116,76 @@ func TestRPSControl(t *testing.T) { t.Errorf("4x RPSControl.Acquire() elapsed %dms, want > %dms", elapsed, 1500) } } + +func TestRPMControl(t *testing.T) { + var control LimitController + var startT, endT int64 + + control = NewRPMControl("label", 2) + control.Start() + + acquired := true + + startT = time.Now().UnixNano() + acquired = acquired && control.Acquire() // Expect no wait time. + acquired = acquired && control.Acquire() // Expect no wait time. + endT = time.Now().UnixNano() + + // Expecting acquired is true + if acquired != true { + t.Errorf("Permission cannot be acquired from RPMControl.Acquire()") + } + + // Expecting no delay in 2 consecutive Acquire() with Rate=2. + if elapsed := (endT - startT) / 1000; elapsed > 1000 { + t.Errorf("2x RPMControl.Acquire() elapsed %dms, want %dms", elapsed, 0) + } + + control = NewRPMControl("label", 30) + control.Start() + + acquired = true + + startT = time.Now().UnixNano() + acquired = acquired && control.Acquire() // Expect no wait time. + time.Sleep(time.Duration(15) * time.Second) + acquired = acquired && control.Acquire() // Expect no wait time. + time.Sleep(time.Duration(30) * time.Second) + acquired = acquired && control.Acquire() // Expect 30s wait time. + endT = time.Now().UnixNano() + + // Expecting acquired is true + if acquired != true { + t.Errorf("Permission cannot be acquired from RPMControl.Acquire()") + } + + // Expecting delay in 30 consecutive Acquire() with Rate=30. + if elapsed := (endT - startT) / 1000; elapsed < 60 { + t.Errorf("3x RPMControl.Acquire() elapsed %dms, want > %dms", elapsed, 60) + } + + control = NewRPSControl("label", 30) + control.Start() + + acquired = true + + startT = time.Now().UnixNano() + acquired = acquired && control.Acquire() // Expect no wait time. + time.Sleep(time.Duration(20) * time.Second) + acquired = acquired && control.Acquire() // Expect no wait time. + time.Sleep(time.Duration(30) * time.Second) + acquired = acquired && control.Acquire() // Expect 30s wait time. + time.Sleep(time.Duration(60) * time.Second) + acquired = acquired && control.Acquire() // Expect 30s wait time. + endT = time.Now().UnixNano() + + // Expecting acquired is true + if acquired != true { + t.Errorf("Permission cannot be acquired from RPSControl.Acquire()") + } + + // Expecting delay in 4 consecutive Acquire() with Rate=2. + if elapsed := (endT - startT) / 1000; elapsed < 120 { + t.Errorf("4x RPSControl.Acquire() elapsed %dms, want > %dms", elapsed, 120) + } +} diff --git a/cuttle/zone.go b/cuttle/zone.go index d22c903..9c2c77c 100644 --- a/cuttle/zone.go +++ b/cuttle/zone.go @@ -100,6 +100,8 @@ func (z *Zone) GetController(host string, path string) LimitController { switch z.Control { case "rps": controller = NewRPSControl(key, z.Rate) + case "rpm": + controller = NewRPMControl(key, z.Rate) case "noop": controller = NewNoopControl(key) case "ban":