Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(service): add mqtt service #149

Closed
50 changes: 50 additions & 0 deletions docs/services/mqtt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# MQTT

## URL Format

_mqtt://**`host`**:**`port`**?topic=**`topic`**_

## Optional parameters

You can optionally specify the **`disableTLS`**, **`clientID`**, **`username`** and **`password`** parameters in the URL:
_mqtt://**`host`**:**`port`**?topic=**`topic`**&disableTLS=true&clientID=**`clientID`**&username=**`username`**&password:**`password`**_
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(URL creds)

Suggested change
_mqtt://**`host`**:**`port`**?topic=**`topic`**&disableTLS=true&clientID=**`clientID`**&username=**`username`**&password:**`password`**_
_mqtt://**`username`**:**`password`**@**`host`**:**`port`**?topic=**`topic`**&disableTLS=true&clientID=**`clientID`**&username=**`username`**&password:**`password`**_


## Parameters Description

- **Host** - MQTT broker server hostname or IP address (**Required**)
Default: _empty_
Aliases: `host`

- **Port** - MQTT server port, common ones are 8883 for TCP/TLS and 1883 for TCP (**Required**)
Default: `8883`

- **Topic** - Topic where the message is sent (**Required**)
Default: _empty_
Aliases: `Topic`

- **DisableTLS** - disable TLS/SSL Configurations
Default: `false`

- **ClientID** - The client identifier (ClientID) identifies each MQTT client that connects to an MQTT
Default: _empty_
Aliases: `clientID`

- **Username** - name of the sender to auth
Default: _empty_
Aliases: `clientID`

- **Password** - authentication password or hash
Default: _empty_
Aliases: `password`
Comment on lines +12 to +38
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can auto-generated from the config tags

Suggested change
## Parameters Description
- **Host** - MQTT broker server hostname or IP address (**Required**)
Default: _empty_
Aliases: `host`
- **Port** - MQTT server port, common ones are 8883 for TCP/TLS and 1883 for TCP (**Required**)
Default: `8883`
- **Topic** - Topic where the message is sent (**Required**)
Default: _empty_
Aliases: `Topic`
- **DisableTLS** - disable TLS/SSL Configurations
Default: `false`
- **ClientID** - The client identifier (ClientID) identifies each MQTT client that connects to an MQTT
Default: _empty_
Aliases: `clientID`
- **Username** - name of the sender to auth
Default: _empty_
Aliases: `clientID`
- **Password** - authentication password or hash
Default: _empty_
Aliases: `password`
--8<-- "docs/services/mqtt/config.md"


## Certificates to use TCP/TLS

To use TCP/TLS connection, it is necessary the files:

- Cerficate Authority: ca.crt
- Client Certificate: client.crt
- Client Key: client.key

## Configure TLS in mosquitto

