From d16005dd0a0cb91468f5181f1a864985819ef80b Mon Sep 17 00:00:00 2001 From: Jose Lopes Date: Fri, 21 Oct 2022 15:04:31 +0100 Subject: [PATCH] Add RabbitMQ support (#44) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit feat: add RabbitMQ support Co-authored-by: Seán C McCord --- README.md | 53 ++- ari-proxy.yaml | 2 +- client/bus/bus.go | 46 +- client/client.go | 268 +++++------- client/clientserver_test.go | 4 +- client/doc.go | 2 +- client/listen.go | 19 +- cmd.go | 18 +- go.mod | 17 +- go.sum | 6 + messagebus/messagebus.go | 106 +++++ messagebus/nats.go | 278 +++++++++++++ messagebus/rabbitmq.go | 692 +++++++++++++++++++++++++++++++ messagebus/rabbitmqsub.go | 106 +++++ messagebus/response_forwarder.go | 39 ++ proxy/types.go | 4 +- server/clientserver_test.go | 4 +- server/handler.go | 2 +- server/liveRecording.go | 4 +- server/options.go | 2 +- server/server.go | 178 ++++---- 21 files changed, 1509 insertions(+), 341 deletions(-) create mode 100644 messagebus/messagebus.go create mode 100644 messagebus/nats.go create mode 100644 messagebus/rabbitmq.go create mode 100644 messagebus/rabbitmqsub.go create mode 100644 messagebus/response_forwarder.go diff --git a/README.md b/README.md index 5e4e724..1cbc0e2 100644 --- a/README.md +++ b/README.md @@ -6,9 +6,9 @@ Proxy for the Asterisk REST interface (ARI). The ARI proxy facilitates scaling of both applications and Asterisk, independently and with minimal coordination. Each Asterisk instance and ARI application pair runs an `ari-proxy` server instance, which talks to a common -NATS cluster. Each client application talks to the same NATS cluster. The +NATS or RabbitMQ cluster. Each client application talks to the same message bus. The clients automatically and continuously discover new Asterisk instances, so the -only coordination needed is the common location of the NATS cluster. +only coordination needed is the common location of the message bus. The ARI proxy allows for: - Any number of applications running the ARI client @@ -22,13 +22,17 @@ The ARI proxy allows for: - Simple call event reception by any number of application clients. (No single-app lockout) +Supported message buses: + - [NATS](https://nats.io) + - [RabbitMQ](https://rabbitmq.com) ## Proxy server Docker images are kept up to date with releases and are tagged accordingly. The `ari-proxy` does not expose any services, so no ports need to be opened for it. -However, it does need to know how to connect to both Asterisk and NATS. +However, it does need to know how to connect to both Asterisk and the message +bus. ``` docker run \ @@ -37,7 +41,7 @@ However, it does need to know how to connect to both Asterisk and NATS. -e ARI_PASSWORD="supersecret" \ -e ARI_HTTP_URL="http://asterisk:8088/ari" \ -e ARI_WEBSOCKET_URL="ws://asterisk:8088/ari/events" \ - -e NATS_URL="nats://nats:4222" \ + -e MESSAGEBUS_URL="nats://nats:4222" \ cycoresystems/ari-proxy ``` @@ -74,9 +78,25 @@ func connect(ctx context.Context, appName string) (ari.Client,error) { } ``` +Connecting the client to RabbitMQ is like: + +```go +import ( + "github.com/CyCoreSystems/ari/v5" + "github.com/CyCoreSystems/ari-proxy/v5/client" +) + +func connect(ctx context.Context, appName string) (ari.Client,error) { + c, err := client.New(ctx, + client.WithApplication(appName), + client.WithURI("amqp://user:password@rabbitmqhost:5679/"), + ) +} +``` + Configuration of the client can also be done with environment variables. -`ARI_APPLICATION` can be used to set the ARI application name, and `NATS_URI` -can be used to set the NATS URI. Doing so allows you to get a client connection +`ARI_APPLICATION` can be used to set the ARI application name, and `MESSAGEBUS_URL` +can be used to set the message bus URL. Doing so allows you to get a client connection simply with `client.New(ctx)`. Once an `ari.Client` is obtained, the client functions exactly as the native @@ -97,14 +117,14 @@ open subscriptions on the client. Layers of clients can be used efficiently with different contexts using the `New(context.Context)` function of each client instance. Subtended clients will -be closed with their parents, use a common internal NATS connection, and can be +be closed with their parents, use a common internal message bus connection, and can be severally closed by their individual contexts. This makes managing many active channels easy and efficient. ### Lifecycle There are two levels of client in use. The first is a connection, which is a -long-lived network connection to the NATS cluster. In general, the end user +long-lived network connection to the message bus. In general, the end user should not close this connection. However, it is available, if necessary, as `DefaultConn` and offers a `Close()` function for itself. @@ -127,7 +147,7 @@ of where the client is located. These pieces of information are handled transparently and internally by the ARI proxy and the ARI proxy client to route commands and events where they should be sent. -### NATS protocol details +### Message bus protocol details The protocol details described below are only necessary to know if you do not use the provided client and/or server. By using both components in this repository, the @@ -135,7 +155,7 @@ protocol details below are transparently handled for you. #### Subject structure -The NATS subject prefix defaults to `ari.`, and all messages used by this proxy +The message bus subject prefix defaults to `ari.`, and all messages used by this proxy will be prefixed by that term. Next is added one of four resource classifications: @@ -165,12 +185,13 @@ ARI application command subject. In fact, each ARI proxy listens to each of the three levels. A request to `ari.command` will result in all ARI proxies responding.) -This setup allows for a variable generalization in the listeners by using NATS +This setup allows for a variable generalization in the listeners by using +message bus wildcard subscriptions. For instance, if you want to receive all events for the -"test" application regardless from which Asterisk machine they come, you would -subscribe to: +"test" application regardless from which Asterisk machine they come, you would subscribe to: -`ari.event.test.>` +`ari.event.test.>` //NATS +`ari.event.test.#` //RabbitMQ #### Dialogs @@ -179,14 +200,14 @@ specifies a Dialog ID in its metadata, the ARI proxy will further classify events related to that dialog. Relationships are defined by the entity type on which the Dialog-infused command operates. -Dialog-related events are published on their own NATS subject tree, +Dialog-related events are published on their own message bus subject tree, `dialogevent`. Thus dialogs abstract ARI application and Asterisk ID. An event for dialog "testme123" would be published to: `ari.dialogevent.testme123` Keep in mind that regardless of dialog associations, all events are _also_ -published to their appropriate canonical NATS subjects. Dialogs are intended as +published to their appropriate canonical message bus subjects. Dialogs are intended as a mechanism to: - reduce client message traffic load diff --git a/ari-proxy.yaml b/ari-proxy.yaml index 59096b4..390bd50 100644 --- a/ari-proxy.yaml +++ b/ari-proxy.yaml @@ -4,5 +4,5 @@ ari: application: example http_url: http://localhost:8088/ari websocket_url: ws://localhost:8088/ari/events -nats: +messagebus: url: nats://nats:4222 diff --git a/client/bus/bus.go b/client/bus/bus.go index 677f5be..f538bb8 100644 --- a/client/bus/bus.go +++ b/client/bus/bus.go @@ -4,37 +4,40 @@ import ( "fmt" "sync" + "github.com/CyCoreSystems/ari-proxy/v5/messagebus" "github.com/CyCoreSystems/ari/v5" "github.com/inconshreveable/log15" - - "github.com/nats-io/nats.go" ) // EventChanBufferLength is the number of unhandled events which can be queued // to the event channel buffer before further events are lost. var EventChanBufferLength = 10 -// Bus provides an ari.Bus interface to NATS +// Bus provides an ari.Bus interface to MessageBus type Bus struct { prefix string log log15.Logger - nc *nats.EncodedConn + mbus messagebus.Client } // New returns a new Bus -func New(prefix string, nc *nats.EncodedConn, log log15.Logger) *Bus { +func New(prefix string, m messagebus.Client, log log15.Logger) *Bus { return &Bus{ prefix: prefix, log: log, - nc: nc, + mbus: m, } } func (b *Bus) subjectFromKey(key *ari.Key) string { if key == nil { - return fmt.Sprintf("%sevent.>", b.prefix) + return fmt.Sprintf( + "%sevent.%s", + b.prefix, + b.mbus.GetWildcardString(messagebus.WildcardZeroOrMoreWords), + ) } if key.Dialog != "" { @@ -43,23 +46,23 @@ func (b *Bus) subjectFromKey(key *ari.Key) string { subj := fmt.Sprintf("%sevent.", b.prefix) if key.App == "" { - return subj + ">" + return subj + b.mbus.GetWildcardString(messagebus.WildcardZeroOrMoreWords) } subj += key.App + "." if key.Node == "" { - return subj + ">" + return subj + b.mbus.GetWildcardString(messagebus.WildcardZeroOrMoreWords) } return subj + key.Node } -// Subscription represents an ari.Subscription over NATS +// Subscription represents an ari.Subscription over MessageBus type Subscription struct { key *ari.Key log log15.Logger - subscription *nats.Subscription + subscription messagebus.Subscription eventChan chan ari.Event @@ -91,11 +94,18 @@ func (b *Bus) Subscribe(key *ari.Key, n ...string) ari.Subscription { events: n, } - s.subscription, err = b.nc.Subscribe(b.subjectFromKey(key), func(m *nats.Msg) { - s.receive(m) - }) + var app string + if key != nil { + app = key.App + } + + s.subscription, err = b.mbus.SubscribeEvent( + b.subjectFromKey(key), + app, + s.receive, + ) if err != nil { - b.log.Error("failed to subscribe to NATS", "error", err) + b.log.Error("failed to subscribe to MessageBus", "error", err) return nil } return s @@ -115,7 +125,7 @@ func (s *Subscription) Cancel() { if s.subscription != nil { err := s.subscription.Unsubscribe() if err != nil { - s.log.Error("failed unsubscribe from NATS", "error", err) + s.log.Error("failed unsubscribe from MessageBus", "error", err) } } @@ -127,8 +137,8 @@ func (s *Subscription) Cancel() { s.mu.Unlock() } -func (s *Subscription) receive(o *nats.Msg) { - e, err := ari.DecodeEvent(o.Data) +func (s *Subscription) receive(data []byte) { + e, err := ari.DecodeEvent(data) if err != nil { s.log.Error("failed to convert received message to ari.Event", "error", err) return diff --git a/client/client.go b/client/client.go index b7e6a5a..0ef9e6d 100644 --- a/client/client.go +++ b/client/client.go @@ -2,15 +2,16 @@ package client import ( "context" + "errors" "os" - "sync" "time" "github.com/CyCoreSystems/ari-proxy/v5/client/bus" "github.com/CyCoreSystems/ari-proxy/v5/client/cluster" + "github.com/CyCoreSystems/ari-proxy/v5/messagebus" "github.com/CyCoreSystems/ari-proxy/v5/proxy" "github.com/CyCoreSystems/ari/v5" - "github.com/CyCoreSystems/ari/v5/rid" + "github.com/rabbitmq/amqp091-go" "github.com/rotisserie/eris" "github.com/inconshreveable/log15" @@ -27,11 +28,11 @@ import ( // immediately. var ClosureGracePeriod = 10 * time.Second -// DefaultRequestTimeout is the default timeout for a NATS request. (Note: Answer() takes longer than 250ms on average) +// DefaultRequestTimeout is the default timeout for a MessageBus request. (Note: Answer() takes longer than 250ms on average) var DefaultRequestTimeout = 500 * time.Millisecond // DefaultInputBufferLength is the default size of the event buffer for events -// coming in from NATS +// coming in from MessageBus const DefaultInputBufferLength = 100 // DefaultClusterMaxAge is the default maximum age for cluster members to be @@ -43,7 +44,7 @@ var ErrNil = eris.New("Nil") // core is the core, functional piece of a Client which is the same across the // family of derived clients. It manages stateful elements such as the bus, -// the NATS connection, and the cluster membership +// the MessageBus connection, and the cluster membership type core struct { // cluster describes the cluster of ARI proxies cluster *cluster.Cluster @@ -51,16 +52,16 @@ type core struct { // clusterMaxAge is the maximum age of cluster members to include in queries clusterMaxAge time.Duration - // inputBufferLength is the size of the buffer for events coming in from NATS + // inputBufferLength is the size of the buffer for events coming in from MessageBus inputBufferLength int log log15.Logger - // nc provides the nats.EncodedConn over which messages will be transceived. - // One of NATS or NATSURI must be specified. - nc *nats.EncodedConn + // Message Bus over which messages will be transceived + // One of mbus or uri must be specified. + mbus messagebus.Client - // prefix is the prefix to use on all NATS subjects. It defaults to "ari.". + // prefix is the prefix to use on all MessageBus subjects. It defaults to "ari.". prefix string // refCounter is the reference counter for derived clients. When there are @@ -70,19 +71,16 @@ type core struct { // requestTimeout is the timeout duration of a request requestTimeout time.Duration - // timeoutRetries is the amount of times to retry on nats timeout + // timeoutRetries is the amount of times to retry on message bus timeout timeoutRetries int - // countTimeouts tracks how many timeouts the client has received, for metrics. - countTimeouts int64 // nolint: structcheck - - // uri provies the URI to which a NATS connection should be established. One - // of NATS or NATSURI must be specified. This option may also be supplied by - // the `NATS_URI` environment variable. + // uri provies the URI to which a Message Bus connection should be established. One + // of mbus or uri must be specified. This option may also be supplied by + // the `MESSAGEBUS_URL` environment variable. uri string - // annSub is the NATS subscription to proxy announcements - annSub *nats.Subscription + // annSub is the subscription to proxy announcements + annSub messagebus.Subscription // closeChan is the signal channel responsible for shutting down core // services. When it is closed, all core services should exit. @@ -91,9 +89,9 @@ type core struct { // closed indicates the core has been closed closed bool - // closeNATSOnClose indicates that the NATS connection should be closed when + // closeMBusOnClose indicates that the Message Bus connection should be closed when // the ari.Client is closed - closeNATSOnClose bool + closeMBusOnClose bool // started indicates whether this core has been started; a started core will // no-op core.start() @@ -120,13 +118,11 @@ func (c *core) close() { if c.annSub != nil { err := c.annSub.Unsubscribe() if err != nil { - c.log.Debug("failed to unsubscribe from NATS proxy announcements", "error", err) + c.log.Debug("failed to unsubscribe from proxy announcements", "error", err) } } - if c.closeNATSOnClose && c.nc != nil { - c.nc.Close() - } + c.mbus.Close() } func (c *core) Start() error { @@ -141,22 +137,37 @@ func (c *core) Start() error { c.closeChan = make(chan struct{}) - // Connect to NATS, if we do not already have a connection - if c.nc == nil { - n, err := nats.Connect(c.uri) - if err != nil { - c.close() - return eris.Wrap(err, "failed to connect to NATS") + // Connect to MessageBus, if we do not already have a connection + if c.mbus == nil { + switch messagebus.GetType(c.uri) { + case messagebus.TypeNats: + c.mbus = &messagebus.NatsBus{ + Config: messagebus.Config{ + URL: c.uri, + TimeoutRetries: c.timeoutRetries, + RequestTimeout: c.requestTimeout, + }, + Log: c.log, + } + case messagebus.TypeRabbitmq: + c.mbus = &messagebus.RabbitmqBus{ + Config: messagebus.Config{ + URL: "amqp://guest:guest@rabbitmq:5672/", + TimeoutRetries: c.timeoutRetries, + RequestTimeout: c.requestTimeout, + }, + Log: c.log, + } + default: + return errors.New("Unknown url for MessageBus: " + c.uri) } - c.nc, err = nats.NewEncodedConn(n, nats.JSON_ENCODER) + err := c.mbus.Connect() if err != nil { - n.Close() // need this here because nc is not yet bound to the core c.close() - return eris.Wrap(err, "failed to encode NATS connection") + return eris.Wrap(err, "failed to connect to MessageBus") } - - c.closeNATSOnClose = true + c.closeMBusOnClose = true } // Create and start the cluster @@ -173,7 +184,8 @@ func (c *core) Start() error { } func (c *core) maintainCluster() (err error) { - c.annSub, err = c.nc.Subscribe(proxy.AnnouncementSubject(c.prefix), func(o *proxy.Announcement) { + + c.annSub, err = c.mbus.SubscribeAnnounce(proxy.AnnouncementSubject(c.prefix), func(o *proxy.Announcement) { c.cluster.Update(o.Node, o.Application) }) if err != nil { @@ -181,7 +193,11 @@ func (c *core) maintainCluster() (err error) { } // Send an initial ping for proxy announcements - return c.nc.Publish(proxy.PingSubject(c.prefix), &proxy.Request{}) + err = c.mbus.PublishPing(proxy.PingSubject(c.prefix)) + if err != nil { + return eris.Wrap(err, "failed to publish ping") + } + return err } // Client provides an ari.Client for an ari-proxy server @@ -198,7 +214,7 @@ type Client struct { closed bool } -// New creates a new Client to the Asterisk ARI NATS proxy. +// New creates a new Client to the Asterisk ARI NATS/RabbitMQ proxy. func New(ctx context.Context, opts ...OptionFunc) (*Client, error) { ctx, cancel := context.WithCancel(ctx) @@ -218,7 +234,9 @@ func New(ctx context.Context, opts ...OptionFunc) (*Client, error) { c.log.SetHandler(log15.DiscardHandler()) // Load environment-based configurations - if os.Getenv("NATS_URI") != "" { + if os.Getenv("MESSAGEBUS_URL") != "" { + c.core.uri = os.Getenv("MESSAGEBUS_URL") + } else if os.Getenv("NATS_URI") != "" { //backward compatibility c.core.uri = os.Getenv("NATS_URI") } @@ -234,7 +252,7 @@ func New(ctx context.Context, opts ...OptionFunc) (*Client, error) { } // Create the bus - c.bus = bus.New(c.core.prefix, c.core.nc, c.core.log) + c.bus = bus.New(c.core.prefix, c.core.mbus, c.core.log) // Call Close whenever the context is closed go func() { @@ -254,7 +272,7 @@ func New(ctx context.Context, opts ...OptionFunc) (*Client, error) { // New returns a new client from the existing one. The new client will have a // separate event bus and lifecycle, allowing the closure of all subscriptions // and handles derived from the client by simply closing the client. The -// underlying NATS connection and cluster awareness (the common Core) will be +// underlying MessageBus connection and cluster awareness (the common Core) will be // preserved across derived Client lifecycles. func (c *Client) New(ctx context.Context) *Client { _, cancel := context.WithCancel(ctx) @@ -263,7 +281,7 @@ func (c *Client) New(ctx context.Context) *Client { appName: c.appName, cancel: cancel, core: c.core, - bus: bus.New(c.core.prefix, c.core.nc, c.core.log), + bus: bus.New(c.core.prefix, c.core.mbus, c.core.log), } } @@ -271,14 +289,14 @@ func (c *Client) New(ctx context.Context) *Client { type OptionFunc func(*Client) // FromClient configures the ARI Application to use the transport details from -// another ARI Client. Transport-related details are copied, such as the NATS -// Client, the NATS prefix, the timeout values. +// another ARI Client. Transport-related details are copied, such as the MessageBus +// Client, the MessageBus prefix, the timeout values. // // Specifically NOT copied are dialog, application, and asterisk details. // -// NOTE: use of this function will cause NATS connection leakage if there is a +// NOTE: use of this function will cause MessageBus connection leakage if there is a // mix of uses of FromClient and not over a period of time. If you intend to -// use FromClient, it is recommended that you always pass a NATS client in to +// use FromClient, it is recommended that you always pass a MessageBus client in to // the first ari.Client and maintain lifecycle control of it manually. func FromClient(cl ari.Client) OptionFunc { return func(c *Client) { @@ -310,8 +328,8 @@ func WithLogHandler(h log15.Handler) OptionFunc { } } -// WithURI sets the NATS URI to which the client will attempt to connect. -// The NATS URI may also be configured by the environment variable `NATS_URI`. +// WithURI sets the MessageBus URI to which the client will attempt to connect. +// The MessageBus URI may also be configured by the environment variable `MESSAGEBUS_URL`. func WithURI(uri string) OptionFunc { return func(c *Client) { c.core.uri = uri @@ -321,11 +339,24 @@ func WithURI(uri string) OptionFunc { // WithNATS binds an existing NATS connection func WithNATS(nc *nats.EncodedConn) OptionFunc { return func(c *Client) { - c.nc = nc + c.mbus = messagebus.NewNatsBus( + messagebus.Config{}, + messagebus.WithNatsConn(nc), + ) + } +} + +// WithRabbitmq binds an existing RabbitMQ connection +func WithRabbitmq(conn *amqp091.Connection) OptionFunc { + return func(c *Client) { + c.mbus = messagebus.NewRabbitmqBus( + messagebus.Config{}, + messagebus.WithRabbitmqConn(conn), + ) } } -// WithPrefix configures the NATS Prefix to use on a Client +// WithPrefix configures the MessageBus Prefix to use on a Client func WithPrefix(prefix string) OptionFunc { return func(c *Client) { c.core.prefix = prefix @@ -508,23 +539,12 @@ func (c *Client) listRequest(req *proxy.Request) ([]*ari.Key, error) { } func (c *Client) makeRequest(class string, req *proxy.Request) (*proxy.Response, error) { - var resp proxy.Response - var err error - if !c.completeCoordinates(req) { return c.makeBroadcastRequestReturnFirstGoodResponse(class, req) } - for i := 0; i <= c.core.timeoutRetries; i++ { - err = c.nc.Request(c.subject(class, req), req, &resp, c.requestTimeout) - if err == nats.ErrTimeout { - c.countTimeouts++ - continue - } - return &resp, err - } - - return nil, err + c.log.Error("request", "class", class, "req", req, "subject", c.subject(class, req)) + return c.mbus.Request(c.subject(class, req), req) } func (c *Client) makeRequests(class string, req *proxy.Request) (responses []*proxy.Response, err error) { @@ -535,76 +555,11 @@ func (c *Client) makeRequests(class string, req *proxy.Request) (responses []*pr req.Key = ari.NewKey("", "") } - var responseCount int expected := len(c.core.cluster.Matching(req.Key.Node, req.Key.App, c.core.clusterMaxAge)) - reply := rid.New("rp") - replyChan := make(chan *proxy.Response) - replySub, err := c.core.nc.Subscribe(reply, func(o *proxy.Response) { - responseCount++ - - replyChan <- o - - if responseCount >= expected { - close(replyChan) - } - }) - if err != nil { - return nil, eris.Wrap(err, "failed to subscribe to data responses") - } - defer replySub.Unsubscribe() // nolint: errcheck - - // Make an all-call for the entity data - err = c.core.nc.PublishRequest(c.subject(class, req), reply, req) - if err != nil { - return nil, eris.Wrap(err, "failed to make request for data") - } - - // Wait for replies - for { - select { - case <-time.After(c.requestTimeout): - return responses, nil - case resp, ok := <-replyChan: - if !ok { - return responses, nil - } - responses = append(responses, resp) - } - } -} -type limitedResponseForwarder struct { - closed bool - count int - expected int - fwdChan chan *proxy.Response - - mu sync.Mutex + return c.mbus.MultipleRequest(c.subject(class, req), req, expected) } -func (f *limitedResponseForwarder) Forward(o *proxy.Response) { - f.mu.Lock() - defer f.mu.Unlock() - - f.count++ - - if f.closed { - return - } - - // always send up reply, so we can track errors. - select { - case f.fwdChan <- o: - default: - } - - if f.count >= f.expected { - f.closed = true - close(f.fwdChan) - } -} - -// TODO: simplify func (c *Client) makeBroadcastRequestReturnFirstGoodResponse(class string, req *proxy.Request) (*proxy.Response, error) { if req == nil { return nil, eris.New("empty request") @@ -614,49 +569,11 @@ func (c *Client) makeBroadcastRequestReturnFirstGoodResponse(class string, req * req.Key = ari.NewKey("", "") } - reply := rid.New("rp") - - rf := &limitedResponseForwarder{ - expected: len(c.core.cluster.Matching(req.Key.Node, req.Key.App, c.core.clusterMaxAge)), - fwdChan: make(chan *proxy.Response), - } - - replySub, err := c.core.nc.Subscribe(reply, rf.Forward) - if err != nil { - return nil, eris.Wrap(err, "failed to subscribe to data responses") - } - defer replySub.Unsubscribe() // nolint: errcheck - - // Make an all-call for the entity data - if err = c.core.nc.PublishRequest(c.subject(class, req), reply, req); err != nil { - return nil, eris.Wrap(err, "failed to make request for data") - } - - // Wait for replies - for { - select { - case <-time.After(c.requestTimeout): - // Return the last error if we got one; otherwise, return a timeout error - if err == nil { - err = eris.New("timeout") - } - - return nil, err - case resp, more := <-rf.fwdChan: - if !more { - if err == nil { - err = eris.New("no data") - } - - return nil, err - } - if resp != nil { - if err = resp.Err(); err == nil { // store the error for later return - return resp, nil // No error means to return the current value - } - } - } - } + return c.mbus.MultipleRequestReturnFirstGoodResponse( + c.subject(class, req), + req, + len(c.core.cluster.Matching(req.Key.Node, req.Key.App, c.core.clusterMaxAge)), + ) } func (c *Client) completeCoordinates(req *proxy.Request) bool { @@ -675,7 +592,8 @@ func (c *Client) subject(class string, req *proxy.Request) string { return proxy.Subject(c.core.prefix, class, req.Key.App, req.Key.Node) } -// TimeoutCount is the amount of times the NATS communication times out +// TimeoutCount is the amount of times the communication times out func (c *Client) TimeoutCount() int64 { - return c.countTimeouts + // countTimeouts tracks how many timeouts the client has received, for metrics. + return c.mbus.TimeoutCount() } diff --git a/client/clientserver_test.go b/client/clientserver_test.go index dcc8f2e..62cacad 100644 --- a/client/clientserver_test.go +++ b/client/clientserver_test.go @@ -20,7 +20,7 @@ func (s *srv) Start(ctx context.Context, t *testing.T, mockClient ari.Client, nc s.s = server.New() // tests may run in parallel so we don't want two separate proxy servers to conflict. - s.s.NATSPrefix = rid.New("") + "." + s.s.MBPrefix = rid.New("") + "." go func() { if err := s.s.ListenOn(ctx, mockClient, nc); err != nil { @@ -37,7 +37,7 @@ func (s *srv) Start(ctx context.Context, t *testing.T, mockClient ari.Client, nc return nil, errors.New("Timeout waiting for server ready") } - cl, err := New(ctx, WithTimeoutRetries(4), WithPrefix(s.s.NATSPrefix), WithApplication("asdf")) + cl, err := New(ctx, WithTimeoutRetries(4), WithPrefix(s.s.MBPrefix), WithApplication("asdf")) if err != nil { return nil, err } diff --git a/client/doc.go b/client/doc.go index 9650286..fc72c27 100644 --- a/client/doc.go +++ b/client/doc.go @@ -1,2 +1,2 @@ -// Package client provides an ari.Client implementation for a NATS-based ARI proxy cluster +// Package client provides an ari.Client implementation for a NATS/RabbitMQ-based ARI proxy cluster package client diff --git a/client/listen.go b/client/listen.go index 8d53870..847fa01 100644 --- a/client/listen.go +++ b/client/listen.go @@ -4,8 +4,8 @@ import ( "context" "fmt" + "github.com/CyCoreSystems/ari-proxy/v5/messagebus" "github.com/CyCoreSystems/ari/v5" - "github.com/nats-io/nats.go" "github.com/rotisserie/eris" ) @@ -16,7 +16,7 @@ var ListenQueue = "ARIProxyStasisStartDistributorQueue" // matching events will be sent down the returned StasisStart channel. The // context which is passed to Listen can be used to stop the Listen execution. // -// Importantly, the StasisStart events are listened in a NATS Queue, which +// Importantly, the StasisStart events are listened in a NATS/RabbitMQ Queue, which // means that this may be used to deliver new calls to only a single handler // out of a set of 1 or more handlers in a cluster. func Listen(ctx context.Context, ac ari.Client, h func(*ari.ChannelHandle, *ari.StasisStart)) error { @@ -25,10 +25,15 @@ func Listen(ctx context.Context, ac ari.Client, h func(*ari.ChannelHandle, *ari. return eris.New("ARI Client must be a proxy client") } - subj := fmt.Sprintf("%sevent.%s.>", c.core.prefix, c.ApplicationName()) + subj := fmt.Sprintf( + "%sevent.%s.%s", + c.core.prefix, + c.ApplicationName(), + c.mbus.GetWildcardString(messagebus.WildcardZeroOrMoreWords), + ) c.log.Debug("listening for events", "subject", subj) - sub, err := c.nc.QueueSubscribe(subj, ListenQueue, listenProcessor(ac, h)) + sub, err := c.mbus.SubscribeEvent(subj, ListenQueue, listenProcessor(ac, h)) if err != nil { return eris.Wrap(err, "failed to subscribe to events") } @@ -39,9 +44,9 @@ func Listen(ctx context.Context, ac ari.Client, h func(*ari.ChannelHandle, *ari. return nil } -func listenProcessor(ac ari.Client, h func(*ari.ChannelHandle, *ari.StasisStart)) func(*nats.Msg) { - return func(m *nats.Msg) { - e, err := ari.DecodeEvent(m.Data) +func listenProcessor(ac ari.Client, h func(*ari.ChannelHandle, *ari.StasisStart)) func([]byte) { + return func(data []byte) { + e, err := ari.DecodeEvent(data) if err != nil { Logger.Error("failed to decode event", "error", err) return diff --git a/cmd.go b/cmd.go index 2dbb4ae..b5a4568 100644 --- a/cmd.go +++ b/cmd.go @@ -22,8 +22,8 @@ var Log log15.Logger var RootCmd = &cobra.Command{ Use: "ari-proxy", Short: "Proxy for the Asterisk REST interface.", - Long: `ari-proxy is a proxy for working the Asterisk daemon over NATS. - ARI commands are exposed over NATS for operation.`, + Long: `ari-proxy is a proxy for working the Asterisk daemon over NATS/RabbitMQ. + ARI commands are exposed over NATS/RabbitMQ for operation.`, RunE: func(cmd *cobra.Command, args []string) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -62,14 +62,15 @@ func init() { p.StringVar(&cfgFile, "config", "", "config file (default is $HOME/.ari-proxy.yaml)") p.BoolP("verbose", "v", false, "Enable verbose logging") - p.String("nats.url", nats.DefaultURL, "URL for connecting to the NATS cluster") + p.String("nats.url", nats.DefaultURL, "URL for connecting to the NATS cluster") //backward compatibility + p.String("messagebus.url", nats.DefaultURL, "URL for connecting to the Message Bus cluster") p.String("ari.application", "", "ARI Stasis Application") p.String("ari.username", "", "Username for connecting to ARI") p.String("ari.password", "", "Password for connecting to ARI") p.String("ari.http_url", "http://localhost:8088/ari", "HTTP Base URL for connecting to ARI") p.String("ari.websocket_url", "ws://localhost:8088/ari/events", "Websocket URL for connecting to ARI") - for _, n := range []string{"verbose", "nats.url", "ari.application", "ari.username", "ari.password", "ari.http_url", "ari.websocket_url"} { + for _, n := range []string{"verbose", "nats.url", "messagebus.url", "ari.application", "ari.username", "ari.password", "ari.http_url", "ari.websocket_url"} { err := viper.BindPFlag(n, p.Lookup(n)) if err != nil { panic("failed to bind flag " + n) @@ -98,9 +99,12 @@ func readConfig() { } func runServer(ctx context.Context, log log15.Logger) error { - natsURL := viper.GetString("nats.url") + messagebusURL := viper.GetString("messagebus.url") + if messagebusURL == "" { + messagebusURL = viper.GetString("nats.url") //backward compatibility + } if os.Getenv("NATS_SERVICE_HOST") != "" { - natsURL = "nats://" + os.Getenv("NATS_SERVICE_HOST") + ":" + os.Getenv("NATS_SERVICE_PORT_CLIENT") + messagebusURL = "nats://" + os.Getenv("NATS_SERVICE_HOST") + ":" + os.Getenv("NATS_SERVICE_PORT_CLIENT") } srv := server.New() @@ -113,5 +117,5 @@ func runServer(ctx context.Context, log log15.Logger) error { Password: viper.GetString("ari.password"), URL: viper.GetString("ari.http_url"), WebsocketURL: viper.GetString("ari.websocket_url"), - }, natsURL) + }, messagebusURL) } diff --git a/go.mod b/go.mod index 8e9a625..f072bac 100644 --- a/go.mod +++ b/go.mod @@ -4,18 +4,26 @@ go 1.17 require ( github.com/CyCoreSystems/ari/v5 v5.3.0 + github.com/go-stack/stack v1.8.1 // indirect github.com/inconshreveable/log15 v0.0.0-20201112154412-8562bdadbbac github.com/nats-io/nats.go v1.18.0 + github.com/pelletier/go-toml/v2 v2.0.5 // indirect + github.com/rabbitmq/amqp091-go v1.5.0 github.com/rotisserie/eris v0.5.4 + github.com/spf13/afero v1.9.2 // indirect github.com/spf13/cobra v1.6.0 github.com/spf13/viper v1.13.0 github.com/stretchr/testify v1.8.0 + github.com/subosito/gotenv v1.4.1 // indirect + golang.org/x/crypto v0.1.0 // indirect + golang.org/x/net v0.1.0 // indirect + golang.org/x/sys v0.1.0 // indirect + gopkg.in/ini.v1 v1.67.0 // indirect ) require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect - github.com/go-stack/stack v1.8.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect @@ -28,19 +36,12 @@ require ( github.com/nats-io/nuid v1.0.1 // indirect github.com/oklog/ulid v1.3.1 // indirect github.com/pelletier/go-toml v1.9.5 // indirect - github.com/pelletier/go-toml/v2 v2.0.5 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/spf13/afero v1.9.2 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/objx v0.4.0 // indirect - github.com/subosito/gotenv v1.4.1 // indirect - golang.org/x/crypto v0.1.0 // indirect - golang.org/x/net v0.1.0 // indirect - golang.org/x/sys v0.1.0 // indirect golang.org/x/text v0.4.0 // indirect - gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 4589cc7..786937d 100644 --- a/go.sum +++ b/go.sum @@ -365,6 +365,10 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/rabbitmq/amqp091-go v1.4.0 h1:T2G+J9W9OY4p64Di23J6yH7tOkMocgnESvYeBjuG9cY= +github.com/rabbitmq/amqp091-go v1.4.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg= +github.com/rabbitmq/amqp091-go v1.5.0 h1:VouyHPBu1CrKyJVfteGknGOGCzmOz0zcv/tONLkb7rg= +github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= @@ -431,6 +435,8 @@ go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/messagebus/messagebus.go b/messagebus/messagebus.go new file mode 100644 index 0000000..51bcf2e --- /dev/null +++ b/messagebus/messagebus.go @@ -0,0 +1,106 @@ +package messagebus + +import ( + "strings" + "time" + + "github.com/CyCoreSystems/ari-proxy/v5/proxy" + "github.com/CyCoreSystems/ari/v5" +) + +// DefaultReconnectionAttemts is the default number of reconnection attempts +// It implements a hard coded fault tolerance for a starting NATS cluster +const DefaultReconnectionAttemts = 5 + +// DefaultReconnectionWait is the default wating time between each reconnection +// attempt +const DefaultReconnectionWait = 5 * time.Second + +// Type is the type of MessageBus (RabbitMQ / NATS) +type Type int + +// WildcardType used to identify wildcards used on routing keys on message bus +type WildcardType int + +// wildcard types +const ( + WildcardUndefined WildcardType = iota // undefined type + WildcardOneWord // one word like pre.*.post + WildcardZeroOrMoreWords // zero or more words like pre.> +) + +// types +const ( + TypeUnknown Type = iota // unknown type + TypeNats // NATS type + TypeRabbitmq // RabbitMQ type +) + +// Server defines the functions used on ari-proxy server +type Server interface { + Connect() error + Close() + + SubscribePing(topic string, callback PingHandler) (Subscription, error) + SubscribeRequest(topic string, callback RequestHandler) (Subscription, error) + SubscribeRequests(topics []string, callback RequestHandler) (Subscription, error) + SubscribeCreateRequest(topic string, queue string, callback RequestHandler) (Subscription, error) + PublishResponse(topic string, msg *proxy.Response) error + PublishAnnounce(topic string, msg *proxy.Announcement) error + PublishEvent(topic string, msg ari.Event) error +} + +// Client defines the functions used on ari-proxy client +type Client interface { + Connect() error + Close() + + SubscribeAnnounce(topic string, callback AnnounceHandler) (Subscription, error) + SubscribeEvent(topic string, queue string, callback EventHandler) (Subscription, error) + + PublishPing(topic string) error + Request(topic string, req *proxy.Request) (*proxy.Response, error) + MultipleRequest(topic string, req *proxy.Request, expectedResp int) ([]*proxy.Response, error) + MultipleRequestReturnFirstGoodResponse(topic string, req *proxy.Request, expectedResp int) (*proxy.Response, error) + + TimeoutCount() int64 + GetWildcardString(w WildcardType) string +} + +// Config has general configuration for MessageBus +type Config struct { + URL string + TimeoutRetries int + RequestTimeout time.Duration +} + +// Subscription defines subscription interface +type Subscription interface { + Unsubscribe() error +} + +// RequestHandler handles requests messages +type RequestHandler func(subject string, reply string, req *proxy.Request) + +// ResponseHandler handles response messages +type ResponseHandler func(req *proxy.Response) + +// PingHandler handles ping messages +type PingHandler func() + +// AnnounceHandler handles announce messages +type AnnounceHandler func(o *proxy.Announcement) + +// EventHandler handles event messages +type EventHandler func(b []byte) + +// GetType identifies message bus type from an url +func GetType(url string) Type { + if strings.HasPrefix(url, "amqp://") { + return TypeRabbitmq + } + if strings.HasPrefix(url, "nats://") { + return TypeNats + } + return TypeUnknown +} diff --git a/messagebus/nats.go b/messagebus/nats.go new file mode 100644 index 0000000..8c8624d --- /dev/null +++ b/messagebus/nats.go @@ -0,0 +1,278 @@ +package messagebus + +import ( + "time" + + "github.com/CyCoreSystems/ari-proxy/v5/proxy" + "github.com/CyCoreSystems/ari/v5" + "github.com/CyCoreSystems/ari/v5/rid" + "github.com/inconshreveable/log15" + "github.com/nats-io/nats.go" + "github.com/rotisserie/eris" +) + +// NatsBus is MessageBus implementation for RabbitMQ +type NatsBus struct { + Config Config + Log log15.Logger + + conn *nats.EncodedConn + countTimeouts int64 +} + +// OptionNatsFunc options for RabbitMQ +type OptionNatsFunc func(n *NatsBus) + +// NatsMSubscription handle multiple subscriptions with same handler +type NatsMSubscription struct { + Subscriptions []*nats.Subscription +} + +// Unsubscribe removes the multiple subscriptions +func (n *NatsMSubscription) Unsubscribe() error { + for _, sub := range n.Subscriptions { + err := sub.Unsubscribe() + if err != nil { + return eris.Wrap(err, "failed to unsubscribe "+sub.Subject) + } + } + return nil +} + +// NewNatsBus creates a NatsBus +func NewNatsBus(config Config, options ...OptionNatsFunc) *NatsBus { + + mbus := NatsBus{ + Config: config, + } + + for _, optfn := range options { + optfn(&mbus) + } + + return &mbus +} + +// WithNatsConn binds an existing NATS connection +func WithNatsConn(nconn *nats.EncodedConn) OptionNatsFunc { + return func(n *NatsBus) { + n.conn = nconn + } +} + +// Connect creates a NATS connection +func (n *NatsBus) Connect() error { + nc, err := nats.Connect(n.Config.URL) + reconnectionAttempts := DefaultReconnectionAttemts + for err == nats.ErrNoServers && reconnectionAttempts > 0 { + n.Log.Info("retrying to connect to NATS server", "attempts", reconnectionAttempts) + time.Sleep(DefaultReconnectionWait) + nc, err = nats.Connect(n.Config.URL) + reconnectionAttempts-- + } + if err != nil { + return eris.Wrap(err, "failed to connect to NATS") + } + n.conn, err = nats.NewEncodedConn(nc, nats.JSON_ENCODER) + if err != nil { + nc.Close() + return eris.Wrap(err, "failed to encode NATS connection") + } + return nil +} + +// SubscribePing subscribe ping messages +func (n *NatsBus) SubscribePing(topic string, callback PingHandler) (Subscription, error) { + return n.conn.Subscribe(topic, func(m *nats.Msg) { + callback() + }) +} + +// SubscribeRequest subscribe request messages +func (n *NatsBus) SubscribeRequest(topic string, callback RequestHandler) (Subscription, error) { + return n.conn.Subscribe(topic, callback) +} + +// SubscribeRequests subscribe request messages using multiple topics +func (n *NatsBus) SubscribeRequests(topics []string, callback RequestHandler) (Subscription, error) { + + subs := NatsMSubscription{} + for _, topic := range topics { + sub, err := n.conn.Subscribe(topic, callback) + if err != nil { + subs.Unsubscribe() // nolint: errcheck + return nil, eris.Wrapf(err, "failed to create %s subscription", topic) + } + subs.Subscriptions = append(subs.Subscriptions, sub) + + } + return &subs, nil +} + +// SubscribeAnnounce subscribe announce messages +func (n *NatsBus) SubscribeAnnounce(topic string, callback AnnounceHandler) (Subscription, error) { + return n.conn.Subscribe(topic, callback) +} + +// SubscribeEvent subscribe event messages +func (n *NatsBus) SubscribeEvent(topic string, queue string, callback EventHandler) (Subscription, error) { + return n.conn.Subscribe(topic, func(m *nats.Msg) { + callback(m.Data) + }) +} + +// SubscribeCreateRequest subscribe create request messages +func (n *NatsBus) SubscribeCreateRequest(topic string, queue string, callback RequestHandler) (Subscription, error) { + return n.conn.QueueSubscribe(topic, queue, callback) +} + +// PublishResponse sends response message +func (n *NatsBus) PublishResponse(topic string, msg *proxy.Response) error { + return n.conn.Publish(topic, msg) +} + +// PublishPing sends ping message +func (n *NatsBus) PublishPing(topic string) error { + return n.conn.Publish(topic, &proxy.Request{}) +} + +// PublishAnnounce sends announce message +func (n *NatsBus) PublishAnnounce(topic string, msg *proxy.Announcement) error { + return n.conn.Publish(topic, msg) +} + +// PublishEvent sends event message +func (n *NatsBus) PublishEvent(topic string, msg ari.Event) error { + return n.conn.Publish(topic, msg) +} + +// Close closes the connection +func (n *NatsBus) Close() { + if n.conn != nil { + n.conn.Close() + } +} + +// GetWildcardString returns wildcard based on type +func (n *NatsBus) GetWildcardString(w WildcardType) string { + switch w { + case WildcardOneWord: + return "*" + case WildcardZeroOrMoreWords: + return ">" + } + return "" +} + +// Request sends a request message +func (n *NatsBus) Request(topic string, req *proxy.Request) (*proxy.Response, error) { + var err error + var resp proxy.Response + for i := 0; i <= n.Config.TimeoutRetries; i++ { + err = n.conn.Request(topic, req, &resp, n.Config.RequestTimeout) + if err == nats.ErrTimeout { + n.countTimeouts++ + continue + } + if err != nil { + return nil, err + } + return &resp, nil + } + return nil, err +} + +// MultipleRequest sends a request message to multiple consumers +func (n *NatsBus) MultipleRequest(topic string, req *proxy.Request, expectedResp int) ([]*proxy.Response, error) { + var responses []*proxy.Response + + reply := rid.New("rp") + + rf := &responseForwarder{ + expected: expectedResp, + fwdChan: make(chan *proxy.Response), + } + + replySub, err := n.conn.Subscribe(reply, rf.Forward) + if err != nil { + return nil, eris.Wrap(err, "failed to subscribe to data responses") + } + defer replySub.Unsubscribe() // nolint: errcheck + + // Make an all-call for the entity data + err = n.conn.PublishRequest(topic, reply, req) + if err != nil { + return nil, eris.Wrap(err, "failed to make request for data") + } + + // Wait for replies + timer := time.NewTimer(n.Config.RequestTimeout) + defer timer.Stop() + for { + select { + case <-timer.C: + return responses, nil + case resp, more := <-rf.fwdChan: + if !more { + return responses, nil + } + responses = append(responses, resp) + } + } +} + +// MultipleRequestReturnFirstGoodResponse sends a request message to multiple consumers and returns the first good response +func (n *NatsBus) MultipleRequestReturnFirstGoodResponse(topic string, req *proxy.Request, expectedResp int) (*proxy.Response, error) { + + reply := rid.New("rp") + + rf := &responseForwarder{ + expected: expectedResp, + fwdChan: make(chan *proxy.Response), + } + + replySub, err := n.conn.Subscribe(reply, rf.Forward) + if err != nil { + return nil, eris.Wrap(err, "failed to subscribe to data responses") + } + defer replySub.Unsubscribe() // nolint: errcheck + + // Make an all-call for the entity data + err = n.conn.PublishRequest(topic, reply, req) + if err != nil { + return nil, eris.Wrap(err, "failed to make request for data") + } + + // Wait for replies + timer := time.NewTimer(n.Config.RequestTimeout) + defer timer.Stop() + for { + select { + case <-timer.C: + // Return the last error if we got one; otherwise, return a timeout error + if err == nil { + err = eris.New("timeout") + } + + return nil, err + case resp, more := <-rf.fwdChan: + if !more { + if err == nil { + err = eris.New("no data") + } + + return nil, err + } + if resp != nil { + if err = resp.Err(); err == nil { // store the error for later return + return resp, nil // No error means to return the current value + } + } + } + } +} + +// TimeoutCount is the amount of times the communication times out +func (n *NatsBus) TimeoutCount() int64 { + return n.countTimeouts +} diff --git a/messagebus/rabbitmq.go b/messagebus/rabbitmq.go new file mode 100644 index 0000000..32aabf3 --- /dev/null +++ b/messagebus/rabbitmq.go @@ -0,0 +1,692 @@ +package messagebus + +import ( + "context" + "encoding/json" + "errors" + "sync" + "time" + + "github.com/CyCoreSystems/ari-proxy/v5/proxy" + "github.com/CyCoreSystems/ari/v5" + "github.com/CyCoreSystems/ari/v5/rid" + "github.com/inconshreveable/log15" + "github.com/rabbitmq/amqp091-go" + "github.com/rotisserie/eris" +) + +const ( + // DefaultQueueExpire value for queue auto expire + DefaultQueueExpire = 30000 + // DefaultMessageExpire value message expire + DefaultMessageExpire = 30000 + + // exchange names + exchangeEvent = "ari.event" + exchangePing = "ari.ping" + exchangeAnnounce = "ari.announce" + exchangeRequest = "ari.request" + + // type of identifiers + ridConsumer = "co" + ridConsumerReq = "rp" + ridQueue = "q" + ridCorrelation = "cr" +) + +// RabbitmqBus is MessageBus implementation for RabbitMQ +type RabbitmqBus struct { + Config Config + Log log15.Logger + + conn *amqp091.Connection + channel *amqp091.Channel + countTimeouts int64 + isClosed bool + mu sync.RWMutex +} + +// OptionRabbitmqFunc options for RabbitMQ +type OptionRabbitmqFunc func(n *RabbitmqBus) + +// NewRabbitmqBus creates a RabbitmqBus +func NewRabbitmqBus(config Config, options ...OptionRabbitmqFunc) *RabbitmqBus { + + mbus := RabbitmqBus{ + Config: config, + } + + for _, optfn := range options { + optfn(&mbus) + } + + return &mbus +} + +// WithRabbitmqConn binds an existing RabbitMQ connection +func WithRabbitmqConn(rconn *amqp091.Connection) OptionRabbitmqFunc { + return func(r *RabbitmqBus) { + r.conn = rconn + } +} + +// Connect creates a NATS connection +func (r *RabbitmqBus) Connect() error { + if r.isClosed { + r.isClosed = false + } + + if err := r.connect(); err != nil { + return err + } + + // connection watcher + go func() { + for { + reason, ok := <-r.conn.NotifyClose(make(chan *amqp091.Error)) + if !ok { + if r.isClosed { + return + } + r.Log.Error("connection unexpected closed", "reason", reason) + + r.mu.Lock() + for { + if connErr := r.connect(); connErr != nil { + r.Log.Error("connection failed, trying to reconnect", "err", connErr) + time.Sleep(DefaultReconnectionWait) + continue + } + break + } + r.mu.Unlock() + } + } + }() + + return nil +} + +// SubscribePing subscribe ping messages +func (r *RabbitmqBus) SubscribePing(topic string, callback PingHandler) (Subscription, error) { + + sub := RmqSubscription{ + Topics: []string{topic}, + Queue: exchangePing + "-" + rid.New(ridQueue), + Exchange: exchangePing, + ExchangeKind: amqp091.ExchangeFanout, + QueueArgs: amqp091.Table{"x-expires": DefaultQueueExpire}, + } + + d, err := sub.execute(r) + if err != nil { + return nil, err + } + go func(msgs <-chan amqp091.Delivery) { + for { + for msg := range msgs { + err := msg.Ack(false) + if err != nil { + r.Log.Error("failed to ack message: %w", err) + continue + } + callback() + } + if r.isClosed { + return + } + msgs = sub.reconnect(r) + } + }(d) + + return &sub, nil +} + +// SubscribeRequest subscribe request messages +func (r *RabbitmqBus) SubscribeRequest(topic string, callback RequestHandler) (Subscription, error) { + + sub := RmqSubscription{ + Topics: []string{topic}, + Queue: topic + "-" + rid.New(ridQueue), + Exchange: exchangeRequest, + ExchangeKind: amqp091.ExchangeTopic, + QueueArgs: amqp091.Table{"x-expires": DefaultQueueExpire}, + } + + d, err := sub.execute(r) + if err != nil { + return nil, err + } + go func(msgs <-chan amqp091.Delivery) { + for { + for msg := range msgs { + err := msg.Ack(false) + if err != nil { + r.Log.Error("failed to ack message: %w", err) + continue + } + + //callback + var data proxy.Request + err = json.Unmarshal(msg.Body, &data) + if err != nil { + r.Log.Error("Error unmarshall data", "topic", topic, "error", err) + continue + } + callback(topic, msg.ReplyTo, &data) + } + if r.isClosed { + return + } + msgs = sub.reconnect(r) + } + }(d) + + return &sub, nil +} + +// SubscribeRequests subscribe request messages using multiple topics +func (r *RabbitmqBus) SubscribeRequests(topics []string, callback RequestHandler) (Subscription, error) { + + sub := RmqSubscription{ + Topics: topics, + Queue: rid.New(ridQueue), + Exchange: exchangeRequest, + ExchangeKind: amqp091.ExchangeTopic, + QueueArgs: amqp091.Table{"x-expires": DefaultQueueExpire}, + } + + d, err := sub.execute(r) + if err != nil { + return nil, err + } + go func(msgs <-chan amqp091.Delivery) { + for { + for msg := range msgs { + err := msg.Ack(false) + if err != nil { + r.Log.Error("failed to ack message: %w", err) + continue + } + + //callback + var data proxy.Request + err = json.Unmarshal(msg.Body, &data) + if err != nil { + r.Log.Error("Error unmarshall data", "topics", topics, "error", err) + continue + } + callback(msg.RoutingKey, msg.ReplyTo, &data) + } + if r.isClosed { + return + } + msgs = sub.reconnect(r) + } + }(d) + + return &sub, nil +} + +// SubscribeAnnounce subscribe announce messages +func (r *RabbitmqBus) SubscribeAnnounce(topic string, callback AnnounceHandler) (Subscription, error) { + + sub := RmqSubscription{ + Topics: []string{topic}, + Queue: topic + "-" + rid.New(ridQueue), + Exchange: exchangeAnnounce, + ExchangeKind: amqp091.ExchangeFanout, + QueueArgs: amqp091.Table{"x-expires": DefaultQueueExpire}, + } + + d, err := sub.execute(r) + if err != nil { + return nil, err + } + go func(msgs <-chan amqp091.Delivery) { + for { + for msg := range msgs { + err := msg.Ack(false) + if err != nil { + r.Log.Error("failed to ack message: %w", err) + continue + } + + //callback + var data proxy.Announcement + err = json.Unmarshal(msg.Body, &data) + if err != nil { + r.Log.Error("Error unmarshall data", "topic", topic, "error", err) + continue + } + callback(&data) + } + if r.isClosed { + return + } + msgs = sub.reconnect(r) + } + }(d) + + return &sub, nil +} + +// SubscribeEvent subscribe event messages +func (r *RabbitmqBus) SubscribeEvent(topic string, queue string, callback EventHandler) (Subscription, error) { + + sub := RmqSubscription{ + Topics: []string{topic}, + Queue: queue, + Exchange: exchangeEvent, + ExchangeKind: amqp091.ExchangeTopic, + QueueArgs: amqp091.Table{"x-message-ttl": DefaultMessageExpire}, + } + + d, err := sub.execute(r) + if err != nil { + return nil, err + } + go func(msgs <-chan amqp091.Delivery) { + for { + for msg := range msgs { + err := msg.Ack(false) + if err != nil { + r.Log.Error("failed to ack message: %w", err) + continue + } + + //callback + callback(msg.Body) + } + if r.isClosed { + return + } + msgs = sub.reconnect(r) + } + }(d) + + return &sub, nil +} + +// SubscribeCreateRequest subscribe create request messages +func (r *RabbitmqBus) SubscribeCreateRequest(topic string, queue string, callback RequestHandler) (Subscription, error) { + sub := RmqSubscription{ + Topics: []string{topic}, + Queue: queue + "-" + topic, + Exchange: exchangeRequest, + ExchangeKind: amqp091.ExchangeTopic, + } + + d, err := sub.execute(r) + if err != nil { + return nil, err + } + go func(msgs <-chan amqp091.Delivery) { + for { + for msg := range msgs { + err := msg.Ack(false) + if err != nil { + r.Log.Error("failed to ack message: %w", err) + continue + } + + //callback + var data proxy.Request + err = json.Unmarshal(msg.Body, &data) + if err != nil { + r.Log.Error("Error unmarshal data", "topic", topic, "error", err) + continue + } + callback(topic, msg.ReplyTo, &data) + } + if r.isClosed { + return + } + msgs = sub.reconnect(r) + } + }(d) + + return &sub, nil +} + +// PublishResponse sends response message +func (r *RabbitmqBus) PublishResponse(topic string, msg *proxy.Response) error { + data, err := json.Marshal(msg) + if err != nil { + return err + } + //exchange should be empty + return r.publish(topic, "", data) +} + +// PublishPing sends ping message +func (r *RabbitmqBus) PublishPing(topic string) error { + data, err := json.Marshal(&proxy.Request{}) + if err != nil { + return err + } + return r.publish(topic, topic, data) +} + +// PublishAnnounce sends announce message +func (r *RabbitmqBus) PublishAnnounce(topic string, msg *proxy.Announcement) error { + data, err := json.Marshal(msg) + if err != nil { + return err + } + return r.publish(topic, topic, data) + +} + +// PublishEvent sends event message +func (r *RabbitmqBus) PublishEvent(topic string, msg ari.Event) error { + data, err := json.Marshal(msg) + if err != nil { + return err + } + return r.publish(topic, exchangeEvent, data) + +} + +// Close closes the connection +func (r *RabbitmqBus) Close() { + r.mu.Lock() + defer r.mu.Unlock() + + r.isClosed = true + r.conn.Close() // nolint: errcheck +} + +// GetWildcardString returns wildcard based on type +func (r *RabbitmqBus) GetWildcardString(w WildcardType) string { + switch w { + case WildcardOneWord: + return "*" + case WildcardZeroOrMoreWords: + return "#" + } + return "" +} + +// Request sends a request message +func (r *RabbitmqBus) Request(topic string, req *proxy.Request) (*proxy.Response, error) { + var resp proxy.Response + + requestData, err := json.Marshal(req) + if err != nil { + return nil, err + } + + r.mu.RLock() + channel, err := r.conn.Channel() + r.mu.RUnlock() + + if err != nil { + return nil, err + } + + consumerID := rid.New(ridConsumerReq) + msgs, err := channel.Consume( + "amq.rabbitmq.reply-to", // queue + consumerID, // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + return nil, eris.Wrap(err, "error consuming channel") + } + defer channel.Cancel(consumerID, false) // nolint: errcheck + + ctx, cancel := context.WithTimeout(context.Background(), r.Config.RequestTimeout) + defer cancel() + for i := 0; i <= r.Config.TimeoutRetries; i++ { + err = channel.PublishWithContext( + ctx, + exchangeRequest, // exchange + topic, // routing key + false, // mandatory + false, // immediate + amqp091.Publishing{ + ContentType: "application/json", + CorrelationId: rid.New(ridCorrelation), + Body: requestData, + ReplyTo: "amq.rabbitmq.reply-to", + }) + + if errors.Is(err, context.DeadlineExceeded) { + r.countTimeouts++ + continue + } + } + + if err != nil { + return nil, eris.Wrap(err, "failed to publish message") + } + + msg := <-msgs + if err := json.Unmarshal(msg.Body, &resp); err != nil { + r.Log.Error("Error on Unmarshal response", "topic", topic, "error", err) + return nil, err + } + return &resp, nil + +} + +// MultipleRequest sends a request message to multiple consumers +func (r *RabbitmqBus) MultipleRequest(topic string, req *proxy.Request, expectedResp int) ([]*proxy.Response, error) { + + responses := make([]*proxy.Response, 0, expectedResp) + + requestData, err := json.Marshal(req) + if err != nil { + return nil, err + } + r.mu.RLock() + channel, err := r.conn.Channel() + r.mu.RUnlock() + if err != nil { + return nil, err + } + + consumerID := rid.New(ridConsumerReq) + msgs, err := channel.Consume( + "amq.rabbitmq.reply-to", // queue + consumerID, // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + return nil, eris.Wrap(err, "error consuming channel") + } + defer channel.Cancel(consumerID, false) // nolint: errcheck + + ctx, cancel := context.WithTimeout(context.Background(), r.Config.RequestTimeout) + defer cancel() + for i := 0; i <= r.Config.TimeoutRetries; i++ { + err = channel.PublishWithContext( + ctx, + exchangeRequest, // exchange + topic, // routing key + false, // mandatory + false, // immediate + amqp091.Publishing{ + ContentType: "application/json", + CorrelationId: rid.New(ridCorrelation), + Body: requestData, + ReplyTo: "amq.rabbitmq.reply-to", + }) + + if errors.Is(err, context.DeadlineExceeded) { + r.countTimeouts++ + continue + } + } + if err != nil { + return nil, eris.Wrap(err, "failed to publish message") + } + + timer := time.NewTimer(r.Config.RequestTimeout) + defer timer.Stop() + responseCount := 0 + for { + select { + case <-timer.C: + return responses, nil + case msg, more := <-msgs: + if !more { + return responses, nil + } + var resp proxy.Response + if err := json.Unmarshal(msg.Body, &resp); err != nil { + r.Log.Error("Error on Unmarshal response", "topic", topic, "error", err) + return nil, err + } + responses = append(responses, &resp) + responseCount++ + if responseCount >= expectedResp { + return responses, nil + } + } + } +} + +// MultipleRequestReturnFirstGoodResponse sends a request message to multiple consumers and returns the first good response +func (r *RabbitmqBus) MultipleRequestReturnFirstGoodResponse(topic string, req *proxy.Request, expectedResp int) (*proxy.Response, error) { + + requestData, err := json.Marshal(req) + if err != nil { + return nil, err + } + r.mu.RLock() + channel, err := r.conn.Channel() + r.mu.RUnlock() + if err != nil { + return nil, err + } + + consumerID := rid.New(ridConsumerReq) + msgs, err := channel.Consume( + "amq.rabbitmq.reply-to", // queue + consumerID, // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + return nil, eris.Wrap(err, "error consumming channel") + } + defer channel.Cancel(consumerID, false) // nolint: errcheck + + ctx, cancel := context.WithTimeout(context.Background(), r.Config.RequestTimeout) + defer cancel() + for i := 0; i <= r.Config.TimeoutRetries; i++ { + err = channel.PublishWithContext( + ctx, + exchangeRequest, // exchange + topic, // routing key + false, // mandatory + false, // immediate + amqp091.Publishing{ + ContentType: "application/json", + CorrelationId: rid.New(ridCorrelation), + Body: requestData, + ReplyTo: "amq.rabbitmq.reply-to", + }) + + if errors.Is(err, context.DeadlineExceeded) { + r.countTimeouts++ + continue + } + } + if err != nil { + return nil, eris.Wrap(err, "failed to publish message") + } + + timer := time.NewTimer(r.Config.RequestTimeout) + defer timer.Stop() + responseCount := 0 + for { + select { + case <-timer.C: + // Return the last error if we got one; otherwise, return a timeout error + if err == nil { + err = eris.New("timeout") + } + + return nil, err + case msg, more := <-msgs: + + if !more { + if err == nil { + err = eris.New("no data") + } + return nil, err + } + + var resp proxy.Response + if err = json.Unmarshal(msg.Body, &resp); err != nil { + r.Log.Error("Error on Unmarshal response", "topic", topic, "error", err) + continue + } + + if err = resp.Err(); err == nil { // store the error for later return + return &resp, nil // No error means to return the current value + } + + responseCount++ + if responseCount > expectedResp { + return nil, eris.New("no data") + } + } + } +} + +// TimeoutCount is the amount of times the communication times out +func (r *RabbitmqBus) TimeoutCount() int64 { + return r.countTimeouts +} + +func (r *RabbitmqBus) connect() error { + var err error + if r.conn, err = amqp091.Dial(r.Config.URL); err != nil { + return eris.Wrap(err, "connect") + } + if r.channel, err = r.conn.Channel(); err != nil { + return eris.Wrap(err, "channel create") + } + return nil +} + +func (r *RabbitmqBus) publish(topic string, exchange string, data []byte) error { + r.mu.RLock() + defer r.mu.RUnlock() + + return r.channel.PublishWithContext( + context.Background(), + exchange, // exchange, for now always using the default exchange + topic, + false, + false, + amqp091.Publishing{ + Headers: amqp091.Table{}, + ContentType: "application/json", + ContentEncoding: "", + Body: data, + DeliveryMode: amqp091.Transient, // 1=non-persistent, 2=persistent + Priority: 0, // 0-9 + }) +} + +func (r *RabbitmqBus) newChannel() (*amqp091.Channel, error) { + r.mu.RLock() + defer r.mu.RUnlock() + return r.conn.Channel() +} diff --git a/messagebus/rabbitmqsub.go b/messagebus/rabbitmqsub.go new file mode 100644 index 0000000..bf59609 --- /dev/null +++ b/messagebus/rabbitmqsub.go @@ -0,0 +1,106 @@ +package messagebus + +import ( + "sync" + "time" + + "github.com/CyCoreSystems/ari/v5/rid" + "github.com/rabbitmq/amqp091-go" +) + +// RmqSubscription handle RabbitMQ subscription +type RmqSubscription struct { + consumerID string + channel *amqp091.Channel + mu sync.RWMutex + + Topics []string + Queue string + Exchange string + ExchangeKind string + QueueArgs amqp091.Table +} + +// Unsubscribe remove the subscription +func (rs *RmqSubscription) Unsubscribe() error { + rs.mu.RLock() + defer rs.mu.RUnlock() + if rs.consumerID == "" { + return nil + } + err := rs.channel.Cancel(rs.consumerID, false) + if err != nil { + return err + } + return rs.channel.Close() +} + +// reconnect reconnects the subscription +func (rs *RmqSubscription) reconnect(r *RabbitmqBus) <-chan amqp091.Delivery { + rs.mu.Lock() + + if rs.channel != nil { + rs.channel.Close() // nolint: errcheck + } + rs.mu.Unlock() + + for { + msgs, err := rs.execute(r) + if err != nil { + r.Log.Error("failed to execute subscription", "error", err) + time.Sleep(DefaultReconnectionWait) + continue + } + return msgs + } +} + +// execute declares the subscription on RabbitMQ +func (rs *RmqSubscription) execute(r *RabbitmqBus) (<-chan amqp091.Delivery, error) { + rs.mu.Lock() + defer rs.mu.Unlock() + + ch, err := r.newChannel() + if err != nil { + return nil, err + } + + rs.channel = ch + queue, err := ch.QueueDeclare( + rs.Queue, // name of queue + true, // durable + false, // delete when unused + false, // exclusive + false, // nowait + rs.QueueArgs, // arguments + ) + if err != nil { + return nil, err + } + + err = ch.ExchangeDeclare( + rs.Exchange, // name of exchange + rs.ExchangeKind, // kind + true, // durable + false, // delete when unused + false, // internal + false, // nowait + nil, // arguments + ) + if err != nil { + return nil, err + } + if queue.Name != rs.Queue { + rs.Queue = queue.Name + } + + for _, topic := range rs.Topics { + err = ch.QueueBind(queue.Name, topic, rs.Exchange, false, nil) + if err != nil { + return nil, err + } + } + + rs.consumerID = rid.New(ridConsumer) + return ch.Consume(rs.Queue, rs.consumerID, false, false, true, true, nil) +} diff --git a/messagebus/response_forwarder.go b/messagebus/response_forwarder.go new file mode 100644 index 0000000..42e8b82 --- /dev/null +++ b/messagebus/response_forwarder.go @@ -0,0 +1,39 @@ +package messagebus + +import ( + "sync" + + "github.com/CyCoreSystems/ari-proxy/v5/proxy" +) + +type responseForwarder struct { + closed bool + count int + expected int + fwdChan chan *proxy.Response + + mu sync.Mutex +} + +func (f *responseForwarder) Forward(o *proxy.Response) { + + f.mu.Lock() + defer f.mu.Unlock() + + f.count++ + + if f.closed { + return + } + + // always send up reply, so we can track errors. + select { + case f.fwdChan <- o: + default: + } + + if f.count >= f.expected { + f.closed = true + close(f.fwdChan) + } +} diff --git a/proxy/types.go b/proxy/types.go index 5e51dd5..59ef99f 100644 --- a/proxy/types.go +++ b/proxy/types.go @@ -23,12 +23,12 @@ type Announcement struct { Application string `json:"application"` } -// AnnouncementSubject returns the NATS subject +// AnnouncementSubject returns the MessageBus subject func AnnouncementSubject(prefix string) string { return fmt.Sprintf("%sannounce", prefix) } -// PingSubject returns the NATS subject for a cluster-wide proxy ping for presence +// PingSubject returns the MessageBus subject for a cluster-wide proxy ping for presence func PingSubject(prefix string) string { return fmt.Sprintf("%sping", prefix) } diff --git a/server/clientserver_test.go b/server/clientserver_test.go index ac9e7d8..5f0cb72 100644 --- a/server/clientserver_test.go +++ b/server/clientserver_test.go @@ -20,7 +20,7 @@ type srv struct { func (s *srv) Start(ctx context.Context, t *testing.T, mockClient ari.Client, nc *nats.EncodedConn, completeCh chan struct{}) (ari.Client, error) { s.s = New() // tests may run in parallel so we don't want two separate proxy servers to conflict. - s.s.NATSPrefix = rid.New("") + "." + s.s.MBPrefix = rid.New("") + "." s.s.Application = "asdf" go func() { @@ -38,7 +38,7 @@ func (s *srv) Start(ctx context.Context, t *testing.T, mockClient ari.Client, nc return nil, errors.New("Timeout waiting for server ready") } - cl, err := client.New(ctx, client.WithTimeoutRetries(4), client.WithPrefix(s.s.NATSPrefix), client.WithApplication("asdf")) + cl, err := client.New(ctx, client.WithTimeoutRetries(4), client.WithPrefix(s.s.MBPrefix), client.WithApplication("asdf")) if err != nil { return nil, err } diff --git a/server/handler.go b/server/handler.go index 6d2c8c8..5f0d4c0 100644 --- a/server/handler.go +++ b/server/handler.go @@ -6,7 +6,7 @@ import "github.com/CyCoreSystems/ari-proxy/v5/session" // response object or error. type Reply func(interface{}, error) -// A Handler2 is a function which provides a session-aware request-response for nats +// A Handler2 is a function which provides a session-aware request-response for messagebus type Handler2 func(msg *session.Message, reply Reply) // Handler is left for compat diff --git a/server/liveRecording.go b/server/liveRecording.go index 1007bdf..d4cd755 100644 --- a/server/liveRecording.go +++ b/server/liveRecording.go @@ -13,7 +13,7 @@ func (s *Server) recordingLiveData(ctx context.Context, reply string, req *proxy return } - s.publish(reply, proxy.Response{ + s.publish(reply, &proxy.Response{ Data: &proxy.EntityData{ LiveRecording: data, }, @@ -27,7 +27,7 @@ func (s *Server) recordingLiveGet(ctx context.Context, reply string, req *proxy. return } - s.publish(reply, proxy.Response{ + s.publish(reply, &proxy.Response{ Key: data.Key, }) } diff --git a/server/options.go b/server/options.go index 086ce52..5343aef 100644 --- a/server/options.go +++ b/server/options.go @@ -8,7 +8,7 @@ import ( // Options are the group of options for the ari-proxy server type Options struct { - //TODO: include nats options + //TODO: include nats/rabbitmq options URL string diff --git a/server/server.go b/server/server.go index b1e9044..d016673 100644 --- a/server/server.go +++ b/server/server.go @@ -2,28 +2,22 @@ package server import ( "context" + "errors" "fmt" "os" "time" + "github.com/CyCoreSystems/ari-proxy/v5/messagebus" "github.com/CyCoreSystems/ari-proxy/v5/proxy" "github.com/CyCoreSystems/ari-proxy/v5/server/dialog" "github.com/CyCoreSystems/ari/v5" "github.com/CyCoreSystems/ari/v5/client/native" + "github.com/nats-io/nats.go" "github.com/rotisserie/eris" "github.com/inconshreveable/log15" - "github.com/nats-io/nats.go" ) -// DefaultReconnectionAttemts is the default number of reconnection attempts -// It implements a hard coded fault tolerance for a starting NATS cluster -const DefaultNATSReconnectionAttemts = 5 - -// DefaultNATSReconnectionWait is the default wating time between each reconnection -// attempt -const DefaultNATSReconnectionWait = 5 * time.Second - // Server describes the asterisk-facing ARI proxy server type Server struct { // Application is the name of the ARI application of this server @@ -33,15 +27,12 @@ type Server struct { // to which this server is connected. AsteriskID string - // NATSPrefix is the string which should be prepended to all NATS subjects, sending and receiving. It defaults to "ari.". - NATSPrefix string + // MBPrefix is the string which should be prepended to all MessageBus subjects, sending and receiving. It defaults to "ari.". + MBPrefix string // ari is the native Asterisk ARI client by which this proxy is directly connected ari ari.Client - // nats is the JSON-encoded NATS connection - nats *nats.EncodedConn - // Dialog is the dialog manager Dialog dialog.Manager @@ -52,6 +43,8 @@ type Server struct { // Log is the log15.Logger for the service. You may replace or call SetHandler() on this at any time to change the logging of the service. Log log15.Logger + + mbus messagebus.Server } // New returns a new Server @@ -60,15 +53,15 @@ func New() *Server { log.SetHandler(log15.DiscardHandler()) return &Server{ - NATSPrefix: "ari.", - readyCh: make(chan struct{}), - Dialog: dialog.NewMemManager(), - Log: log, + MBPrefix: "ari.", + readyCh: make(chan struct{}), + Dialog: dialog.NewMemManager(), + Log: log, } } -// Listen runs the given server, listening to ARI and NATS, as specified -func (s *Server) Listen(ctx context.Context, ariOpts *native.Options, natsURI string) (err error) { +// Listen runs the given server, listening to ARI and MessageBus, as specified +func (s *Server) Listen(ctx context.Context, ariOpts *native.Options, messagebusURL string) (err error) { ctx, cancel := context.WithCancel(ctx) s.cancel = cancel @@ -79,23 +72,28 @@ func (s *Server) Listen(ctx context.Context, ariOpts *native.Options, natsURI st } defer s.ari.Close() - // Connect to NATS - nc, err := nats.Connect(natsURI) - reconnectionAttempts := DefaultNATSReconnectionAttemts - for err == nats.ErrNoServers && reconnectionAttempts > 0 { - s.Log.Info("retrying to connect to NATS server", "attempts", reconnectionAttempts) - time.Sleep(DefaultNATSReconnectionWait) - nc, err = nats.Connect(natsURI) - reconnectionAttempts -= 1 - } - if err != nil { - return eris.Wrap(err, "failed to connect to NATS") + mbtype := messagebus.GetType(messagebusURL) + switch mbtype { + case messagebus.TypeRabbitmq: + s.mbus = &messagebus.RabbitmqBus{ + Config: messagebus.Config{URL: messagebusURL}, + Log: s.Log, + } + case messagebus.TypeNats: + s.mbus = &messagebus.NatsBus{ + Config: messagebus.Config{URL: messagebusURL}, + Log: s.Log, + } + default: + return errors.New("Unkwnon url for MessageBus: " + messagebusURL) } - s.nats, err = nats.NewEncodedConn(nc, nats.JSON_ENCODER) + + // Connect to MessageBus + err = s.mbus.Connect() if err != nil { - return eris.Wrap(err, "failed to encode NATS connection") + return eris.Wrap(err, "failed to connect to MessageBus") } - defer s.nats.Close() + defer s.mbus.Close() return s.listen(ctx) } @@ -106,7 +104,10 @@ func (s *Server) ListenOn(ctx context.Context, a ari.Client, n *nats.EncodedConn s.cancel = cancel s.ari = a - s.nats = n + s.mbus = messagebus.NewNatsBus( + messagebus.Config{}, + messagebus.WithNatsConn(n), + ) return s.listen(ctx) } @@ -148,83 +149,50 @@ func (s *Server) listen(ctx context.Context) error { s.Application = s.ari.ApplicationName() // - // Listen on the initial NATS subjects + // Listen on the initial MessageBus subjects // // ping handler - pingSub, err := s.nats.Subscribe(proxy.PingSubject(s.NATSPrefix), s.pingHandler) + testPingSub, err := s.mbus.SubscribePing(proxy.PingSubject(s.MBPrefix), s.pingHandler) if err != nil { return eris.Wrap(err, "failed to subscribe to pings") } - defer wg.Add(pingSub.Unsubscribe) + defer wg.Add(testPingSub.Unsubscribe) // get a contextualized request handler requestHandler := s.newRequestHandler(ctx) - // get handlers - allGet, err := s.nats.Subscribe(proxy.Subject(s.NATSPrefix, "get", "", ""), requestHandler) - if err != nil { - return eris.Wrap(err, "failed to create get-all subscription") - } - defer wg.Add(allGet.Unsubscribe)() - - appGet, err := s.nats.Subscribe(proxy.Subject(s.NATSPrefix, "get", s.Application, ""), requestHandler) - if err != nil { - return eris.Wrap(err, "failed to create get-app subscription") - } - defer wg.Add(appGet.Unsubscribe)() - idGet, err := s.nats.Subscribe(proxy.Subject(s.NATSPrefix, "get", s.Application, s.AsteriskID), requestHandler) - if err != nil { - return eris.Wrap(err, "failed to create get-id subscription") - } - defer wg.Add(idGet.Unsubscribe)() - - // data handlers - allData, err := s.nats.Subscribe(proxy.Subject(s.NATSPrefix, "data", "", ""), requestHandler) - if err != nil { - return eris.Wrap(err, "failed to create data-all subscription") - } - defer wg.Add(allData.Unsubscribe)() - appData, err := s.nats.Subscribe(proxy.Subject(s.NATSPrefix, "data", s.Application, ""), requestHandler) - if err != nil { - return eris.Wrap(err, "failed to create data-app subscription") - } - defer wg.Add(appData.Unsubscribe)() - idData, err := s.nats.Subscribe(proxy.Subject(s.NATSPrefix, "data", s.Application, s.AsteriskID), requestHandler) - if err != nil { - return eris.Wrap(err, "failed to create data-id subscription") - } - defer wg.Add(idData.Unsubscribe)() - - // command handlers - allCommand, err := s.nats.Subscribe(proxy.Subject(s.NATSPrefix, "command", "", ""), requestHandler) - if err != nil { - return eris.Wrap(err, "failed to create command-all subscription") - } - defer wg.Add(allCommand.Unsubscribe)() - appCommand, err := s.nats.Subscribe(proxy.Subject(s.NATSPrefix, "command", s.Application, ""), requestHandler) - if err != nil { - return eris.Wrap(err, "failed to create command-app subscription") + subjects := []string{ + proxy.Subject(s.MBPrefix, "get", "", ""), + proxy.Subject(s.MBPrefix, "get", s.Application, ""), + proxy.Subject(s.MBPrefix, "get", s.Application, s.AsteriskID), + proxy.Subject(s.MBPrefix, "data", "", ""), + proxy.Subject(s.MBPrefix, "data", s.Application, ""), + proxy.Subject(s.MBPrefix, "data", s.Application, s.AsteriskID), + proxy.Subject(s.MBPrefix, "command", "", ""), + proxy.Subject(s.MBPrefix, "command", s.Application, ""), + proxy.Subject(s.MBPrefix, "command", s.Application, s.AsteriskID), } - defer wg.Add(appCommand.Unsubscribe)() - idCommand, err := s.nats.Subscribe(proxy.Subject(s.NATSPrefix, "command", s.Application, s.AsteriskID), requestHandler) + // get / data / command handlers + requestsSub, err := s.mbus.SubscribeRequests(subjects, requestHandler) if err != nil { - return eris.Wrap(err, "failed to create command-id subscription") + s.Log.Error("%v", err) + return eris.Wrap(err, "failed to create requests subscription") } - defer wg.Add(idCommand.Unsubscribe)() + defer wg.Add(requestsSub.Unsubscribe)() // create handlers - allCreate, err := s.nats.QueueSubscribe(proxy.Subject(s.NATSPrefix, "create", "", ""), "ariproxy", requestHandler) + allCreate, err := s.mbus.SubscribeCreateRequest(proxy.Subject(s.MBPrefix, "create", "", ""), "ariproxy", requestHandler) if err != nil { return eris.Wrap(err, "failed to create create-all subscription") } defer wg.Add(allCreate.Unsubscribe)() - appCreate, err := s.nats.QueueSubscribe(proxy.Subject(s.NATSPrefix, "create", s.Application, ""), "ariproxy", requestHandler) + appCreate, err := s.mbus.SubscribeCreateRequest(proxy.Subject(s.MBPrefix, "create", s.Application, ""), "ariproxy", requestHandler) if err != nil { return eris.Wrap(err, "failed to create create-app subscription") } defer wg.Add(appCreate.Unsubscribe)() - idCreate, err := s.nats.QueueSubscribe(proxy.Subject(s.NATSPrefix, "create", s.Application, s.AsteriskID), "ariproxy", requestHandler) + idCreate, err := s.mbus.SubscribeCreateRequest(proxy.Subject(s.MBPrefix, "create", s.Application, s.AsteriskID), "ariproxy", requestHandler) if err != nil { return eris.Wrap(err, "failed to create create-id subscription") } @@ -293,7 +261,7 @@ func (s *Server) runAnnouncer(ctx context.Context) { // announce publishes the presence of this server to the cluster func (s *Server) announce() { - s.publish(proxy.AnnouncementSubject(s.NATSPrefix), &proxy.Announcement{ + s.publishAnnounce(proxy.AnnouncementSubject(s.MBPrefix), &proxy.Announcement{ Node: s.AsteriskID, Application: s.Application, }) @@ -313,33 +281,47 @@ func (s *Server) runEventHandler(ctx context.Context) { s.Log.Debug("event received", "kind", e.GetType()) // Publish event to canonical destination - s.publish(fmt.Sprintf("%sevent.%s.%s", s.NATSPrefix, s.Application, s.AsteriskID), e) + s.publishEvent(fmt.Sprintf("%sevent.%s.%s", s.MBPrefix, s.Application, s.AsteriskID), e) // Publish event to any associated dialogs for _, d := range s.dialogsForEvent(e) { de := e de.SetDialog(d) - s.publish(fmt.Sprintf("%sdialogevent.%s", s.NATSPrefix, d), de) + s.publishEvent(fmt.Sprintf("%sdialogevent.%s", s.MBPrefix, d), de) } } } } // pingHandler publishes the server's presence -func (s *Server) pingHandler(m *nats.Msg) { +func (s *Server) pingHandler() { if s.ari.Connected() { s.announce() } } -// publish sends a message out over NATS, logging any error -func (s *Server) publish(subject string, msg interface{}) { - if err := s.nats.Publish(subject, msg); err != nil { - s.Log.Warn("failed to publish NATS message", "subject", subject, "data", msg, "error", err) +// publish sends a message out over MessageBus, logging any error +func (s *Server) publish(subject string, msg *proxy.Response) { + if err := s.mbus.PublishResponse(subject, msg); err != nil { + s.Log.Warn("failed to publish MessageBus message", "subject", subject, "data", msg, "error", err) + } +} + +// publishAnnounce sends a message out over MessageBus, logging any error +func (s *Server) publishAnnounce(subject string, msg *proxy.Announcement) { + if err := s.mbus.PublishAnnounce(subject, msg); err != nil { + s.Log.Warn("failed to publish MessageBus message", "subject", subject, "data", msg, "error", err) + } +} + +// publishEvent sends a message out over MessageBus, logging any error +func (s *Server) publishEvent(subject string, msg ari.Event) { + if err := s.mbus.PublishEvent(subject, msg); err != nil { + s.Log.Warn("failed to publish MessageBus message", "subject", subject, "data", msg, "error", err) } } -// newRequestHandler returns a context-wrapped nats.Handler to handle requests +// newRequestHandler returns a context-wrapped Handler to handle requests func (s *Server) newRequestHandler(ctx context.Context) func(subject string, reply string, req *proxy.Request) { return func(subject string, reply string, req *proxy.Request) { if !s.ari.Connected() {