Skip to content

Commit

Permalink
Revert "region: Reimplement processRPCs batching loop" (#233)
Browse files Browse the repository at this point in the history
This reverts commit 086c5c9.
  • Loading branch information
dethi authored Aug 25, 2023
1 parent 086c5c9 commit 701886e
Showing 1 changed file with 54 additions and 48 deletions.
102 changes: 54 additions & 48 deletions region/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ func (c *client) unregisterRPC(id uint32) hrpc.Call {
}

func (c *client) processRPCs() {
// TODO: flush when the size is too large
// TODO: if multi has only one call, send that call instead
m := newMulti(c.rpcQueueSize)
defer func() {
Expand Down Expand Up @@ -365,62 +366,67 @@ func (c *client) processRPCs() {
m = newMulti(c.rpcQueueSize)
}

var t *time.Timer
if c.flushInterval > 0 {
// Initialize timer and then stop it. It will be Reset later
t = time.NewTimer(0)
if !t.Stop() {
<-t.C
for {
// first loop is to accomodate request heavy workload
// it will batch as long as conccurent writers are sending
// new rpcs or until multi is filled up
for {
select {
case <-c.done:
return
case rpcs := <-c.rpcs:
// have things queued up, batch them
if !m.add(rpcs) {
// can still put more rpcs into batch
continue
}
default:
// no more rpcs queued up
}
break
}
}

batchloop:
for {
// wait for first rpc
select {
case <-c.done:
return
case rpcs := <-c.rpcs:
if m.add(rpcs) {
flush("queue full")
continue
if l := m.len(); l == 0 {
// wait for the next batch
select {
case <-c.done:
return
case rpcs := <-c.rpcs:
m.add(rpcs)
}
continue
} else if l >= c.rpcQueueSize || c.flushInterval == 0 {
// batch is full, flush
flush("queue full")
continue
}
if c.flushInterval > 0 {
// Add rpcs to batch until full or timer runs out
t.Reset(c.flushInterval)
for {
select {
case <-c.done:
return
case rpcs := <-c.rpcs:
if m.add(rpcs) {
if !t.Stop() {
<-t.C
}
flush("queue full")
continue batchloop
}
case <-t.C:
flush("timeout")
continue batchloop

// second loop is to accomodate less frequent callers
// that would like to maximize their batches at the expense
// of waiting for flushInteval
timer := time.NewTimer(c.flushInterval)
reason := ""
for {
select {
case <-c.done:
return
case <-timer.C:
reason = "timeout"
// time to flush
case rpcs := <-c.rpcs:
if !m.add(rpcs) {
// can still put more rpcs into batch
continue
}
}
} else {
// Add rpcs to batch until chan is no longer readable
for {
select {
case rpcs := <-c.rpcs:
if m.add(rpcs) {
flush("queue full")
continue batchloop
}
default:
flush("timeout")
continue batchloop
reason = "queue full"
// batch is full
if !timer.Stop() {
<-timer.C
}
}
break
}
flush(reason)
}
}

Expand Down

0 comments on commit 701886e

Please sign in to comment.