Generate the certificates [mosquitto-tls](https://mosquitto.org/man/mosquitto-tls-7.html).
1 change: 1 addition & 0 deletions docs/services/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Click on the service for a more thorough explanation. <!-- @formatter:off -->
| [IFTTT](./ifttt.md) | *ifttt://__`key`__/?events=__`event1`__[,__`event2`__,...]&value1=__`value1`__&value2=__`value2`__&value3=__`value3`__* |
| [Join](./join.md) | *join://shoutrrr:__`api-key`__@join/?devices=__`device1`__[,__`device2`__, ...][&icon=__`icon`__][&title=__`title`__]* |
| [Mattermost](./mattermost.md) | *mattermost://[__`username`__@]__`mattermost-host`__/__`token`__[/__`channel`__]* |
| [MQTT](./mqtt.md) | *mqtt://__`host`__:__`port`__?topic=__`topic`__*
| [OpsGenie](./opsgenie.md) | *opsgenie://__`host`__/token?responders=__`responder1`__[,__`responder2`__]* |
| [Pushbullet](./pushbullet.md) | *pushbullet://__`api-token`__[/__`device`__/#__`channel`__/__`email`__]* |
| [Pushover](./pushover.md) | *pushover://shoutrrr:__`apiToken`__@__`userKey`__/?devices=__`device1`__[,__`device2`__, ...]* |
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/containrrr/shoutrrr
go 1.12

require (
github.com/eclipse/paho.mqtt.golang v1.3.2 // indirect
github.com/fatih/color v1.10.0
github.com/google/uuid v1.1.5 // indirect
github.com/jarcoal/httpmock v1.0.4
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/eclipse/paho.mqtt.golang v1.3.2 h1:ICzfxSyrR8bOsh9l8JBBOwO1tc2C26oEyody0ml0L6E=
github.com/eclipse/paho.mqtt.golang v1.3.2/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/fatih/color v1.6.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
Expand Down Expand Up @@ -102,6 +104,8 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORR
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
Expand Down Expand Up @@ -277,6 +281,7 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190522155817-f3200d17e092 h1:4QSRKanuywn15aTZvI/mIDEgPQpswuFndXpOj3rKEco=
golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 h1:AeiKBIuRw3UomYXSbLy0Mc2dDLfdtbT/IVn4keq83P0=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down
2 changes: 2 additions & 0 deletions pkg/router/servicemap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/containrrr/shoutrrr/pkg/services/logger"
"github.com/containrrr/shoutrrr/pkg/services/matrix"
"github.com/containrrr/shoutrrr/pkg/services/mattermost"
"github.com/containrrr/shoutrrr/pkg/services/mqtt"
"github.com/containrrr/shoutrrr/pkg/services/opsgenie"
"github.com/containrrr/shoutrrr/pkg/services/pushbullet"
"github.com/containrrr/shoutrrr/pkg/services/pushover"
Expand All @@ -33,6 +34,7 @@ var serviceMap = map[string]func() t.Service{
"logger": func() t.Service { return &logger.Service{} },
"matrix": func() t.Service { return &matrix.Service{} },
"mattermost": func() t.Service { return &mattermost.Service{} },
"mqtt": func() t.Service { return &mqtt.Service{} },
"opsgenie": func() t.Service { return &opsgenie.Service{} },
"pushbullet": func() t.Service { return &pushbullet.Service{} },
"pushover": func() t.Service { return &pushover.Service{} },
Expand Down
98 changes: 98 additions & 0 deletions pkg/services/mqtt/mqtt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package mqtt

import (
"fmt"
"log"
"net/url"

"github.com/containrrr/shoutrrr/pkg/format"
"github.com/containrrr/shoutrrr/pkg/util"
mqtt "github.com/eclipse/paho.mqtt.golang"

"github.com/containrrr/shoutrrr/pkg/services/standard"
"github.com/containrrr/shoutrrr/pkg/types"
)

const (
maxLength = 268435455
)

// Service sends notifications to mqtt topic
type Service struct {
standard.Standard
config *Config
pkr format.PropKeyResolver
}

// Send notification to mqtt
func (service *Service) Send(message string, params *types.Params) error {

message, omitted := MessageLimit(message)

if omitted > 0 {
service.Logf("omitted %v character(s) from the message", omitted)
}

config := *service.config
if err := service.pkr.UpdateConfigFromParams(&config, params); err != nil {
return err
}

if err := service.PublishMessageToTopic(message, &config); err != nil {
return fmt.Errorf("an error occurred while sending notification to the MQTT topic: %s", err.Error())
}

return nil
}

// Initialize loads ServiceConfig from configURL and sets logger for this Service
func (service *Service) Initialize(configURL *url.URL, logger *log.Logger) error {
service.Logger.SetLogger(logger)
service.config = &Config{
DisableTLS: false,
Port: 8883,
}
service.pkr = format.NewPropKeyResolver(service.config)
err := service.config.setURL(&service.pkr, configURL)

return err
}

// MessageLimit returns a string with the maximum size and the amount of omitted characters
func MessageLimit(message string) (string, int) {
size := util.Min(maxLength, len(message))
omitted := len(message) - size

return message[:size], omitted
}

// GetConfig returns the Config for the service
func (service *Service) GetConfig() *Config {
return service.config
}

// Publish to topic
func (service *Service) Publish(client mqtt.Client, topic string, message string) {
token := client.Publish(topic, 0, false, message)
token.Wait()
}

// PublishMessageToTopic initializes the client and publishes the message
func (service *Service) PublishMessageToTopic(message string, config *Config) error {
postURL := config.MqttURL()
opts := config.GetClientConfig(postURL)
client := mqtt.NewClient(opts)
token := client.Connect()

if token.Error() != nil {
return token.Error()
}

token.Wait()

service.Publish(client, config.Topic, message)

client.Disconnect(250)

return nil
}
146 changes: 146 additions & 0 deletions pkg/services/mqtt/mqtt_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package mqtt

import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"net/url"
"strconv"

"github.com/containrrr/shoutrrr/pkg/format"
"github.com/containrrr/shoutrrr/pkg/types"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(URL creds)

Suggested change
"github.com/containrrr/shoutrrr/pkg/types"
"github.com/containrrr/shoutrrr/pkg/types"
"github.com/containrrr/shoutrrr/pkg/util"

mqtt "github.com/eclipse/paho.mqtt.golang"
)

// Config for use within the mqtt
type Config struct {
Host string `key:"host" default:"" desc:"MQTT broker server hostname or IP address"`
Port uint16 `key:"port" default:"8883" desc:"MQTT server port, common ones are 8883, 1883"`
Topic string `key:"topic" default:"" desc:"Topic where the message is sent"`
ClientID string `key:"clientid" default:"" desc:"client's id from the message is sent"`
Username string `key:"username" default:"" desc:"username for auth"`
Password string `key:"password" default:"" desc:"password for auth"`
Comment on lines +23 to +24
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(URL creds)

Suggested change
Username string `key:"username" default:"" desc:"username for auth"`
Password string `key:"password" default:"" desc:"password for auth"`
Username string `key:"username" default:"" desc:"username for auth" url:"User"`
Password string `key:"password" default:"" desc:"password for auth" url:"Pass"`

DisableTLS bool `key:"disabletls" default:"No"`
}

// DefaultConfig creates a PropKeyResolver and uses it to populate the default values of a new Config, returning both
func DefaultConfig() (*Config, format.PropKeyResolver) {
config := &Config{}
pkr := format.NewPropKeyResolver(config)
_ = pkr.SetDefaultProps(config)
return config, pkr
}

// Enums returns the fields that should use a corresponding EnumFormatter to Print/Parse their values
func (config *Config) Enums() map[string]types.EnumFormatter {
return map[string]types.EnumFormatter{}
}

// GetURL returns a URL representation of it's current field values
func (config *Config) GetURL() *url.URL {
resolver := format.NewPropKeyResolver(config)
return config.getURL(&resolver)
}

// SetURL updates a ServiceConfig from a URL representation of it's field values
func (config *Config) SetURL(url *url.URL) error {
resolver := format.NewPropKeyResolver(config)
return config.setURL(&resolver, url)
}

func (config *Config) getURL(resolver types.ConfigQueryResolver) *url.URL {

return &url.URL{
Host: fmt.Sprintf("%s:%d", config.Host, config.Port),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(URL creds)

Suggested change
Host: fmt.Sprintf("%s:%d", config.Host, config.Port),
User: util.URLUserPassword(config.Username, config.Password),
Host: fmt.Sprintf("%s:%d", config.Host, config.Port),

Scheme: Scheme,
ForceQuery: true,
RawQuery: format.BuildQuery(resolver),
}

}

func (config *Config) setURL(resolver types.ConfigQueryResolver, url *url.URL) error {

config.Host = url.Hostname()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(URL creds)

Suggested change
config.Host = url.Hostname()
config.Host = url.Hostname()
config.Username = url.User.Username()
password, _ := url.User.Password()
config.Password = password


if port, err := strconv.ParseUint(url.Port(), 10, 16); err == nil {
config.Port = uint16(port)
}

for key, vals := range url.Query() {
if err := resolver.Set(key, vals[0]); err != nil {
return err
}
}

return nil
}

// MqttURL returns a string that is synchronized with the config props
func (config *Config) MqttURL() string {
MqttHost := config.Host
MqttPort := config.Port
scheme := DefaultMQTTScheme
if config.DisableTLS {
scheme = Scheme[:4]
}
return fmt.Sprintf("%s://%s:%d", scheme, MqttHost, MqttPort)
}

// GetClientConfig returns the client options
func (config *Config) GetClientConfig(postURL string) *mqtt.ClientOptions {
opts := mqtt.NewClientOptions()

opts.AddBroker(postURL)

if len(config.ClientID) > 0 {
opts.SetClientID(config.ClientID)
}

if len(config.Username) > 0 {
opts.SetUsername(config.Username)
}

if len(config.Password) > 0 {
opts.SetPassword(config.Password)
}

if !config.DisableTLS {
tlsConfig := config.GetTLSConfig()
opts.SetTLSConfig(tlsConfig)
}

return opts
}

// GetTLSConfig returns the configuration with the certificates for TLS
func (config *Config) GetTLSConfig() *tls.Config {
certpool := x509.NewCertPool()
ca, err := ioutil.ReadFile("ca.crt")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Requiring the files to have this specific name and be in the working directory of the consuming app makes this really hard to use. Ideally, we should add some kind of generic way to add files/blobs to services (mostly for TLS, so perhaps even a specific TLS-cert interface...)
Right now, this should at least be configurable in the config struct.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented in #185


if err != nil {
log.Fatalln(err.Error())
}
certpool.AppendCertsFromPEM(ca)

clientKeyPair, err := tls.LoadX509KeyPair("client.crt", "client.key")
if err != nil {
panic(err)
}
return &tls.Config{
RootCAs: certpool,
ClientAuth: tls.NoClientCert,
ClientCAs: nil,
InsecureSkipVerify: true,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the point of loading custom certs when verification is disabled? This should absolutely not be the default.

Certificates: []tls.Certificate{clientKeyPair},
}
}

const (
// Scheme is the identifying part of this service's configuration URL
Scheme = "mqtt"
// DefaultMQTTScheme is the scheme used for MQTT URLs unless overridden
DefaultMQTTScheme = "mqtts"
)
Loading