Skip to content

Commit

Permalink
support for in flight pubkey
Browse files Browse the repository at this point in the history
  • Loading branch information
timhuynh94 committed Aug 25, 2023
1 parent f270db7 commit 620618c
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 3 deletions.
85 changes: 85 additions & 0 deletions api/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) 2023 Target Brands, Inc. All rights reserved.
//
// Use of this source code is governed by the LICENSE file in this repository.

package api

import (
"encoding/base64"
"net/http"

"github.com/gin-gonic/gin"
"github.com/go-vela/worker/router/middleware/token"
)

// swagger:operation POST /register system Register
//
// Fill registration token channel in worker to continue operation
//
// ---
// produces:
// - application/json
// parameters:
// security:
// - ApiKeyAuth: []
// responses:
// '200':
// description: Successfully passed token to worker
// schema:
// type: string
// '401':
// description: No token was passed
// schema:
// "$ref": "#/definitions/Error"
// '500':
// description: Unable to pass token to worker
// schema:
// "$ref": "#/definitions/Error"

// QueueKey will pass the token given in the request header to the register token
// channel of the worker. This will unblock operation if the worker has not been
// registered and the provided registration token is valid.
func QueueKey(c *gin.Context) { // extract the register token channel that was packed into gin context
v, ok := c.Get("queue-signing-key")
if !ok {
c.JSON(http.StatusInternalServerError, "no queue signing key channel in the context")
return
}

// make sure we configured the channel properly
rChan, ok := v.(chan string)
if !ok {
c.JSON(http.StatusInternalServerError, "queue signing key channel in the context is the wrong type")
return
}

// if token is present in the channel, deny registration
// this will likely never happen as the channel is offloaded immediately
if len(rChan) > 0 {
c.JSON(http.StatusOK, "queue key already provided")
return
}

// retrieve auth token from header
t, err := token.Retrieve(c.Request)
if err != nil {
// an error occurs when no token was passed
c.JSON(http.StatusUnauthorized, err)
return
}

publicKeyDecoded, err := base64.StdEncoding.DecodeString(t)
if err != nil {
c.JSON(http.StatusBadRequest, "Bad public key was provided")
return
}

if len(publicKeyDecoded) == 0 {
c.JSON(http.StatusBadRequest, "Provided public key is empty")
return
}
// write registration token to auth token channel
rChan <- t

c.JSON(http.StatusOK, "successfully passed public key to worker")
}
2 changes: 0 additions & 2 deletions cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@ func (w *Worker) exec(index int, config *library.Worker) error {

// setup the version
v := version.New()

// capture an item from the queue
item, err := w.Queue.Pop(context.Background())
if err != nil {
return err
}

if item == nil {
return nil
}
Expand Down
22 changes: 21 additions & 1 deletion cmd/vela-worker/operate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main

import (
"context"
"errors"
"time"

"github.com/go-vela/server/queue"
Expand Down Expand Up @@ -113,10 +114,14 @@ func (w *Worker) operate(ctx context.Context) error {
}
})

