Skip to content

Commit

Permalink
Add RabbitMQ support (#44)
Browse files Browse the repository at this point in the history
feat: add RabbitMQ support

Co-authored-by: Seán C McCord <[email protected]>
  • Loading branch information
jose-lopes and Ulexus committed Oct 21, 2022
1 parent 4561b2b commit d16005d
Show file tree
Hide file tree
Showing 21 changed files with 1,509 additions and 341 deletions.
53 changes: 37 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ Proxy for the Asterisk REST interface (ARI).
The ARI proxy facilitates scaling of both applications and Asterisk,
independently and with minimal coordination. Each Asterisk instance and ARI
application pair runs an `ari-proxy` server instance, which talks to a common
NATS cluster. Each client application talks to the same NATS cluster. The
NATS or RabbitMQ cluster. Each client application talks to the same message bus. The
clients automatically and continuously discover new Asterisk instances, so the
only coordination needed is the common location of the NATS cluster.
only coordination needed is the common location of the message bus.

The ARI proxy allows for:
- Any number of applications running the ARI client
Expand All @@ -22,13 +22,17 @@ The ARI proxy allows for:
- Simple call event reception by any number of application clients. (No
single-app lockout)

Supported message buses:
- [NATS](https://nats.io)
- [RabbitMQ](https://rabbitmq.com)

## Proxy server


Docker images are kept up to date with releases and are tagged accordingly. The
`ari-proxy` does not expose any services, so no ports need to be opened for it.
However, it does need to know how to connect to both Asterisk and NATS.
However, it does need to know how to connect to both Asterisk and the message
bus.

```
docker run \
Expand All @@ -37,7 +41,7 @@ However, it does need to know how to connect to both Asterisk and NATS.
-e ARI_PASSWORD="supersecret" \
-e ARI_HTTP_URL="http://asterisk:8088/ari" \
-e ARI_WEBSOCKET_URL="ws://asterisk:8088/ari/events" \
-e NATS_URL="nats://nats:4222" \
-e MESSAGEBUS_URL="nats://nats:4222" \
cycoresystems/ari-proxy
```

Expand Down Expand Up @@ -74,9 +78,25 @@ func connect(ctx context.Context, appName string) (ari.Client,error) {
}
```

Connecting the client to RabbitMQ is like:

```go
import (
"github.com/CyCoreSystems/ari/v5"
"github.com/CyCoreSystems/ari-proxy/v5/client"
)

func connect(ctx context.Context, appName string) (ari.Client,error) {
c, err := client.New(ctx,
client.WithApplication(appName),
client.WithURI("amqp://user:password@rabbitmqhost:5679/"),
)
}
```

Configuration of the client can also be done with environment variables.
`ARI_APPLICATION` can be used to set the ARI application name, and `NATS_URI`
can be used to set the NATS URI. Doing so allows you to get a client connection
`ARI_APPLICATION` can be used to set the ARI application name, and `MESSAGEBUS_URL`
can be used to set the message bus URL. Doing so allows you to get a client connection
simply with `client.New(ctx)`.

Once an `ari.Client` is obtained, the client functions exactly as the native
Expand All @@ -97,14 +117,14 @@ open subscriptions on the client.

Layers of clients can be used efficiently with different contexts using the
`New(context.Context)` function of each client instance. Subtended clients will
be closed with their parents, use a common internal NATS connection, and can be
be closed with their parents, use a common internal message bus connection, and can be
severally closed by their individual contexts. This makes managing many active
channels easy and efficient.

### Lifecycle

There are two levels of client in use. The first is a connection, which is a
long-lived network connection to the NATS cluster. In general, the end user
long-lived network connection to the message bus. In general, the end user
should not close this connection. However, it is available, if necessary, as
`DefaultConn` and offers a `Close()` function for itself.

Expand All @@ -127,15 +147,15 @@ of where the client is located. These pieces of information are handled
transparently and internally by the ARI proxy and the ARI proxy client to route
commands and events where they should be sent.

### NATS protocol details
### Message bus protocol details

The protocol details described below are only necessary to know if you do not use the
provided client and/or server. By using both components in this repository, the
protocol details below are transparently handled for you.

#### Subject structure

The NATS subject prefix defaults to `ari.`, and all messages used by this proxy
The message bus subject prefix defaults to `ari.`, and all messages used by this proxy
will be prefixed by that term.

Next is added one of four resource classifications:
Expand Down Expand Up @@ -165,12 +185,13 @@ ARI application command subject. In fact, each ARI proxy listens to each of the
three levels. A request to `ari.command` will result in all ARI proxies
responding.)

This setup allows for a variable generalization in the listeners by using NATS
This setup allows for a variable generalization in the listeners by using
message bus
wildcard subscriptions. For instance, if you want to receive all events for the
"test" application regardless from which Asterisk machine they come, you would
subscribe to:
"test" application regardless from which Asterisk machine they come, you would subscribe to:

`ari.event.test.>`
`ari.event.test.>` //NATS
`ari.event.test.#` //RabbitMQ

#### Dialogs

Expand All @@ -179,14 +200,14 @@ specifies a Dialog ID in its metadata, the ARI proxy will further classify
events related to that dialog. Relationships are defined by the entity type on
which the Dialog-infused command operates.

Dialog-related events are published on their own NATS subject tree,
Dialog-related events are published on their own message bus subject tree,
`dialogevent`. Thus dialogs abstract ARI application and Asterisk ID. An event
for dialog "testme123" would be published to:

`ari.dialogevent.testme123`

Keep in mind that regardless of dialog associations, all events are _also_
published to their appropriate canonical NATS subjects. Dialogs are intended as
published to their appropriate canonical message bus subjects. Dialogs are intended as
a mechanism to:

- reduce client message traffic load
Expand Down
2 changes: 1 addition & 1 deletion ari-proxy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ ari:
application: example
http_url: http://localhost:8088/ari
websocket_url: ws://localhost:8088/ari/events
nats:
messagebus:
url: nats://nats:4222
46 changes: 28 additions & 18 deletions client/bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,40 @@ import (
"fmt"
"sync"

"github.com/CyCoreSystems/ari-proxy/v5/messagebus"
"github.com/CyCoreSystems/ari/v5"
"github.com/inconshreveable/log15"

"github.com/nats-io/nats.go"
)

// EventChanBufferLength is the number of unhandled events which can be queued
// to the event channel buffer before further events are lost.
var EventChanBufferLength = 10

// Bus provides an ari.Bus interface to NATS
// Bus provides an ari.Bus interface to MessageBus
type Bus struct {
prefix string

log log15.Logger

nc *nats.EncodedConn
mbus messagebus.Client
}

// New returns a new Bus
func New(prefix string, nc *nats.EncodedConn, log log15.Logger) *Bus {
func New(prefix string, m messagebus.Client, log log15.Logger) *Bus {
return &Bus{
prefix: prefix,
log: log,
nc: nc,
mbus: m,
}
}

func (b *Bus) subjectFromKey(key *ari.Key) string {
if key == nil {
return fmt.Sprintf("%sevent.>", b.prefix)
return fmt.Sprintf(
"%sevent.%s",
b.prefix,
b.mbus.GetWildcardString(messagebus.WildcardZeroOrMoreWords),
)
}

if key.Dialog != "" {
Expand All @@ -43,23 +46,23 @@ func (b *Bus) subjectFromKey(key *ari.Key) string {

subj := fmt.Sprintf("%sevent.", b.prefix)
if key.App == "" {
return subj + ">"
return subj + b.mbus.GetWildcardString(messagebus.WildcardZeroOrMoreWords)
}
subj += key.App + "."

if key.Node == "" {
return subj + ">"
return subj + b.mbus.GetWildcardString(messagebus.WildcardZeroOrMoreWords)
}
return subj + key.Node
}

// Subscription represents an ari.Subscription over NATS
// Subscription represents an ari.Subscription over MessageBus
type Subscription struct {
key *ari.Key

log log15.Logger

subscription *nats.Subscription
subscription messagebus.Subscription

eventChan chan ari.Event

Expand Down Expand Up @@ -91,11 +94,18 @@ func (b *Bus) Subscribe(key *ari.Key, n ...string) ari.Subscription {
events: n,
}

s.subscription, err = b.nc.Subscribe(b.subjectFromKey(key), func(m *nats.Msg) {
s.receive(m)
})
var app string
if key != nil {
app = key.App
}

s.subscription, err = b.mbus.SubscribeEvent(
b.subjectFromKey(key),
app,
s.receive,
)
if err != nil {
b.log.Error("failed to subscribe to NATS", "error", err)
b.log.Error("failed to subscribe to MessageBus", "error", err)
return nil
}
return s
Expand All @@ -115,7 +125,7 @@ func (s *Subscription) Cancel() {
if s.subscription != nil {
err := s.subscription.Unsubscribe()
if err != nil {
s.log.Error("failed unsubscribe from NATS", "error", err)
s.log.Error("failed unsubscribe from MessageBus", "error", err)
}
}

Expand All @@ -127,8 +137,8 @@ func (s *Subscription) Cancel() {
s.mu.Unlock()
}

func (s *Subscription) receive(o *nats.Msg) {
e, err := ari.DecodeEvent(o.Data)
func (s *Subscription) receive(data []byte) {
e, err := ari.DecodeEvent(data)
if err != nil {
s.log.Error("failed to convert received message to ari.Event", "error", err)
return
Expand Down
Loading

0 comments on commit d16005d

Please sign in to comment.