Skip to content

Commit

Permalink
Add structured logging for downloads
Browse files Browse the repository at this point in the history
This adds structured logging fields to the download function, allowing
us to see which URLs and which transfer jobs are associated with a given
log line.

With the structured information, one can correlate all the messages from
a given transfer or job with a simple 'grep'.
  • Loading branch information
bbockelm committed Sep 8, 2024
1 parent 869284c commit 035e389
Showing 1 changed file with 39 additions and 24 deletions.
63 changes: 39 additions & 24 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,14 @@ var (
},
)

stoppedTransferDebugLine sync.Once

PelicanError error_codes.PelicanError
)

type (
logFields string

classAd string

cacheItem struct {
Expand Down Expand Up @@ -641,7 +645,7 @@ func (te *TransferEngine) newPelicanURL(remoteUrl *url.URL) (pelicanURL pelicanU
}
} else if scheme == "" {
// If we don't have a url scheme, then our metadata information should be in the config
log.Debugln("No url scheme detected, getting metadata information from configuration")
log.Debugln("No url scheme detected for", remoteUrl.String(), ", getting metadata information from configuration")
if fedInfo, err := config.GetFederation(te.ctx); err == nil {
pelicanURL.directorUrl = fedInfo.DirectorEndpoint
} else {
Expand Down Expand Up @@ -1735,9 +1739,14 @@ func downloadObject(transfer *transferFile) (transferResults TransferResults, er
transferEndpointUrl := *transferEndpoint.Url
transferEndpointUrl.Path = transfer.remoteURL.Path
transferEndpoint.Url = &transferEndpointUrl
fields := log.Fields{
"url": transferEndpoint.Url.String(),
"job": transfer.job.ID(),
}
ctx := context.WithValue(transfer.ctx, logFields("fields"), fields)
transferStartTime = time.Now() // Update start time for this attempt
attemptDownloaded, timeToFirstByte, cacheAge, serverVersion, err := downloadHTTP(
transfer.ctx, transfer.engine, transfer.callback, transferEndpoint, transfer.localPath, size, transfer.token, transfer.project,
ctx, transfer.engine, transfer.callback, transferEndpoint, transfer.localPath, size, transfer.token, transfer.project,
)
endTime := time.Now()
if cacheAge >= 0 {
Expand All @@ -1751,7 +1760,7 @@ func downloadObject(transfer *transferFile) (transferResults TransferResults, er
downloaded += attemptDownloaded

if err != nil {
log.Debugln("Failed to download from", transferEndpoint.Url, ":", err)
log.WithFields(fields).Debugln("Failed to download from", transferEndpoint.Url, ":", err)
var ope *net.OpError
var cse *ConnectionSetupError
proxyStr, _ := os.LookupEnv("http_proxy")
Expand Down Expand Up @@ -1790,7 +1799,7 @@ func downloadObject(transfer *transferFile) (transferResults TransferResults, er
transferResults.Attempts = append(transferResults.Attempts, attempt)

if err == nil { // Success
log.Debugln("Downloaded bytes:", downloaded)
log.WithFields(fields).Debugln("Downloaded bytes:", downloaded)
success = true
break
}
Expand Down Expand Up @@ -1821,9 +1830,13 @@ func parseTransferStatus(status string) (int, string) {
//
// Returns the downloaded size, the time to the first byte downloaded, the cache age, the server version, and an error if there is one
func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCallbackFunc, transfer transferAttemptDetails, dest string, totalSize int64, token string, project string) (downloaded int64, timeToFirstByte time.Duration, cacheAge time.Duration, serverVersion string, err error) {
fields, ok := ctx.Value(logFields("fields")).(log.Fields)
if !ok {
fields = log.Fields{}
}
defer func() {
if r := recover(); r != nil {
log.Errorln("Panic occurred in downloadHTTP:", r)
log.WithFields(fields).Errorln("Panic occurred in downloadHTTP:", r)
ret := fmt.Sprintf("Unrecoverable error (panic) occurred in downloadHTTP: %v", r)
err = errors.New(ret)
}
Expand Down Expand Up @@ -1883,8 +1896,8 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall

ctx, cancel := context.WithCancel(ctx)
defer cancel()
log.Debugln("Attempting to download from:", transferUrl.Host)
log.Debugln("Transfer URL String:", transferUrl.String())
log.WithFields(fields).Debugln("Attempting to download from:", transferUrl.Host)
log.WithFields(fields).Debugln("Transfer URL String:", transferUrl.String())
var req *grab.Request
var unpacker *autoUnpacker
if transfer.PackOption != "" {
Expand Down Expand Up @@ -1934,7 +1947,7 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
downloadLimit := param.Client_MinimumDownloadSpeed.GetInt()

// Start the transfer
log.Debugln("Starting the HTTP transfer...")
log.WithFields(fields).Debugln("Starting the HTTP transfer...")
downloadStart := time.Now()
resp := client.Do(req)
// Check the error real quick
Expand All @@ -1945,7 +1958,7 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
if errors.Is(err, grab.ErrBadLength) {
err = fmt.Errorf("local copy of file is larger than remote copy %w", grab.ErrBadLength)
} else if errors.As(err, &sce) {
log.Debugln("Creating a client status code error")
log.WithFields(fields).Debugln("Creating a client status code error")
sce2 := StatusCodeError(sce)
err = &sce2
} else if errors.As(err, &cam) && cam == syscall.ENOMEM {
Expand All @@ -1954,7 +1967,7 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
} else {
err = &ConnectionSetupError{Err: err}
}
log.Errorln("Failed to download:", err)
log.WithFields(fields).Errorln("Failed to download:", err)
return
}
}
Expand All @@ -1964,13 +1977,13 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
if ageSec, err := strconv.Atoi(ageStr); err == nil {
cacheAge = time.Duration(ageSec) * time.Second
} else {
log.Debugf("Server at %s gave unparseable Age header (%s) in response: %s", transfer.Url.Host, ageStr, err.Error())
log.WithFields(fields).Debugf("Server at %s gave unparseable Age header (%s) in response: %s", transfer.Url.Host, ageStr, err.Error())
}
}
if cacheAge == 0 {
log.Debugln("Server at", transfer.Url.Host, "had a cache miss")
log.WithFields(fields).Debugln("Server at", transfer.Url.Host, "had a cache miss")
} else if cacheAge > 0 {
log.Debugln("Server at", transfer.Url.Host, "had a cache hit with data age", cacheAge.String())
log.WithFields(fields).Debugln("Server at", transfer.Url.Host, "had a cache hit with data age", cacheAge.String())
}

// Size of the download
Expand All @@ -1985,7 +1998,7 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
var headResponse *http.Response
headResponse, err = headClient.Do(headRequest)
if err != nil {
log.Errorln("Could not successfully get response for HEAD request")
log.WithFields(fields).Errorln("Could not successfully get response for HEAD request")
err = errors.Wrap(err, "Could not determine the size of the remote object")
return
}
Expand All @@ -1994,7 +2007,7 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
if contentLengthStr != "" {
totalSize, err = strconv.ParseInt(contentLengthStr, 10, 64)
if err != nil {
log.Errorln("problem converting content-length to an int:", err)
log.WithFields(fields).Errorln("problem converting content-length to an int:", err)
totalSize = 0
}
}
Expand All @@ -2006,8 +2019,10 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
stoppedTransferTimeout := compatToDuration(param.Client_StoppedTransferTimeout.GetDuration(), "Client.StoppedTranferTimeout")
slowTransferRampupTime := compatToDuration(param.Client_SlowTransferRampupTime.GetDuration(), "Client.SlowTransferRampupTime")
slowTransferWindow := compatToDuration(param.Client_SlowTransferWindow.GetDuration(), "Client.SlowTransferWindow")
log.Debugf("Stopped transfer timeout is %s; slow transfer ramp-up is %s; slow transfer look-back window is %s",
stoppedTransferTimeout.String(), slowTransferRampupTime.String(), slowTransferWindow.String())
stoppedTransferDebugLine.Do(func() {
log.WithFields(fields).Debugf("Stopped transfer timeout is %s; slow transfer ramp-up is %s; slow transfer look-back window is %s",
stoppedTransferTimeout.String(), slowTransferRampupTime.String(), slowTransferWindow.String())
})
startBelowLimit := time.Time{}
var noProgressStartTime time.Time
var lastBytesComplete int64
Expand Down Expand Up @@ -2044,7 +2059,7 @@ Loop:
StoppedTime: time.Since(noProgressStartTime),
CacheHit: cacheAge > 0,
}
log.Errorln(err.Error())
log.WithFields(fields).Errorln(err.Error())
return
}
} else {
Expand All @@ -2069,7 +2084,7 @@ Loop:
warning := []byte("Warning! Downloading too slow...\n")
status, err := getProgressContainer().Write(warning)
if err != nil {
log.Errorln("Problem displaying slow message", err, status)
log.WithFields(fields).Errorln("Problem displaying slow message", err, status)
continue
}
startBelowLimit = time.Now()
Expand All @@ -2081,7 +2096,7 @@ Loop:
// The download is below the threshold for more than `SlowTransferWindow` seconds, cancel the download
cancel()

log.Errorf("Cancelled: Download speed of %s/s is below the limit of %s/s", ByteCountSI(int64(resp.BytesPerSecond())), ByteCountSI(int64(downloadLimit)))
log.WithFields(fields).Errorf("Cancelled: Download speed of %s/s is below the limit of %s/s", ByteCountSI(int64(resp.BytesPerSecond())), ByteCountSI(int64(downloadLimit)))

err = &SlowTransferError{
BytesTransferred: resp.BytesComplete(),
Expand Down Expand Up @@ -2112,15 +2127,15 @@ Loop:
err = &ConnectionSetupError{URL: resp.Request.URL().String()}
return
}
log.Debugln("Got error from HTTP download", err)
log.WithFields(fields).Debugln("Got error from HTTP download", err)
return
} else {
// Check the trailers for any error information
trailer := resp.HTTPResponse.Trailer
if errorStatus := trailer.Get("X-Transfer-Status"); errorStatus != "" {
statusCode, statusText := parseTransferStatus(errorStatus)
if statusCode != 200 {
log.Debugln("Got error from file transfer")
log.WithFields(fields).Debugln("Got error from file transfer")
err = errors.New("transfer error: " + statusText)
return
}
Expand All @@ -2129,7 +2144,7 @@ Loop:
// Valid responses include 200 and 206. The latter occurs if the download was resumed after a
// prior attempt.
if resp.HTTPResponse.StatusCode != 200 && resp.HTTPResponse.StatusCode != 206 {
log.Debugln("Got failure status code:", resp.HTTPResponse.StatusCode)
log.WithFields(fields).Debugln("Got failure status code:", resp.HTTPResponse.StatusCode)
return 0, 0, -1, serverVersion, &HttpErrResp{resp.HTTPResponse.StatusCode, fmt.Sprintf("Request failed (HTTP status %d): %s",
resp.HTTPResponse.StatusCode, resp.Err().Error())}
}
Expand All @@ -2141,7 +2156,7 @@ Loop:
}
}

log.Debugln("HTTP Transfer was successful")
log.WithFields(fields).Debugln("HTTP Transfer was successful")
return
}

Expand Down

0 comments on commit 035e389

Please sign in to comment.