Skip to content

Commit

Permalink
Merge branch 'main' of github.com:go-vela/worker into feat/queue-signing
Browse files Browse the repository at this point in the history
  • Loading branch information
plyr4 committed Aug 8, 2023
2 parents 65a4d8a + 054b67d commit 2a744ec
Show file tree
Hide file tree
Showing 19 changed files with 329 additions and 140 deletions.
1 change: 1 addition & 0 deletions .github/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ Copyright (c) 2022 Target Brands, Inc.
```

[Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0)

31 changes: 25 additions & 6 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ release/
*.iws
*.xml

# VSCode project folder
.vscode/

# VSCode project files
__debug_bin

# Secrets environment file
secrets.env

Expand All @@ -52,3 +46,28 @@ secrets.env
.DS_Store

api-spec.json

# Created by https://www.toptal.com/developers/gitignore/api/visualstudiocode
# Edit at https://www.toptal.com/developers/gitignore?templates=visualstudiocode

### VisualStudioCode ###
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
!.vscode/*.code-snippets

# Local History for Visual Studio Code
.history/

# Built Visual Studio Code Extensions
*.vsix
__debug_bin

### VisualStudioCode Patch ###
# Ignore all local history of files
.history
.ionide

# End of https://www.toptal.com/developers/gitignore/api/visualstudiocode
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# Use of this source code is governed by the LICENSE file in this repository.

FROM alpine as certs
FROM alpine:3.18.2@sha256:25fad2a32ad1f6f510e528448ae1ec69a28ef81916a004d3629874104f8a7f70 as certs

RUN apk add --update --no-cache ca-certificates

Expand Down
2 changes: 1 addition & 1 deletion Dockerfile-alpine
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# Use of this source code is governed by the LICENSE file in this repository.

FROM alpine
FROM alpine:3.18.2@sha256:25fad2a32ad1f6f510e528448ae1ec69a28ef81916a004d3629874104f8a7f70

RUN apk add --update --no-cache ca-certificates

Expand Down
90 changes: 88 additions & 2 deletions cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,24 @@ package main
import (
"context"
"net/http"
"strconv"
"sync"
"time"

"github.com/go-vela/types"
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/go-vela/worker/executor"
"github.com/go-vela/worker/runtime"
"github.com/go-vela/worker/version"

"github.com/sirupsen/logrus"
)

// exec is a helper function to poll the queue
// and execute Vela pipelines for the Worker.
//
//nolint:nilerr,funlen // ignore returning nil - don't want to crash worker
func (w *Worker) exec(index int) error {
func (w *Worker) exec(index int, config *library.Worker) error {
var err error

// setup the version
Expand Down Expand Up @@ -70,6 +73,48 @@ func (w *Worker) exec(index int) error {
"version": v.Semantic(),
})

// lock and append the build to the RunningBuildIDs list
w.RunningBuildIDsMutex.Lock()

w.RunningBuildIDs = append(w.RunningBuildIDs, strconv.Itoa(item.Build.GetNumber()))

config.SetRunningBuildIDs(w.RunningBuildIDs)

w.RunningBuildIDsMutex.Unlock()

// set worker status
updateStatus := w.getWorkerStatusFromConfig(config)
config.SetStatus(updateStatus)
config.SetLastStatusUpdateAt(time.Now().Unix())
config.SetLastBuildStartedAt(time.Now().Unix())

// update worker in the database
_, _, err = w.VelaClient.Worker.Update(config.GetHostname(), config)
if err != nil {
logger.Errorf("unable to update worker: %v", err)
}

// handle stale item queued before a Vela upgrade or downgrade.
if item.ItemVersion != types.ItemVersion {
// If the ItemVersion is older or newer than what we expect, then it might
// not be safe to process the build. Fail the build and loop to the next item.
// TODO: Ask the server to re-compile and requeue the build instead of failing it.
logrus.Errorf("Failing stale queued build due to wrong item version: want %d, got %d", types.ItemVersion, item.ItemVersion)

build := item.Build
build.SetError("Unable to process stale build (queued before Vela upgrade/downgrade).")
build.SetStatus(constants.StatusError)
build.SetFinished(time.Now().UTC().Unix())

_, _, err := w.VelaClient.Build.Update(item.Repo.GetOrg(), item.Repo.GetName(), build)
if err != nil {
logrus.Errorf("Unable to set build status to %s: %s", constants.StatusFailure, err)
return err
}

return nil
}

// setup the runtime
//
// https://pkg.go.dev/github.com/go-vela/worker/runtime?tab=doc#New
Expand Down Expand Up @@ -132,6 +177,32 @@ func (w *Worker) exec(index int) error {
}

logger.Info("completed build")

// lock and remove the build from the RunningBuildIDs list
w.RunningBuildIDsMutex.Lock()

for i, v := range w.RunningBuildIDs {
if v == strconv.Itoa(item.Build.GetNumber()) {
w.RunningBuildIDs = append(w.RunningBuildIDs[:i], w.RunningBuildIDs[i+1:]...)
}
}

config.SetRunningBuildIDs(w.RunningBuildIDs)

w.RunningBuildIDsMutex.Unlock()

// set worker status
updateStatus := w.getWorkerStatusFromConfig(config)
config.SetStatus(updateStatus)
config.SetLastStatusUpdateAt(time.Now().Unix())
config.SetLastBuildFinishedAt(time.Now().Unix())

// update worker in the database
_, _, err := w.VelaClient.Worker.Update(config.GetHostname(), config)
if err != nil {
logger.Errorf("unable to update worker: %v", err)
}

}()

// capture the configured build timeout
Expand Down Expand Up @@ -200,3 +271,18 @@ func (w *Worker) exec(index int) error {

return nil
}

// getWorkerStatusFromConfig is a helper function
// to determine the appropriate worker status
func (w *Worker) getWorkerStatusFromConfig(config *library.Worker) string {
switch rb := len(config.GetRunningBuildIDs()); {
case rb == 0:
return constants.WorkerStatusIdle
case rb < w.Config.Build.Limit:
return constants.WorkerStatusAvailable
case rb == w.Config.Build.Limit:
return constants.WorkerStatusBusy
default:
return constants.WorkerStatusError
}
}
28 changes: 26 additions & 2 deletions cmd/vela-worker/operate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/go-vela/server/queue"
"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -118,6 +119,18 @@ func (w *Worker) operate(ctx context.Context) error {
//nolint:contextcheck // ignore passing context
w.Queue, err = queue.New(w.Config.Queue)
if err != nil {
registryWorker.SetStatus(constants.WorkerStatusError)
_, resp, logErr := w.VelaClient.Worker.Update(registryWorker.GetHostname(), registryWorker)
if resp == nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Error("status update response is nil")
}
if logErr != nil {
if resp != nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Errorf("status code: %v, unable to update worker %s status with the server: %v", resp.StatusCode, registryWorker.GetHostname(), logErr)
}
}
return err
}

Expand Down Expand Up @@ -160,13 +173,24 @@ func (w *Worker) operate(ctx context.Context) error {
// (do not pass the context to avoid errors in one
// executor+build inadvertently canceling other builds)
//nolint:contextcheck // ignore passing context
err = w.exec(id)
err = w.exec(id, registryWorker)
if err != nil {
// log the error received from the executor
//
// https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Errorf
logrus.Errorf("failing worker executor: %v", err)

registryWorker.SetStatus(constants.WorkerStatusError)
_, resp, logErr := w.VelaClient.Worker.Update(registryWorker.GetHostname(), registryWorker)
if resp == nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Error("status update response is nil")
}
if logErr != nil {
if resp != nil {
// log the error instead of returning so the operation doesn't block worker deployment
logrus.Errorf("status code: %v, unable to update worker %s status with the server: %v", resp.StatusCode, registryWorker.GetHostname(), logErr)
}
}
return err
}
}
Expand Down
5 changes: 5 additions & 0 deletions cmd/vela-worker/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"net/http"

"github.com/go-vela/types/constants"
"github.com/go-vela/types/library"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -46,12 +47,16 @@ func (w *Worker) checkIn(config *library.Worker) (bool, string, error) {
func (w *Worker) register(config *library.Worker) (bool, string, error) {
logrus.Infof("worker %s not found, registering it with the server", config.GetHostname())

config.SetStatus(constants.WorkerStatusIdle)

tkn, _, err := w.VelaClient.Worker.Add(config)
if err != nil {
// log the error instead of returning so the operation doesn't block worker deployment
return false, "", fmt.Errorf("unable to register worker %s with the server: %w", config.GetHostname(), err)
}

logrus.Infof("worker %q status updated successfully to %s", config.GetHostname(), config.GetStatus())

// successfully added the worker so return nil
return true, tkn.GetToken(), nil
}
2 changes: 2 additions & 0 deletions cmd/vela-worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ func run(c *cli.Context) error {
Executors: make(map[int]executor.Engine),

RegisterToken: make(chan string, 1),

RunningBuildIDs: make([]string, 0),
}

// set the worker address if no flag was provided
Expand Down
17 changes: 10 additions & 7 deletions cmd/vela-worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main

import (
"net/url"
"sync"
"time"

"github.com/go-vela/sdk-go/vela"
Expand Down Expand Up @@ -62,12 +63,14 @@ type (
// Worker represents all configuration and
// system processes for the worker.
Worker struct {
Config *Config
Executors map[int]executor.Engine
Queue queue.Service
Runtime runtime.Engine
VelaClient *vela.Client
RegisterToken chan string
CheckedIn bool
Config *Config
Executors map[int]executor.Engine
Queue queue.Service
Runtime runtime.Engine
VelaClient *vela.Client
RegisterToken chan string
CheckedIn bool
RunningBuildIDs []string
RunningBuildIDsMutex sync.Mutex
}
)
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ services:
#
# https://www.vaultproject.io/
vault:
image: vault:latest
image: hashicorp/vault:latest
container_name: vault
command: server -dev
networks:
Expand Down
2 changes: 1 addition & 1 deletion executor/linux/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (c *client) AssembleBuild(ctx context.Context) error {
// send API call to update the logs for the step
//
// https://pkg.go.dev/github.com/go-vela/sdk-go/vela?tab=doc#LogService.UpdateStep
_log, _, err = c.Vela.Log.UpdateStep(c.repo.GetOrg(), c.repo.GetName(), c.build.GetNumber(), c.init.Number, _log)
_, err = c.Vela.Log.UpdateStep(c.repo.GetOrg(), c.repo.GetName(), c.build.GetNumber(), c.init.Number, _log)
if err != nil {
c.Logger.Errorf("unable to upload %s logs: %v", c.init.Name, err)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/linux/secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (s *secretSvc) stream(ctx context.Context, ctn *pipeline.Container) error {
// send API call to update the logs for the service
//
// https://pkg.go.dev/github.com/go-vela/sdk-go/vela?tab=doc#LogService.UpdateService
_log, _, err = s.client.Vela.Log.UpdateStep(s.client.repo.GetOrg(), s.client.repo.GetName(), s.client.build.GetNumber(), ctn.Number, _log)
_, err = s.client.Vela.Log.UpdateStep(s.client.repo.GetOrg(), s.client.repo.GetName(), s.client.build.GetNumber(), ctn.Number, _log)
if err != nil {
logger.Errorf("unable to upload container logs: %v", err)
}
Expand Down Expand Up @@ -316,7 +316,7 @@ func (s *secretSvc) stream(ctx context.Context, ctn *pipeline.Container) error {
//
// https://pkg.go.dev/github.com/go-vela/sdk-go/vela?tab=doc#LogService.UpdateStep
//nolint:contextcheck // ignore passing context
_log, _, err = s.client.Vela.Log.UpdateStep(s.client.repo.GetOrg(), s.client.repo.GetName(), s.client.build.GetNumber(), s.client.init.Number, _log)
_, err = s.client.Vela.Log.UpdateStep(s.client.repo.GetOrg(), s.client.repo.GetName(), s.client.build.GetNumber(), s.client.init.Number, _log)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions executor/linux/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container) err
//
// https://pkg.go.dev/github.com/go-vela/sdk-go/vela?tab=doc#LogService.UpdateService
//nolint:contextcheck // ignore passing context
_, _, err = c.Vela.Log.UpdateService(c.repo.GetOrg(), c.repo.GetName(), c.build.GetNumber(), ctn.Number, _log)
_, err = c.Vela.Log.UpdateService(c.repo.GetOrg(), c.repo.GetName(), c.build.GetNumber(), ctn.Number, _log)
if err != nil {
logger.Errorf("unable to upload container logs: %v", err)
}
Expand Down Expand Up @@ -266,7 +266,7 @@ func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container) err
// send API call to append the logs for the service
//
// https://pkg.go.dev/github.com/go-vela/sdk-go/vela?tab=doc#LogService.UpdateService
_log, _, err = c.Vela.Log.UpdateService(c.repo.GetOrg(), c.repo.GetName(), c.build.GetNumber(), ctn.Number, _log)
_, err = c.Vela.Log.UpdateService(c.repo.GetOrg(), c.repo.GetName(), c.build.GetNumber(), ctn.Number, _log)
if err != nil {
logger.Error(err)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/linux/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func (c *client) StreamStep(ctx context.Context, ctn *pipeline.Container) error
//
// https://pkg.go.dev/github.com/go-vela/sdk-go/vela?tab=doc#LogService.UpdateStep
//nolint:contextcheck // ignore passing context
_, _, err = c.Vela.Log.UpdateStep(c.repo.GetOrg(), c.repo.GetName(), c.build.GetNumber(), ctn.Number, _log)
_, err = c.Vela.Log.UpdateStep(c.repo.GetOrg(), c.repo.GetName(), c.build.GetNumber(), ctn.Number, _log)
if err != nil {
logger.Errorf("unable to upload container logs: %v", err)
}
Expand Down Expand Up @@ -314,7 +314,7 @@ func (c *client) StreamStep(ctx context.Context, ctn *pipeline.Container) error
// send API call to append the logs for the step
//
// https://pkg.go.dev/github.com/go-vela/sdk-go/vela?tab=doc#LogStep.UpdateStep
_log, _, err = c.Vela.Log.UpdateStep(c.repo.GetOrg(), c.repo.GetName(), c.build.GetNumber(), ctn.Number, _log)
_, err = c.Vela.Log.UpdateStep(c.repo.GetOrg(), c.repo.GetName(), c.build.GetNumber(), ctn.Number, _log)
if err != nil {
logger.Error(err)
}
Expand Down
Loading

0 comments on commit 2a744ec

Please sign in to comment.