diff --git a/CHANGES.md b/CHANGES.md index fdaa5a0..5c543da 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,10 +1,18 @@ +## v0.0.2 + +* Introduce `terminate-after` flag, which instructs the process to shutdown + after a specified number of requests. +* Introduce graceful shutdown. Upon receiving a shutdown message via SIGTERM, or + via `terminate-after`, call shutdown on each server, allowing requests to + drain. + ## v0.0.1 bb 0.0.1 is the first public release of bb * This release supports HTTP 1.1 and gRPC. * Available strategies are: broadcast channel, point-to-point channel, terminus, and HTTP egress -* Allows users tio define a percentage of requests that should fail and a duration to wait for +* Allows users tio define a percentage of requests that should fail and a duration to wait for before processing requests. -* This release has been tested locally on Mac OS and on both Google Kubernetes Engine and +* This release has been tested locally on Mac OS and on both Google Kubernetes Engine and Minikube. diff --git a/README.md b/README.md index 22c1cb7..025acd1 100644 --- a/README.md +++ b/README.md @@ -108,7 +108,8 @@ A test run using the Docker CLI should return usage information and confirm that --id string identifier for this container --log-level string log level, must be one of: panic, fatal, error, warn, info, debug (default "debug") --percent-failure int percentage of requests that this service will automatically fail - --sleep-in-millis int amount of milliseconds to wait before actually start processing as request + --sleep-in-millis int amount of milliseconds to wait before actually start processing a request + --terminate-after int terminate the process after this many requests Use "bb [command] --help" for more information about a command. diff --git a/cmd/root.go b/cmd/root.go index 4a186b7..79cfc28 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -47,7 +47,8 @@ func init() { RootCmd.PersistentFlags().IntVar(&config.GRPCServerPort, "grpc-server-port", -1, "port to bind a gRPC server to") RootCmd.PersistentFlags().IntVar(&config.H1ServerPort, "h1-server-port", -1, "port to bind a HTTP 1.1 server to") RootCmd.PersistentFlags().IntVar(&config.PercentageFailedRequests, "percent-failure", 0, "percentage of requests that this service will automatically fail") - RootCmd.PersistentFlags().IntVar(&config.SleepInMillis, "sleep-in-millis", 0, "amount of milliseconds to wait before actually start processing as request") + RootCmd.PersistentFlags().IntVar(&config.SleepInMillis, "sleep-in-millis", 0, "amount of milliseconds to wait before actually start processing a request") + RootCmd.PersistentFlags().IntVar(&config.TerminateAfter, "terminate-after", 0, "terminate the process after this many requests") RootCmd.PersistentFlags().BoolVar(&config.FireAndForget, "fire-and-forget", false, "do not wait for a response when contacting downstream services.") RootCmd.PersistentFlags().StringSliceVar(&config.GRPCDownstreamServers, "grpc-downstream-server", []string{}, "list of servers (hostname:port) to send messages to using gRPC, can be repeated") RootCmd.PersistentFlags().StringSliceVar(&config.H1DownstreamServers, "h1-downstream-server", []string{}, "list of servers (protocol://hostname:port) to send messages to using HTTP 1.1, can be repeated") diff --git a/cmd/services.go b/cmd/services.go index ccfc3dd..4473b3b 100644 --- a/cmd/services.go +++ b/cmd/services.go @@ -62,9 +62,7 @@ func buildClients(config *service.Config) ([]service.Client, error) { func newService(config *service.Config, strategyName string) (*service.Service, error) { - handler := &service.RequestHandler{ - Config: config, - } + handler := service.NewRequestHandler(config) servers, err := buildServers(config, handler) if err != nil { @@ -95,7 +93,18 @@ func newService(config *service.Config, strategyName string) (*service.Service, stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt, syscall.SIGTERM) log.Infof("Service [%s] is ready and waiting for incoming connections", config.ID) - <-stop + + select { + case <-stop: + log.Infof("Stopping service [%s] due to interrupt", config.ID) + case <-handler.Stopping(): + log.Infof("Stopping service [%s] due to handler", config.ID) + } + + for _, server := range servers { + server.Shutdown() + } + return service, nil } diff --git a/protocols/grpc.go b/protocols/grpc.go index c2a5283..6d7afa1 100644 --- a/protocols/grpc.go +++ b/protocols/grpc.go @@ -12,6 +12,7 @@ import ( ) type theGrpcServer struct { + grpcServer *grpc.Server port int serviceHandler *service.RequestHandler } @@ -20,6 +21,12 @@ func (s *theGrpcServer) GetID() string { return fmt.Sprintf("grpc-%d", s.port) } +func (s *theGrpcServer) Shutdown() error { + log.Infof("Shutting down [%s]", s.GetID()) + s.grpcServer.GracefulStop() + return nil +} + func (s *theGrpcServer) TheFunction(ctx context.Context, req *pb.TheRequest) (*pb.TheResponse, error) { resp, err := s.serviceHandler.Handle(ctx, req) log.Infof("Received gRPC request [%s] [%s] Returning response [%+v]", req.RequestUID, req, resp) @@ -59,6 +66,7 @@ func NewGrpcServerIfConfigured(config *service.Config, serviceHandler *service.R grpcServer := grpc.NewServer() theGrpcServer := &theGrpcServer{ + grpcServer: grpcServer, port: grpcServerPort, serviceHandler: serviceHandler, } diff --git a/protocols/grpc_test.go b/protocols/grpc_test.go index 5aa4cf3..d08faf5 100644 --- a/protocols/grpc_test.go +++ b/protocols/grpc_test.go @@ -23,7 +23,9 @@ func TestTheGrpcServer(t *testing.T) { theResponseToReturn: expectedProtoResponse, } - grpcServer := theGrpcServer{serviceHandler: &service.RequestHandler{Config: &service.Config{}, Strategy: strategy}} + requestHandler := service.NewRequestHandler(&service.Config{}) + requestHandler.Strategy = strategy + grpcServer := theGrpcServer{serviceHandler: requestHandler} actualProtoResponse, err := grpcServer.TheFunction(context.TODO(), expectedProtoRequest) if err != nil { @@ -51,7 +53,9 @@ func TestTheGrpcServer(t *testing.T) { theErrorToReturn: expectedError, } - grpcServer := theGrpcServer{serviceHandler: &service.RequestHandler{Config: &service.Config{}, Strategy: strategy}} + requestHandler := service.NewRequestHandler(&service.Config{}) + requestHandler.Strategy = strategy + grpcServer := theGrpcServer{serviceHandler: requestHandler} _, actualError := grpcServer.TheFunction(context.TODO(), expectedProtoRequest) if actualError == nil { diff --git a/protocols/http.go b/protocols/http.go index b63585b..a4bfdcd 100644 --- a/protocols/http.go +++ b/protocols/http.go @@ -1,6 +1,7 @@ package protocols import ( + "context" "errors" "fmt" "io" @@ -19,7 +20,8 @@ import ( var marshaller = &jsonpb.Marshaler{} type theHTTPServer struct { - port int + httpServer *http.Server + port int } type httpHandler struct { @@ -30,6 +32,11 @@ func (s *theHTTPServer) GetID() string { return fmt.Sprintf("h1-%d", s.port) } +func (s *theHTTPServer) Shutdown() error { + log.Infof("Shutting down [%s]", s.GetID()) + return s.httpServer.Shutdown(context.Background()) +} + func (h *httpHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { var protoReq pb.TheRequest @@ -41,7 +48,7 @@ func (h *httpHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } protoReq = r } else { - newRequestUID := newRequestUID("http", h.serviceHandler.Config) + newRequestUID := newRequestUID("http", h.serviceHandler.ConfigID()) log.Infof("Received request with empty body, assigning new request UID [%s] to it", newRequestUID) protoReq = pb.TheRequest{ RequestUID: newRequestUID, @@ -90,8 +97,8 @@ func (c *httpClient) Send(req *pb.TheRequest) (*pb.TheResponse, error) { return &protoResp, err } -func newRequestUID(inboundType string, config *service.Config) string { - return fmt.Sprintf("in:%s-sid:%s-%d", inboundType, config.ID, time.Now().Nanosecond()) +func newRequestUID(inboundType string, configID string) string { + return fmt.Sprintf("in:%s-sid:%s-%d", inboundType, configID, time.Now().Nanosecond()) } func marshallProtobufToJSON(msg proto.Message) (string, error) { @@ -153,14 +160,18 @@ func NewHTTPServerIfConfigured(config *service.Config, serviceHandler *service.R return nil, nil } - handler := newHTTPHandler(serviceHandler) + srv := &http.Server{ + Addr: fmt.Sprintf(":%d", config.H1ServerPort), + Handler: newHTTPHandler(serviceHandler), + } go func() { log.Infof("HTTP 1.1 server listening on port [%d]", config.H1ServerPort) - http.ListenAndServe(fmt.Sprintf(":%d", config.H1ServerPort), handler) + srv.ListenAndServe() }() return &theHTTPServer{ - port: config.H1ServerPort, + port: config.H1ServerPort, + httpServer: srv, }, nil } diff --git a/protocols/http_test.go b/protocols/http_test.go index 74af7a2..8873a8f 100644 --- a/protocols/http_test.go +++ b/protocols/http_test.go @@ -22,7 +22,9 @@ func TestTheHTTPServer(t *testing.T) { theResponseToReturn: expectedProtoResponse, } - handler := newHTTPHandler(&service.RequestHandler{Strategy: strategy, Config: &service.Config{}}) + requestHandler := service.NewRequestHandler(&service.Config{}) + requestHandler.Strategy = strategy + handler := newHTTPHandler(requestHandler) theServer := httptest.NewServer(handler) defer theServer.Close() @@ -68,7 +70,9 @@ func TestTheHTTPServer(t *testing.T) { theResponseToReturn: expectedProtoResponse, } - handler := newHTTPHandler(&service.RequestHandler{Config: &service.Config{}, Strategy: strategy}) + requestHandler := service.NewRequestHandler(&service.Config{}) + requestHandler.Strategy = strategy + handler := newHTTPHandler(requestHandler) theServer := httptest.NewServer(handler) defer theServer.Close() @@ -109,7 +113,9 @@ func TestTheHTTPServer(t *testing.T) { t.Run("returns a 500 if payload is not the expected protobuf as json", func(t *testing.T) { strategy := &stubStrategy{} - handler := newHTTPHandler(&service.RequestHandler{Config: &service.Config{}, Strategy: strategy}) + requestHandler := service.NewRequestHandler(&service.Config{}) + requestHandler.Strategy = strategy + handler := newHTTPHandler(requestHandler) theServer := httptest.NewServer(handler) defer theServer.Close() @@ -146,7 +152,9 @@ func TestTheHTTPServer(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - handler := newHTTPHandler(&service.RequestHandler{Config: &service.Config{}, Strategy: strategy}) + requestHandler := service.NewRequestHandler(&service.Config{}) + requestHandler.Strategy = strategy + handler := newHTTPHandler(requestHandler) theServer := httptest.NewServer(handler) defer theServer.Close() @@ -194,7 +202,9 @@ func TestHTTPClient(t *testing.T) { theResponseToReturn: expectedProtoResponse, } - handler := newHTTPHandler(&service.RequestHandler{Config: &service.Config{}, Strategy: strategy}) + requestHandler := service.NewRequestHandler(&service.Config{}) + requestHandler.Strategy = strategy + handler := newHTTPHandler(requestHandler) theServer := httptest.NewServer(handler) defer theServer.Close() @@ -228,7 +238,9 @@ func TestHTTPClient(t *testing.T) { theErrorToReturn: errors.New("this error was injected by [terminus-grpc:-1-h1:9090]"), } - handler := newHTTPHandler(&service.RequestHandler{Config: &service.Config{}, Strategy: strategy}) + requestHandler := service.NewRequestHandler(&service.Config{}) + requestHandler.Strategy = strategy + handler := newHTTPHandler(requestHandler) theServer := httptest.NewServer(handler) defer theServer.Close() diff --git a/service/service.go b/service/service.go index 0a3ead9..f1283ce 100644 --- a/service/service.go +++ b/service/service.go @@ -19,6 +19,7 @@ type Config struct { H1DownstreamServers []string PercentageFailedRequests int SleepInMillis int + TerminateAfter int FireAndForget bool DownstreamConnectionTimeout time.Duration ExtraArguments map[string]string @@ -60,6 +61,7 @@ func MakeFireAndForget(client Client) Client { // Server is an abstraction representing each server made available to receive inbound connections. type Server interface { GetID() string + Shutdown() error } // Strategy is the algorithm applied by this service when it receives requests (c.f. http://wiki.c2.com/?StrategyPattern) @@ -67,10 +69,52 @@ type Strategy interface { Do(context.Context, *pb.TheRequest) (*pb.TheResponse, error) } +// +// TODO: move RequestHandler into its own file +// + // RequestHandler is a protocol-independent request/response handler interface type RequestHandler struct { - Config *Config - Strategy Strategy + Strategy Strategy // public due to circular dependency between server and strategy + + config *Config + stopCh chan struct{} + requestCount int + counterCh chan struct{} +} + +// requestCounter approximates an atomic read/write counter via channels +func (h *RequestHandler) requestCounter() { + for range h.counterCh { + h.requestCount++ + if h.requestCount == h.config.TerminateAfter { + log.Infof("TerminateAfter limit hit (%d), stopping [%s]", h.config.TerminateAfter, h.config.ID) + h.stopCh <- struct{}{} + } + } +} + +func NewRequestHandler(config *Config) *RequestHandler { + h := &RequestHandler{ + config: config, + stopCh: make(chan struct{}), + requestCount: 0, + counterCh: make(chan struct{}), + } + + if h.config.TerminateAfter != 0 { + go h.requestCounter() + } + + return h +} + +func (h *RequestHandler) ConfigID() string { + return h.config.ID +} + +func (h *RequestHandler) Stopping() <-chan struct{} { + return h.stopCh } // Handle takes in a request, processes it accordingly to its Strategy, an returns the response. @@ -78,7 +122,11 @@ func (h *RequestHandler) Handle(ctx context.Context, req *pb.TheRequest) (*pb.Th sleepForConfiguredTime(h) if shouldFailThisRequest(h) { - return nil, fmt.Errorf("this error was injected by [%s]", h.Config.ID) + return nil, fmt.Errorf("this error was injected by [%s]", h.config.ID) + } + + if h.config.TerminateAfter != 0 { + h.counterCh <- struct{}{} } reqID := req.RequestUID @@ -91,11 +139,11 @@ func (h *RequestHandler) Handle(ctx context.Context, req *pb.TheRequest) (*pb.Th } func sleepForConfiguredTime(h *RequestHandler) { - time.Sleep(time.Duration(int64(h.Config.SleepInMillis)) * time.Millisecond) + time.Sleep(time.Duration(int64(h.config.SleepInMillis)) * time.Millisecond) } func shouldFailThisRequest(h *RequestHandler) bool { - perc := h.Config.PercentageFailedRequests + perc := h.config.PercentageFailedRequests rnd := rand.Intn(100) return rnd < perc } diff --git a/service/service_test.go b/service/service_test.go index fcd96f4..51a6de7 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -18,7 +18,7 @@ func TestRequestHandler(t *testing.T) { } handler := RequestHandler{ - Config: &Config{}, + config: &Config{}, Strategy: strategy, } @@ -44,7 +44,7 @@ func TestRequestHandler(t *testing.T) { } handler := RequestHandler{ - Config: &Config{}, + config: &Config{}, Strategy: strategy, } @@ -62,21 +62,21 @@ func TestRequestHandler(t *testing.T) { } neverFailHandler := RequestHandler{ - Config: &Config{ + config: &Config{ PercentageFailedRequests: 0, }, Strategy: strategy, } alwaysFailHandler := RequestHandler{ - Config: &Config{ + config: &Config{ PercentageFailedRequests: 100, }, Strategy: strategy, } sometimesFailHandler := RequestHandler{ - Config: &Config{ + config: &Config{ PercentageFailedRequests: 70, }, Strategy: strategy, @@ -104,6 +104,36 @@ func TestRequestHandler(t *testing.T) { t.Fatalf("Expected sometimes error to fail at least once, but it didnt fail") } }) + + t.Run("will exit after a specified number of requests", func(t *testing.T) { + expectedRequest := &pb.TheRequest{RequestUID: "expected req"} + expectedResponse := &pb.TheResponse{Payload: "expected resp"} + strategy := &MockStrategy{ + ResponseToReturn: expectedResponse, + } + + terminateLimit := 2 + terminateHandler := NewRequestHandler(&Config{TerminateAfter: terminateLimit}) + terminateHandler.Strategy = strategy + + for i := 0; i < terminateLimit; i++ { + _, err := terminateHandler.Handle(context.TODO(), expectedRequest) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if i+1 < terminateLimit { + select { + case <-terminateHandler.Stopping(): + t.Fatalf("RequestHandler terminated when it should not have: %d != %d", i+1, terminateLimit) + default: + } + } else { + // this will timeout the test if it fails + <-terminateHandler.Stopping() + } + } + }) } func TestFireAndForgetClient(t *testing.T) { diff --git a/service/test_helper.go b/service/test_helper.go index ab1a882..d5ff780 100644 --- a/service/test_helper.go +++ b/service/test_helper.go @@ -36,6 +36,8 @@ type MockServer struct { func (m MockServer) GetID() string { return m.IDToReturn } +func (m MockServer) Shutdown() error { return nil } + type MockStrategy struct { ContextReceived context.Context RequestReceived *pb.TheRequest diff --git a/strategies/broadcast_channel.go b/strategies/broadcast_channel.go index cdaad3d..f70a941 100644 --- a/strategies/broadcast_channel.go +++ b/strategies/broadcast_channel.go @@ -15,7 +15,7 @@ import ( // BroadcastChannelStrategyName is the user-friendly name of this strategy const BroadcastChannelStrategyName = "broadcast-channel" -// BroadcastChannelStrategy is a strategy that will take in a request and broadact it to all downstream services. +// BroadcastChannelStrategy is a strategy that will take in a request and broadcast it to all downstream services. type BroadcastChannelStrategy struct { clients []service.Client }