Skip to content
This repository has been archived by the owner on May 13, 2022. It is now read-only.

Commit

Permalink
Added a new limit controller for requests per minute (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanpalmer authored and KS Chan committed Feb 28, 2018
1 parent 2ae6bfa commit af836a9
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 0 deletions.
61 changes: 61 additions & 0 deletions cuttle/limitcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,64 @@ func (c *RPSControl) Acquire() bool {

return true
}

// RPMControl provides requests per minute rate limit control.
type RPMControl struct {
// Label of this control.
Label string
// Rate holds the number of requests per minute.
Rate int

pendingChan chan uint
readyChan chan uint
seen *list.List
}

// NewRPMControl return a new RPMControl with the given label and rate.
func NewRPMControl(label string, rate int) *RPMControl {
return &RPMControl{label, rate, make(chan uint), make(chan uint), list.New()}
}

// Start running RPMControl.
// A goroutine is launched to govern the rate limit of Acquire().
func (c *RPMControl) Start() {
go func() {
log.Debugf("RPMControl[%s]: Activated.", c.Label)

for {
<-c.pendingChan

log.Debugf("RPMControl[%s]: Limited at %dreq/m.", c.Label, c.Rate)
if c.seen.Len() == c.Rate {
front := c.seen.Front()
nanoElapsed := time.Now().UnixNano() - front.Value.(int64)
milliElapsed := nanoElapsed / int64(time.Millisecond)
secondElapsed := milliElapsed / 1000
log.Debugf("RPMControl[%s]: Elapsed %ds since first request.", c.Label, secondElapsed)

if waitTime := 60 - secondElapsed; waitTime > 0 {
log.Infof("RPMControl[%s]: Waiting for %ds.", c.Label, waitTime)
time.Sleep(time.Duration(waitTime) * time.Second)
}

c.seen.Remove(front)
}
c.seen.PushBack(time.Now().UnixNano())

c.readyChan <- 1
}

log.Debugf("RPMControl[%s]: Deactivated.", c.Label)
}()
}

// Acquire permission from RPMControl.
// Permission is granted at a rate of N requests per minute.
func (c *RPMControl) Acquire() bool {
log.Debugf("RPMControl[%s]: Seeking permission.", c.Label)
c.pendingChan <- 1
<-c.readyChan
log.Debugf("RPMControl[%s]: Granted permission.", c.Label)

return true
}
73 changes: 73 additions & 0 deletions cuttle/limitcontrol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,76 @@ func TestRPSControl(t *testing.T) {
t.Errorf("4x RPSControl.Acquire() elapsed %dms, want > %dms", elapsed, 1500)
}
}

func TestRPMControl(t *testing.T) {
var control LimitController
var startT, endT int64

control = NewRPMControl("label", 2)
control.Start()

acquired := true

startT = time.Now().UnixNano()
acquired = acquired && control.Acquire() // Expect no wait time.
acquired = acquired && control.Acquire() // Expect no wait time.
endT = time.Now().UnixNano()

// Expecting acquired is true
if acquired != true {
t.Errorf("Permission cannot be acquired from RPMControl.Acquire()")
}

// Expecting no delay in 2 consecutive Acquire() with Rate=2.
if elapsed := (endT - startT) / 1000; elapsed > 1000 {
t.Errorf("2x RPMControl.Acquire() elapsed %dms, want %dms", elapsed, 0)
}

control = NewRPMControl("label", 30)
control.Start()

acquired = true

startT = time.Now().UnixNano()
acquired = acquired && control.Acquire() // Expect no wait time.
time.Sleep(time.Duration(15) * time.Second)
acquired = acquired && control.Acquire() // Expect no wait time.
time.Sleep(time.Duration(30) * time.Second)
acquired = acquired && control.Acquire() // Expect 30s wait time.
endT = time.Now().UnixNano()

// Expecting acquired is true
if acquired != true {
t.Errorf("Permission cannot be acquired from RPMControl.Acquire()")
}

// Expecting delay in 30 consecutive Acquire() with Rate=30.
if elapsed := (endT - startT) / 1000; elapsed < 60 {
t.Errorf("3x RPMControl.Acquire() elapsed %dms, want > %dms", elapsed, 60)
}

control = NewRPSControl("label", 30)
control.Start()

acquired = true

startT = time.Now().UnixNano()
acquired = acquired && control.Acquire() // Expect no wait time.
time.Sleep(time.Duration(20) * time.Second)
acquired = acquired && control.Acquire() // Expect no wait time.
time.Sleep(time.Duration(30) * time.Second)
acquired = acquired && control.Acquire() // Expect 30s wait time.
time.Sleep(time.Duration(60) * time.Second)
acquired = acquired && control.Acquire() // Expect 30s wait time.
endT = time.Now().UnixNano()

// Expecting acquired is true
if acquired != true {
t.Errorf("Permission cannot be acquired from RPSControl.Acquire()")
}

// Expecting delay in 4 consecutive Acquire() with Rate=2.
if elapsed := (endT - startT) / 1000; elapsed < 120 {
t.Errorf("4x RPSControl.Acquire() elapsed %dms, want > %dms", elapsed, 120)
}
}
2 changes: 2 additions & 0 deletions cuttle/zone.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func (z *Zone) GetController(host string, path string) LimitController {
switch z.Control {
case "rps":
controller = NewRPSControl(key, z.Rate)
case "rpm":
controller = NewRPMControl(key, z.Rate)
case "noop":
controller = NewNoopControl(key)
case "ban":
Expand Down

0 comments on commit af836a9

Please sign in to comment.