// if no pubkey was embedded or provided on startup, wait here
logrus.Trace("wait for public key before setup queue")
w.Config.Queue.EncodedSigningPublicKey = <-w.QueueSigningKey //nolint:wsl
logrus.Trace("received public key.. setting up queue") //nolint:wsl
// setup the queue
//
// https://pkg.go.dev/github.com/go-vela/server/queue?tab=doc#New
//nolint:contextcheck // ignore passing context
//nolint:nolintlint,contextcheck // ignore passing context
w.Queue, err = queue.New(w.Config.Queue)
if err != nil {
registryWorker.SetStatus(constants.WorkerStatusError)
Expand Down Expand Up @@ -173,8 +178,23 @@ func (w *Worker) operate(ctx context.Context) error {
// (do not pass the context to avoid errors in one
// executor+build inadvertently canceling other builds)
//nolint:contextcheck // ignore passing context

err = w.exec(id, registryWorker)
if err != nil {
// if invalid key is provided, wait for a new one
if err.Error() == errors.New("unable to open signed item").Error() ||
err.Error() == errors.New("no valid signing public key provided").Error() {
// pull public key from configuration if provided; wait if not
logrus.Trace("waiting for queue signing public key")

qPubKey := <-w.QueueSigningKey

logrus.Trace("received queue signing public key")
// set Queue public key
w.Config.Queue.EncodedSigningPublicKey = qPubKey
w.Queue, _ = queue.New(w.Config.Queue)
continue
}
// log the error received from the executor
//
// https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Errorf
Expand Down
9 changes: 9 additions & 0 deletions cmd/vela-worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ func run(c *cli.Context) error {

RegisterToken: make(chan string, 1),

QueueSigningKey: make(chan string, 1),

RunningBuildIDs: make([]string, 0),
}

Expand All @@ -155,6 +157,13 @@ func run(c *cli.Context) error {
w.RegisterToken <- c.String("server.secret")
}

// if queue signing key is provided, use as queue key on start up
if len(c.String("queue.signing.public-key")) > 0 {
logrus.Trace("unlocking queue with embedded queue signing key")

w.QueueSigningKey <- c.String("queue.signing.public-key")
}

// validate the worker
err = w.Validate()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/vela-worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (w *Worker) server() (http.Handler, *tls.Config) {
middleware.Executors(w.Executors),
middleware.Logger(logrus.StandardLogger(), time.RFC3339, true),
middleware.RegisterToken(w.RegisterToken),
middleware.QueueSigningKey(w.QueueSigningKey),
)

// log a message indicating the start of serving traffic
Expand Down
1 change: 1 addition & 0 deletions cmd/vela-worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type (
Runtime runtime.Engine
VelaClient *vela.Client
RegisterToken chan string
QueueSigningKey chan string
CheckedIn bool
RunningBuildIDs []string
RunningBuildIDsMutex sync.Mutex
Expand Down
18 changes: 18 additions & 0 deletions router/middleware/queue_signing_key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) 2023 Target Brands, Inc. All rights reserved.
//
// Use of this source code is governed by the LICENSE file in this repository.

package middleware

import (
"github.com/gin-gonic/gin"
)

// QueueSigningKey is a middleware function that attaches the
// auth-token channel to the context of every http.Request.
func QueueSigningKey(r chan string) gin.HandlerFunc {
return func(c *gin.Context) {
c.Set("queue-signing-key", r)
c.Next()
}
}
48 changes: 48 additions & 0 deletions router/middleware/queue_signing_key_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) 2023 Target Brands, Inc. All rights reserved.
//
// Use of this source code is governed by the LICENSE file in this repository.

package middleware

import (
"net/http"
"net/http/httptest"
"reflect"
"testing"

"github.com/gin-gonic/gin"
)

func TestMiddleware_QueueSigningKey(t *testing.T) {
// setup types
want := make(chan string, 1)
got := make(chan string, 1)

want <- "foo"

// setup context
gin.SetMode(gin.TestMode)

resp := httptest.NewRecorder()
context, engine := gin.CreateTestContext(resp)
context.Request, _ = http.NewRequest(http.MethodGet, "/health", nil)

// setup mock server
engine.Use(QueueSigningKey(want))
engine.GET("/health", func(c *gin.Context) {
got = c.Value("queue-signing-key").(chan string)

c.Status(http.StatusOK)
})

// run test
engine.ServeHTTP(context.Writer, context.Request)

if resp.Code != http.StatusOK {
t.Errorf("QueueSigningKey returned %v, want %v", resp.Code, http.StatusOK)
}

if !reflect.DeepEqual(got, want) {
t.Errorf("QueueSigningKey is %v, want foo", got)
}
}
3 changes: 3 additions & 0 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,8 @@ func Load(options ...gin.HandlerFunc) *gin.Engine {
// endpoint for passing a new registration token to the deadloop running operate.go
r.POST("/register", api.Register)

// endpoint for passing a new queue signing key to the deadloop running operate.go
r.POST("/queue-key", api.QueueKey)

return r
}
6 changes: 6 additions & 0 deletions router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ func TestRouter_Load(t *testing.T) {
Handler: "github.com/go-vela/worker/api.Register",
HandlerFunc: api.Register,
},
{
Method: "POST",
Path: "/api/v1/queue-key",
Handler: "github.com/go-vela/worker/api.QueueKey",
HandlerFunc: api.QueueKey,
},
{
Method: "GET",
Path: "/api/v1/executors",
Expand Down

0 comments on commit 620618c

Please sign in to comment.