Skip to content

Commit

Permalink
recover from panics during batch sub-requests (#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
pirtleshell authored Feb 22, 2024
1 parent ef48922 commit bdf0d57
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 10 deletions.
27 changes: 20 additions & 7 deletions service/batchmdw/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ import (
"net/http"
"sync"

"github.com/kava-labs/kava-proxy-service/logging"
"github.com/kava-labs/kava-proxy-service/service/cachemdw"
)

// BatchProcessor makes multiple requests to the underlying handler and then combines all the
// responses into a single response.
// It assumes all individual responses are valid json. Each response is then marshaled into an array.
type BatchProcessor struct {
*logging.ServiceLogger

handler http.HandlerFunc
requests []*http.Request
responses []*bytes.Buffer
Expand All @@ -23,14 +26,15 @@ type BatchProcessor struct {
}

// NewBatchProcessor creates a BatchProcessor for combining the responses of reqs to the handler
func NewBatchProcessor(handler http.HandlerFunc, reqs []*http.Request) *BatchProcessor {
func NewBatchProcessor(serviceLogger *logging.ServiceLogger, handler http.HandlerFunc, reqs []*http.Request) *BatchProcessor {
return &BatchProcessor{
handler: handler,
requests: reqs,
responses: make([]*bytes.Buffer, len(reqs)),
header: nil,
status: http.StatusOK,
mu: sync.Mutex{},
ServiceLogger: serviceLogger,
handler: handler,
requests: reqs,
responses: make([]*bytes.Buffer, len(reqs)),
header: nil,
status: http.StatusOK,
mu: sync.Mutex{},
}
}

Expand All @@ -42,6 +46,15 @@ func (bp *BatchProcessor) RequestAndServe(w http.ResponseWriter) error {
wg.Add(1)

go func(idx int, req *http.Request) {
// if a client closes the connection prematurely, it can cause panics from batch sub-requests.
// when that happens, log & discard the panic.
// https://github.com/golang/go/issues/28239
defer func() {
if recover() != nil {
wg.Done()
bp.Error().Int("sub-request index", idx).Msg("sub-request for batch panicked")
}
}()

buf := new(bytes.Buffer)
frw := newFakeResponseWriter(buf, bp.setErrStatus)
Expand Down
6 changes: 4 additions & 2 deletions service/batchmdw/batchmdw.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,18 @@ func CreateBatchProcessingMiddleware(
// build request as if it's the only one being requested.
req, err := http.NewRequestWithContext(singleRequestContext, r.Method, r.URL.String(), bytes.NewBuffer(body))
if err != nil {
panic(fmt.Sprintf("failed build sub-request: %s", err))
config.ServiceLogger.Error().Err(err).Any("req", single).Msg("failed to sub-request of batch")
continue
}
req.Host = r.Host
req.Header = r.Header
req.Close = true

reqs = append(reqs, req)
}

// process all requests and respond with results in an array
batchProcessor := NewBatchProcessor(singleRequestHandler, reqs)
batchProcessor := NewBatchProcessor(config.ServiceLogger, singleRequestHandler, reqs)
batchProcessor.RequestAndServe(w)
}
}
2 changes: 1 addition & 1 deletion service/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (hsp HeightShardingProxies) ProxyForRequest(r *http.Request) (*httputil.Rev
_, _, found := hsp.pruningProxies.ProxyForRequest(r)
// if the host isn't in the pruning proxies, short circuit fallback to default
if !found {
hsp.Debug().Msg(fmt.Sprintf("no pruning host backend configured for %s", r.Host))
hsp.Trace().Msg(fmt.Sprintf("no pruning host backend configured for %s", r.Host))
return hsp.defaultProxies.ProxyForRequest(r)
}

Expand Down

0 comments on commit bdf0d57

Please sign in to comment.