feat: make kurtosis service logs faster (#2525)
## Description
Users were experiencing `kurtosis service logs` taking a long time.
After running tests, I discovered that the majority of execution time
during log processing was spent in the following lines:
```
logLines := []logline.LogLine{*logLine}
userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{
      serviceUuid: logLines,
}
logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap
```
Prior to this change, we sent log lines one at a time on an
unbuffered channel. Unbuffered channels block the sender until the receiving
goroutine reads the value, so a lot of time was wasted waiting to send each
log line across the channel.
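
To illustrate the channel semantics at play (a standalone sketch, not code from this repo): a send on an unbuffered channel parks the producer until the consumer is ready to receive, while a buffered channel accepts sends without blocking until its buffer fills.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Unbuffered: a send blocks until a receiver is ready, so the producer's
	// progress is gated entirely on the consumer.
	unbuffered := make(chan string)
	go func() {
		time.Sleep(100 * time.Millisecond) // simulate a slow consumer
		<-unbuffered
	}()
	start := time.Now()
	unbuffered <- "log line" // waits ~100ms for the receiver
	fmt.Println("unbuffered send took", time.Since(start))

	// Buffered: sends return immediately until the buffer is full, letting
	// the producer run ahead of the consumer.
	buffered := make(chan string, 300)
	start = time.Now()
	buffered <- "log line" // returns immediately; the buffer has room
	fmt.Println("buffered send took", time.Since(start))
}
```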

This change implements a `LogLineSender` that:
1. uses a buffered Go channel (sends don't block unless the buffer
is full)
2. batches log lines (reduces read overhead: the receiving goroutine
performs fewer reads/sends)

With this change, the time to read 20 minutes of `cl-lighthouse-geth`
logs with log level set to debug went from `1min53sec` to `30.055`
seconds. The time to read 2 hours 10 minutes of `cl-lighthouse`
debug logs (around 3.4 GB) went from `15min1sec` to `3min31sec`.
(As a benchmark, `cat logs.json` on the same 3.4 GB of logs takes around
`2min` on my machine, so this is much closer.) This can likely be improved
further by tuning the buffer size and batch amount.

## Is this change user facing?
YES

## References:

https://discord.com/channels/783719264308953108/1267837033032974467/1267842228072611881
tedim52 authored Aug 12, 2024
1 parent 79f5256 commit d6b246a
Showing 9 changed files with 109 additions and 52 deletions.
13 changes: 1 addition & 12 deletions .circleci/config.yml
@@ -145,7 +145,7 @@ version: 2.1

orbs:
npm-publisher: uraway/[email protected]
kurtosis-docs-checker: kurtosis-tech/[email protected].7
kurtosis-docs-checker: kurtosis-tech/[email protected].9
slack: circleci/[email protected]

executors:
@@ -1475,17 +1475,6 @@ workflows:
name: "Check if CLI builds for all os and arch pairs"
<<: *filters_ignore_main

- test_enclave_manager_web_ui:
name: "Test Basic Web UI Functionality in Docker"
context:
- docker-user
requires:
- build_cli
- build_api_container_server
- build_engine_server
- build_files_artifacts_expander
<<: *filters_ignore_main

- test_basic_cli_functionality:
name: "Test Basic CLI Functionality in Docker"
cli-cluster-backend: "docker"
2 changes: 1 addition & 1 deletion README.md
@@ -38,7 +38,7 @@ Because of this additional layer of abstraction, we are able to introduce severa

How do I get going?
===================
To see Kurtosis in action, first install it using the instructions [here](https://docs.kurtosis.com/install) or visit [Kurtosis Cloud](https://cloud.kurtosis.com/) to provision a remote host.
To see Kurtosis in action, first install it using the instructions [here](https://docs.kurtosis.com/install).

Then, run the [Redis voting app Kurtosis package](https://github.com/kurtosis-tech/awesome-kurtosis/tree/main/redis-voting-app):

@@ -63,15 +63,16 @@ func (client *persistentVolumeLogsDatabaseClient) StreamUserServiceLogs(
streamErrChan := make(chan error)

// this channel will return the user service log lines by service UUID
logsByKurtosisUserServiceUuidChan := make(chan map[service.ServiceUUID][]logline.LogLine)
logLineSender := logline.NewLogLineSender()
logsByKurtosisUserServiceUuidChan := logLineSender.GetLogsChannel()

wgSenders := &sync.WaitGroup{}
for serviceUuid := range userServiceUuids {
wgSenders.Add(oneSenderAdded)
go client.streamServiceLogLines(
ctx,
wgSenders,
logsByKurtosisUserServiceUuidChan,
logLineSender,
streamErrChan,
enclaveUuid,
serviceUuid,
@@ -87,7 +88,11 @@ func (client *persistentVolumeLogsDatabaseClient) StreamUserServiceLogs(
//wait for stream go routine to end
wgSenders.Wait()

close(logsByKurtosisUserServiceUuidChan)
// send all buffered log lines
logLineSender.Flush()

// wait until the channel has been fully read/empty before closing it
closeChannelWhenEmpty(logsByKurtosisUserServiceUuidChan)
close(streamErrChan)

//then cancel the context
@@ -130,7 +135,7 @@ func (client *persistentVolumeLogsDatabaseClient) FilterExistingServiceUuids(
func (client *persistentVolumeLogsDatabaseClient) streamServiceLogLines(
ctx context.Context,
wgSenders *sync.WaitGroup,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
logLineSender *logline.LogLineSender,
streamErrChan chan error,
enclaveUuid enclave.EnclaveUUID,
serviceUuid service.ServiceUUID,
@@ -143,7 +148,7 @@ func (client *persistentVolumeLogsDatabaseClient) streamServiceLogLines(
client.streamStrategy.StreamLogs(
ctx,
client.filesystem,
logsByKurtosisUserServiceUuidChan,
logLineSender,
streamErrChan,
enclaveUuid,
serviceUuid,
@@ -152,3 +157,12 @@ func (client *persistentVolumeLogsDatabaseClient) streamServiceLogLines(
shouldReturnAllLogs,
numLogLines)
}

func closeChannelWhenEmpty(logsChan chan map[service.ServiceUUID][]logline.LogLine) {
for {
if len(logsChan) == 0 {
close(logsChan)
return
}
}
}
@@ -30,7 +30,7 @@ type JsonLog map[string]string
func (strategy *PerFileStreamLogsStrategy) StreamLogs(
ctx context.Context,
fs volume_filesystem.VolumeFilesystem,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
logLineSender *logline.LogLineSender,
streamErrChan chan error,
enclaveUuid enclave.EnclaveUUID,
serviceUuid service.ServiceUUID,
@@ -122,12 +122,7 @@ func (strategy *PerFileStreamLogsStrategy) StreamLogs(
break
}

// send the log line
logLines := []logline.LogLine{*logLine}
userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{
serviceUuid: logLines,
}
logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap
logLineSender.Send(serviceUuid, *logLine)
}
}
}
@@ -48,7 +48,7 @@ func NewPerWeekStreamLogsStrategy(time logs_clock.LogsClock, logRetentionPeriodI
func (strategy *PerWeekStreamLogsStrategy) StreamLogs(
ctx context.Context,
fs volume_filesystem.VolumeFilesystem,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
logLineSender *logline.LogLineSender,
streamErrChan chan error,
enclaveUuid enclave.EnclaveUUID,
serviceUuid service.ServiceUUID,
@@ -89,24 +89,26 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs(
}()

if shouldReturnAllLogs {
if err := strategy.streamAllLogs(ctx, logsReader, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
if err := strategy.streamAllLogs(ctx, logsReader, logLineSender, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
streamErrChan <- stacktrace.Propagate(err, "An error occurred streaming all logs for service '%v' in enclave '%v'", serviceUuid, enclaveUuid)
return
}
} else {
if err := strategy.streamTailLogs(ctx, logsReader, numLogLines, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
if err := strategy.streamTailLogs(ctx, logsReader, numLogLines, logLineSender, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
streamErrChan <- stacktrace.Propagate(err, "An error occurred streaming '%v' logs for service '%v' in enclave '%v'", numLogLines, serviceUuid, enclaveUuid)
return
}
}

// need to flush before following logs
logLineSender.Flush()
if shouldFollowLogs {
latestLogFile := paths[len(paths)-1]
if err := strategy.followLogs(ctx, latestLogFile, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
logrus.Debugf("Following logs...")
if err := strategy.followLogs(ctx, latestLogFile, logLineSender, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
streamErrChan <- stacktrace.Propagate(err, "An error occurred creating following logs for service '%v' in enclave '%v'", serviceUuid, enclaveUuid)
return
}
logrus.Debugf("Following logs...")
}
}

@@ -180,7 +182,7 @@ func getLogsReader(filesystem volume_filesystem.VolumeFilesystem, logFilePaths [
func (strategy *PerWeekStreamLogsStrategy) streamAllLogs(
ctx context.Context,
logsReader *bufio.Reader,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
logLineSender *logline.LogLineSender,
serviceUuid service.ServiceUUID,
conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) error {
for {
Expand All @@ -190,12 +192,14 @@ func (strategy *PerWeekStreamLogsStrategy) streamAllLogs(
return nil
default:
jsonLogStr, err := getCompleteJsonLogString(logsReader)

if isValidJsonEnding(jsonLogStr) {
jsonLog, err := convertStringToJson(jsonLogStr)
if err != nil {
return stacktrace.Propagate(err, "An error occurred converting the json log string '%v' into json.", jsonLogStr)
}
if err = strategy.sendJsonLogLine(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {

if err = strategy.sendJsonLogLine(jsonLog, conjunctiveLogLinesFiltersWithRegex, logLineSender, serviceUuid); err != nil {
return err
}
}
@@ -217,7 +221,7 @@ func (strategy *PerWeekStreamLogsStrategy) streamTailLogs(
ctx context.Context,
logsReader *bufio.Reader,
numLogLines uint32,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
logLineSender *logline.LogLineSender,
serviceUuid service.ServiceUUID,
conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) error {
tailLogLines := make([]string, 0, numLogLines)
@@ -255,7 +259,7 @@ func (strategy *PerWeekStreamLogsStrategy) streamTailLogs(
if err != nil {
return stacktrace.Propagate(err, "An error occurred converting the json log string '%v' into json.", jsonLogStr)
}
if err := strategy.sendJsonLogLine(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
if err = strategy.sendJsonLogLine(jsonLog, conjunctiveLogLinesFiltersWithRegex, logLineSender, serviceUuid); err != nil {
return err
}
}
@@ -298,11 +302,7 @@ func isValidJsonEnding(line string) bool {
return endOfLine == volume_consts.EndOfJsonLine
}

func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine(
jsonLog JsonLog,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
serviceUuid service.ServiceUUID,
conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) error {
func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine(jsonLog JsonLog, conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex, logLineSender *logline.LogLineSender, serviceUuid service.ServiceUUID) error {
// each logLineStr is of the following structure: {"enclave_uuid": "...", "service_uuid":"...", "log": "...",.. "timestamp":"..."}
// eg. {"container_type":"api-container", "container_id":"8f8558ba", "container_name":"/kurtosis-api--ffd",
// "log":"hi","timestamp":"2023-08-14T14:57:49Z"}
@@ -338,12 +338,7 @@ func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine(
return nil
}

// send the log line
logLines := []logline.LogLine{*logLine}
userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{
serviceUuid: logLines,
}
logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap
logLineSender.Send(serviceUuid, *logLine)
return nil
}

@@ -358,7 +353,7 @@ func (strategy *PerWeekStreamLogsStrategy) isWithinRetentionPeriod(logLine *logl
func (strategy *PerWeekStreamLogsStrategy) followLogs(
ctx context.Context,
filepath string,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
logLineSender *logline.LogLineSender,
serviceUuid service.ServiceUUID,
conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex,
) error {
@@ -399,8 +394,7 @@ func (strategy *PerWeekStreamLogsStrategy) followLogs(
// if tail package fails to parse a valid new line, fail fast
return stacktrace.NewError("hpcloud/tail returned the following line: '%v' that was not valid json.\nThis is potentially a bug in tailing package.", logLine.Text)
}
err = strategy.sendJsonLogLine(jsonLog, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex)
if err != nil {
if err = strategy.sendJsonLogLine(jsonLog, conjunctiveLogLinesFiltersWithRegex, logLineSender, serviceUuid); err != nil {
return stacktrace.Propagate(err, "An error occurred sending json log line '%v'.", logLine.Text)
}
}
@@ -15,7 +15,7 @@ type StreamLogsStrategy interface {
StreamLogs(
ctx context.Context,
fs volume_filesystem.VolumeFilesystem,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
logLineSender *logline.LogLineSender,
streamErrChan chan error,
enclaveUuid enclave.EnclaveUUID,
serviceUuid service.ServiceUUID,
65 changes: 65 additions & 0 deletions engine/server/engine/centralized_logs/logline/logline_sender.go
@@ -0,0 +1,65 @@
package logline

import (
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/service"
"sync"
)

const (
batchLogsAmount = 500
logsChanBufferSize = 300
)

type LogLineSender struct {
logsChan chan map[service.ServiceUUID][]LogLine

logLineBuffer map[service.ServiceUUID][]LogLine

mu sync.Mutex
}

func NewLogLineSender() *LogLineSender {
return &LogLineSender{
logsChan: make(chan map[service.ServiceUUID][]LogLine, logsChanBufferSize),
logLineBuffer: map[service.ServiceUUID][]LogLine{},
mu: sync.Mutex{},
}
}

func (sender *LogLineSender) Send(serviceUuid service.ServiceUUID, logLine LogLine) {
sender.mu.Lock()
defer sender.mu.Unlock()

sender.logLineBuffer[serviceUuid] = append(sender.logLineBuffer[serviceUuid], logLine)

if len(sender.logLineBuffer[serviceUuid])%batchLogsAmount == 0 {
userServicesLogLinesMap := map[service.ServiceUUID][]LogLine{
serviceUuid: sender.logLineBuffer[serviceUuid],
}
sender.logsChan <- userServicesLogLinesMap

// clear buffer after flushing it through the channel
sender.logLineBuffer[serviceUuid] = []LogLine{}
}
}

func (sender *LogLineSender) GetLogsChannel() chan map[service.ServiceUUID][]LogLine {
return sender.logsChan
}

// sends all logs remaining in the buffers through the channel
// this should be called at the end of processing to send the remainder of logs
func (sender *LogLineSender) Flush() {
sender.mu.Lock()
defer sender.mu.Unlock()

for uuid, logLines := range sender.logLineBuffer {
serviceUuid := uuid
userServiceLogLinesMap := map[service.ServiceUUID][]LogLine{
serviceUuid: logLines,
}
sender.logsChan <- userServiceLogLinesMap

sender.logLineBuffer[serviceUuid] = []LogLine{}
}
}
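
For orientation, here is a minimal sketch of how a caller might drive `LogLineSender` end to end. The import path for the `logline` package is assumed from the new file's location in this commit, and `streamLines` with its arguments is a hypothetical stand-in for the real wiring shown in the logs database client diff above.

```go
package example

import (
	"fmt"

	"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/service"
	// assumed import path, based on the new file's location in this commit
	"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/logline"
)

// streamLines is a hypothetical caller: lines is whatever slice of
// already-parsed log lines the caller holds for a single service.
func streamLines(serviceUuid service.ServiceUUID, lines []logline.LogLine) {
	sender := logline.NewLogLineSender()
	logsChan := sender.GetLogsChannel()

	// Consumer: each value read from the channel is a batch of log lines
	// keyed by service UUID.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for batch := range logsChan {
			for uuid, batchedLines := range batch {
				fmt.Printf("service %v: received %d lines\n", uuid, len(batchedLines))
			}
		}
	}()

	// Producer: Send buffers each line and only writes to the channel once
	// batchLogsAmount (500) lines have accumulated for this service.
	for _, line := range lines {
		sender.Send(serviceUuid, line)
	}

	// Push whatever is still buffered, then close the channel; the consumer's
	// range loop drains any remaining batches before exiting.
	sender.Flush()
	close(logsChan)
	<-done
}
```
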
2 changes: 1 addition & 1 deletion engine/server/go.mod
@@ -63,7 +63,7 @@ require (
github.com/kurtosis-tech/kurtosis/grpc-file-transfer/golang v0.0.0-20230803130419-099ee7a4e3dc
github.com/kurtosis-tech/kurtosis/metrics-library/golang v0.0.0-20231206095907-9bdf0d02cb90
github.com/labstack/echo/v4 v4.11.3
github.com/rs/cors v1.9.0
github.com/rs/cors v1.11.0
github.com/spf13/afero v1.10.0
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
k8s.io/apimachinery v0.27.2
4 changes: 2 additions & 2 deletions engine/server/go.sum
@@ -312,8 +312,8 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rs/cors v1.9.0 h1:l9HGsTsHJcvW14Nk7J9KFz8bzeAWXn3CG6bgt7LsrAE=
github.com/rs/cors v1.9.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/rs/cors v1.11.0 h1:0B9GE/r9Bc2UxRMMtymBkHTenPkHDv0CW4Y98GBY+po=
github.com/rs/cors v1.11.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/segmentio/backo-go v1.0.0 h1:kbOAtGJY2DqOR0jfRkYEorx/b18RgtepGtY3+Cpe6qA=
github.com/segmentio/backo-go v1.0.0/go.mod h1:kJ9mm9YmoWSkk+oQ+5Cj8DEoRCX2JT6As4kEtIIOp1M=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
