Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Event Hooks To Athens #1664

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion cmd/proxy/actions/app_proxy.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package actions

import (
"context"
"fmt"
"net/http"
"net/url"
"path"
"strings"
"time"

"github.com/gomods/athens/pkg/config"
"github.com/gomods/athens/pkg/download"
"github.com/gomods/athens/pkg/download/addons"
"github.com/gomods/athens/pkg/download/mode"
"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/events"
"github.com/gomods/athens/pkg/index"
"github.com/gomods/athens/pkg/index/mem"
"github.com/gomods/athens/pkg/index/mysql"
Expand Down Expand Up @@ -107,7 +111,11 @@ func addProxyRoutes(
if err != nil {
return err
}
st := stash.New(mf, s, indexer, stash.WithPool(c.GoGetWorkers), withSingleFlight)
withEventsHook, err := getEventHook(c)
if err != nil {
return err
}
st := stash.New(mf, s, indexer, stash.WithPool(c.GoGetWorkers), withSingleFlight, withEventsHook)

df, err := mode.NewFile(c.DownloadMode, c.DownloadURL)
if err != nil {
Expand All @@ -129,6 +137,23 @@ func addProxyRoutes(
return nil
}

func getEventHook(c *config.Config) (stash.Wrapper, error) {
const op errors.Op = "actions.getEventHook"
if c.EventsHook == "" {
return func(s stash.Stasher) stash.Stasher {
return s
}, nil
}
eh := events.NewClient(c.EventsHook, nil)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
err := eh.Ping(ctx)
if err != nil {
return nil, errors.E(op, err)
}
return stash.WithEventsHook(eh), nil
}

func getSingleFlight(c *config.Config, checker storage.Checker) (stash.Wrapper, error) {
switch c.SingleFlightType {
case "", "memory":
Expand Down
9 changes: 9 additions & 0 deletions config.dev.toml
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,15 @@ ForceSSL = false
# Env override: ATHENS_PROXY_VALIDATOR
ValidatorHook = ""

# EventsHook specifies a URL that will receive POST request events
# throughout an Athens request lifecycle.
#
# To see what type of events you will get and what the payload looks like
# check the pkg/event in this repository.
#
# Env override: ATHENS_EVENTS_HOOK
EventsHook = ""

# PathPrefix specifies whether the Proxy
# should have a basepath. Certain proxies and services
# are distinguished based on subdomain, while others are based
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Config struct {
PropagateAuthHost string `envconfig:"ATHENS_PROPAGATE_AUTH_HOST"`
ForceSSL bool `envconfig:"PROXY_FORCE_SSL"`
ValidatorHook string `envconfig:"ATHENS_PROXY_VALIDATOR"`
EventsHook string `envconfig:"ATHENS_EVENTS_HOOK"`
PathPrefix string `envconfig:"ATHENS_PATH_PREFIX"`
NETRCPath string `envconfig:"ATHENS_NETRC_PATH"`
GithubToken string `envconfig:"ATHENS_GITHUB_TOKEN"`
Expand Down
80 changes: 80 additions & 0 deletions pkg/events/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package events

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

"github.com/gomods/athens/pkg/build"
"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/requestid"
)

// NewClient returns a new http service
func NewClient(url string, c *http.Client) Hook {
if c == nil {
c = http.DefaultClient
}
return &service{url, c}
}

type service struct {
url string
c *http.Client
}

func (s *service) Ping(ctx context.Context) error {
const op errors.Op = "events.Ping"
return s.sendEvent(ctx, op, Ping, PingEvent{BaseEvent: BaseEvent{
Event: Ping.String(),
Version: build.Data().Version,
}})
}

func (s *service) Stashed(ctx context.Context, mod, ver string) error {
const op errors.Op = "events.Stashed"
return s.sendEvent(ctx, op, Stashed, StashedEvent{
BaseEvent: BaseEvent{
Event: Stashed.String(),
Version: build.Data().Version,
},
Module: mod,
Version: ver,
})
}

func (s *service) sendEvent(ctx context.Context, op errors.Op, event Type, payload interface{}) error {
req, err := s.getRequest(ctx, event, payload)
if err != nil {
return errors.E(op, err)
}
resp, err := s.c.Do(req)
if err != nil {
return errors.E(op, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
return errors.E(op, fmt.Errorf("event backend returned non-200 code: %d - body: %s", resp.StatusCode, body))
}
return nil
}

func (s *service) getRequest(ctx context.Context, event Type, payload interface{}) (*http.Request, error) {
const op errors.Op = "events.getRequest"
var buf bytes.Buffer
err := json.NewEncoder(&buf).Encode(payload)
if err != nil {
return nil, errors.E(op, err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url, &buf)
if err != nil {
return nil, errors.E(op, err)
}
req.Header.Set(HeaderKey, event.String())
req.Header.Set(requestid.HeaderKey, requestid.FromContext(ctx))
return req, nil
}
50 changes: 50 additions & 0 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package events

import (
"context"
)

//go:generate stringer -type=Type

// Type describe various event types
type Type int

// HeaderKey is the HTTP Header that Athens will send
// along every event. This helps you know which JSON shape
// to use when parsing a request body
const HeaderKey = "Athens-Event"

// Event types
const (
Ping Type = iota + 1
Stashed
)

// BaseEvent is the common data that all
// event payloads are composed of.
type BaseEvent struct {
Event string
Version string
}

// PingEvent describes the payload for a Ping event
type PingEvent struct {
BaseEvent
}

// StashedEvent describes the payload for the Stashed event
type StashedEvent struct {
BaseEvent
Module, Version string
}

// Hook describes a service that can be used to send events to
type Hook interface {
// Ping pings the underlying server to ensure that
// the event hook url is ready to receive requests
Ping(ctx context.Context) error

// Stashed is called whenever a new module is succesfully persisted
// to the storage Backend
Stashed(ctx context.Context, mod, ver string) error
}
121 changes: 121 additions & 0 deletions pkg/events/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package events

import (
"context"
"fmt"
"net/http/httptest"
"testing"

"github.com/gomods/athens/pkg/requestid"
"github.com/stretchr/testify/require"
"github.com/technosophos/moniker"
)

var pingTests = []struct {
name string
err error
}{
{
name: "ping",
},
{
name: "ping_err",
err: fmt.Errorf("could not ping"),
},
}

func TestClientServerPing(t *testing.T) {
for _, tc := range pingTests {
t.Run(tc.name, func(t *testing.T) {
hook := &mockHook{err: tc.err}
srv := httptest.NewServer(NewServer(hook))
t.Cleanup(srv.Close)
client := NewClient(srv.URL, nil)
err := client.Ping(context.Background())
checkErr(t, tc.err != nil, err)
})
}
}

var stashedTests = []struct {
name string
mod string
ver string
err error
}{
{
name: "happy path",
mod: "github.com/gomods/athens",
ver: "v0.10.0",
},
{
name: "stashed error",
mod: "mod",
ver: "ver",
err: fmt.Errorf("server error"),
},
}

func TestClientServerStashed(t *testing.T) {
for _, tc := range stashedTests {
t.Run(tc.name, func(t *testing.T) {
hook := &mockHook{err: tc.err}
srv := httptest.NewServer(NewServer(hook))
t.Cleanup(srv.Close)
client := NewClient(srv.URL, nil)
err := client.Stashed(context.Background(), "github.com/gomods/athens", "v0.10.0")
if checkErr(t, tc.err != nil, err) {
return
}
if tc.mod != hook.mod {
t.Fatalf("expected module to be %q but got %q", tc.mod, hook.mod)
}
if tc.ver != hook.ver {
t.Fatalf("expected version to be %q but got %q", tc.ver, hook.ver)
}
})
}
}

func TestRequestIDPropagation(t *testing.T) {
hook := &mockHook{}
srv := httptest.NewServer(NewServer(hook))
t.Cleanup(srv.Close)
client := NewClient(srv.URL, nil)
reqID := moniker.New().Name()
ctx := requestid.SetInContext(context.Background(), reqID)
err := client.Ping(ctx)
if err != nil {
t.Fatal(err)
}
if reqID != hook.reqid {
t.Fatalf("expected request id to be %q but got %q", reqID, hook.reqid)
}
}

type mockHook struct {
mod, ver string
reqid string
err error
}

func (mh *mockHook) Ping(ctx context.Context) error {
mh.reqid = requestid.FromContext(ctx)
return mh.err
}

func (mh *mockHook) Stashed(ctx context.Context, mod, ver string) error {
mh.mod, mh.ver = mod, ver
return mh.err
}

func checkErr(t *testing.T, wantErr bool, err error) bool {
if wantErr {
if err == nil {
t.Fatal("expected an error but got nil")
}
return true
}
require.NoError(t, err)
return false
}
45 changes: 45 additions & 0 deletions pkg/events/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package events

import (
"encoding/json"
"fmt"
"net/http"

"github.com/gomods/athens/pkg/requestid"
)

// NewServer returns an http.Handler that parses
func NewServer(h Hook) http.Handler {
return &server{h}
}

type server struct {
h Hook
}

func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.NotFound(w, r)
return
}
ctx := r.Context()
ctx = requestid.SetInContext(ctx, r.Header.Get(requestid.HeaderKey))
var err error
switch event := r.Header.Get(HeaderKey); event {
case Ping.String():
err = s.h.Ping(ctx)
case Stashed.String():
var body StashedEvent
err = json.NewDecoder(r.Body).Decode(&body)
if err != nil {
break
}
err = s.h.Stashed(ctx, body.Module, body.Version)
default:
err = fmt.Errorf("unknown event: %q", event)
}
if err != nil {
http.Error(w, err.Error(), 500)
return
}
}
Loading