From 3bbc28db529d103b5d2aee776f8a60bf50e7d7da Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Wed, 29 Jan 2020 16:56:04 +0100 Subject: [PATCH 01/26] add view services via mqtt Signed-off-by: Mirko Teodorovic --- internal/app/agent/service.go | 79 ++++++++++++++++++++++++----------- internal/pkg/mqtt/sub.go | 1 + 2 files changed, 56 insertions(+), 24 deletions(-) diff --git a/internal/app/agent/service.go b/internal/app/agent/service.go index 3fad9dc8..91ff484e 100644 --- a/internal/app/agent/service.go +++ b/internal/app/agent/service.go @@ -5,6 +5,7 @@ package agent import ( "encoding/base64" + "encoding/json" "fmt" "os/exec" "strings" @@ -42,6 +43,9 @@ var ( // errNatsSubscribing indicates problem with sub to topic for heartbeat errNatsSubscribing = errors.New("failed to subscribe to heartbeat topic") + + // errNoSuchService indicates service not supported + errNoSuchService = errors.New("no such service") ) // Service specifies API for publishing messages and subscribing to topics. @@ -141,15 +145,13 @@ func (a *agent) Execute(uuid, cmd string) (string, error) { return string(payload), nil } -func (a *agent) Control(uuid, cmdStr string) error { +func (a *agent) Control(uuid, cmdStr string) (err error) { cmdArgs := strings.Split(strings.Replace(cmdStr, " ", "", -1), ",") if len(cmdArgs) < 2 { return errInvalidCommand } - var resp string - var err error - + resp := "" cmd := cmdArgs[0] switch cmd { case "edgex-operation": @@ -168,16 +170,7 @@ func (a *agent) Control(uuid, cmdStr string) error { return err } - payload, err := encodeSenML(uuid, cmd, resp) - if err != nil { - return err - } - - if err := a.Publish(a.config.Agent.Channels.Control, string(payload)); err != nil { - return err - } - - return nil + return a.processResponse(resp, uuid, cmd) } // Message for this command @@ -186,24 +179,62 @@ func (a *agent) Control(uuid, cmdStr string) error { // Example of creation: // b, _ := toml.Marshal(cfg) // config_file_content := base64.StdEncoding.EncodeToString(b) -func (a *agent) ServiceConfig(uuid, cmdStr string) error { +func (a *agent) ServiceConfig(uuid, cmdStr string) (err error) { cmdArgs := strings.Split(strings.Replace(cmdStr, " ", "", -1), ",") - if len(cmdArgs) < 3 { + if len(cmdArgs) < 1 { return errInvalidCommand } + services := []byte{} + resp := "" + cmd := cmdArgs[0] - service := cmdArgs[0] - fileName := cmdArgs[1] - fileCont, err := base64.StdEncoding.DecodeString(cmdArgs[2]) + switch cmd { + case "view": + services, err = json.Marshal(a.Services()) + resp = string(services) + case "save": + if len(cmdArgs) < 4 { + return errInvalidCommand + } + service := cmdArgs[1] + fileName := cmdArgs[2] + fileCont := cmdArgs[3] + err = a.saveConfig(service, fileName, fileCont) + } if err != nil { return err } - c := &export.Config{} + return a.processResponse(resp, uuid, cmd) +} - c.ReadBytes([]byte(fileCont)) - c.File = fileName - c.Save() - return a.nats.Publish(fmt.Sprintf("%s.%s.%s", Commands, service, Config), []byte("")) +func (a *agent) processResponse(resp, uuid, cmd string) (err error) { + payload, err := encodeSenML(uuid, cmd, resp) + if err != nil { + return err + } + if err := a.Publish(a.config.Agent.Channels.Control, string(payload)); err != nil { + return err + } + return nil +} + +func (a *agent) saveConfig(service, fileName, fileCont string) (err error) { + switch service { + case "export": + content := []byte{} + content, err = base64.StdEncoding.DecodeString(fileCont) + c := &export.Config{} + c.ReadBytes([]byte(content)) + c.File = fileName + err = c.Save() + default: + err = errNoSuchService + } + if err != nil { + return + } + err = a.nats.Publish(fmt.Sprintf("%s.%s.%s", Commands, service, Config), []byte("")) + return err } func (a *agent) AddConfig(c config.Config) error { diff --git a/internal/pkg/mqtt/sub.go b/internal/pkg/mqtt/sub.go index 51c4afbb..f7fdd1a5 100644 --- a/internal/pkg/mqtt/sub.go +++ b/internal/pkg/mqtt/sub.go @@ -122,6 +122,7 @@ func (b *broker) handleMsg(mc paho.Client, msg paho.Message) { if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil { b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err)) } + } } From 68c29b1012aa7d0992c6e109fe7795fa10b17383 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Wed, 29 Jan 2020 17:07:53 +0100 Subject: [PATCH 02/26] small fix Signed-off-by: Mirko Teodorovic --- internal/app/agent/service.go | 5 ++--- internal/pkg/mqtt/sub.go | 1 - 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/internal/app/agent/service.go b/internal/app/agent/service.go index 91ff484e..787004a5 100644 --- a/internal/app/agent/service.go +++ b/internal/app/agent/service.go @@ -231,10 +231,9 @@ func (a *agent) saveConfig(service, fileName, fileCont string) (err error) { err = errNoSuchService } if err != nil { - return + return err } - err = a.nats.Publish(fmt.Sprintf("%s.%s.%s", Commands, service, Config), []byte("")) - return err + return a.nats.Publish(fmt.Sprintf("%s.%s.%s", Commands, service, Config), []byte("")) } func (a *agent) AddConfig(c config.Config) error { diff --git a/internal/pkg/mqtt/sub.go b/internal/pkg/mqtt/sub.go index f7fdd1a5..51c4afbb 100644 --- a/internal/pkg/mqtt/sub.go +++ b/internal/pkg/mqtt/sub.go @@ -122,7 +122,6 @@ func (b *broker) handleMsg(mc paho.Client, msg paho.Message) { if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil { b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err)) } - } } From d0f5d0e31efb659be92185144f174bf6ba3021e0 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Fri, 31 Jan 2020 11:30:59 +0100 Subject: [PATCH 03/26] minor changes Signed-off-by: Mirko Teodorovic --- internal/app/agent/service.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/internal/app/agent/service.go b/internal/app/agent/service.go index 787004a5..bcc9c891 100644 --- a/internal/app/agent/service.go +++ b/internal/app/agent/service.go @@ -21,11 +21,16 @@ import ( "github.com/nats-io/go-nats" ) +type cmdType string + const ( Path = "./config.toml" Hearbeat = "heartbeat.*" Commands = "commands" Config = "config" + + view cmdType = "view" + save cmdType = "save" ) var ( @@ -186,13 +191,13 @@ func (a *agent) ServiceConfig(uuid, cmdStr string) (err error) { } services := []byte{} resp := "" - cmd := cmdArgs[0] + cmd := cmdType(cmdArgs[0]) switch cmd { - case "view": + case view: services, err = json.Marshal(a.Services()) resp = string(services) - case "save": + case save: if len(cmdArgs) < 4 { return errInvalidCommand } @@ -204,7 +209,7 @@ func (a *agent) ServiceConfig(uuid, cmdStr string) (err error) { if err != nil { return err } - return a.processResponse(resp, uuid, cmd) + return a.processResponse(resp, uuid, string(cmd)) } func (a *agent) processResponse(resp, uuid, cmd string) (err error) { From 59b99edb2c6fbdad213ef80cdf184cddba521a4c Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Fri, 31 Jan 2020 12:57:22 +0100 Subject: [PATCH 04/26] update comment Signed-off-by: Mirko Teodorovic --- internal/app/agent/service.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/app/agent/service.go b/internal/app/agent/service.go index bcc9c891..aaa5e0fb 100644 --- a/internal/app/agent/service.go +++ b/internal/app/agent/service.go @@ -179,7 +179,8 @@ func (a *agent) Control(uuid, cmdStr string) (err error) { } // Message for this command -// "[{"bn":"1:", "n":"config", "vs":"export, /configs/export/config.toml, config_file_content"}]" +// [{"bn":"1:", "n":"config", "vs":"view"}] +// [{"bn":"1:", "n":"config", "vs":"save, export, filename, filecontent"}] // config_file_content is base64 encoded marshaled structure representing service conf // Example of creation: // b, _ := toml.Marshal(cfg) From 5b8089869ad7813d84137a90b6cc65b913c186c9 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Mon, 3 Feb 2020 14:21:33 +0100 Subject: [PATCH 05/26] add different endpoint for services view Signed-off-by: Mirko Teodorovic --- internal/app/agent/api/logging.go | 2 +- internal/app/agent/api/metrics.go | 2 +- internal/app/agent/service.go | 12 ++++++++---- internal/pkg/mqtt/sub.go | 6 ++++++ 4 files changed, 16 insertions(+), 6 deletions(-) diff --git a/internal/app/agent/api/logging.go b/internal/app/agent/api/logging.go index e552aab5..50da672b 100644 --- a/internal/app/agent/api/logging.go +++ b/internal/app/agent/api/logging.go @@ -99,7 +99,7 @@ func (lm loggingMiddleware) ServiceConfig(uuid, cmdStr string) (err error) { return lm.svc.ServiceConfig(uuid, cmdStr) } -func (lm loggingMiddleware) Services() map[string]*services.Service { +func (lm loggingMiddleware) Services() []*services.Service { defer func(begin time.Time) { message := fmt.Sprintf("Method services took %s to complete", time.Since(begin)) lm.logger.Info(fmt.Sprintf("%s without errors.", message)) diff --git a/internal/app/agent/api/metrics.go b/internal/app/agent/api/metrics.go index f6414305..09d46e3d 100644 --- a/internal/app/agent/api/metrics.go +++ b/internal/app/agent/api/metrics.go @@ -76,7 +76,7 @@ func (ms *metricsMiddleware) Config() config.Config { return ms.svc.Config() } -func (ms *metricsMiddleware) Services() map[string]*services.Service { +func (ms *metricsMiddleware) Services() []*services.Service { defer func(begin time.Time) { ms.counter.With("method", "services").Add(1) ms.latency.With("method", "services").Observe(time.Since(begin).Seconds()) diff --git a/internal/app/agent/service.go b/internal/app/agent/service.go index aaa5e0fb..18ba9ad6 100644 --- a/internal/app/agent/service.go +++ b/internal/app/agent/service.go @@ -71,7 +71,7 @@ type Service interface { ServiceConfig(uuid, cmdStr string) error // Services returns service list - Services() map[string]*services.Service + Services() []*services.Service // Publish message Publish(string, string) error @@ -179,7 +179,7 @@ func (a *agent) Control(uuid, cmdStr string) (err error) { } // Message for this command -// [{"bn":"1:", "n":"config", "vs":"view"}] +// [{"bn":"1:", "n":"services", "vs":"view"}] // [{"bn":"1:", "n":"config", "vs":"save, export, filename, filecontent"}] // config_file_content is base64 encoded marshaled structure representing service conf // Example of creation: @@ -250,8 +250,12 @@ func (a *agent) Config() config.Config { return *a.config } -func (a *agent) Services() map[string]*services.Service { - return a.servs +func (a *agent) Services() []*services.Service { + services := [](*services.Service){} + for _, s := range a.servs { + services = append(services, s) + } + return services } func (a *agent) Publish(crtlChan, payload string) error { diff --git a/internal/pkg/mqtt/sub.go b/internal/pkg/mqtt/sub.go index 51c4afbb..1e881a6d 100644 --- a/internal/pkg/mqtt/sub.go +++ b/internal/pkg/mqtt/sub.go @@ -27,6 +27,7 @@ const ( control cmdType = "control" exec cmdType = "exec" config cmdType = "config" + service cmdType = "service" ) var channelPartRegExp = regexp.MustCompile(`^channels/([\w\-]+)/messages/services(/[^?]*)?(\?.*)?$`) @@ -122,6 +123,11 @@ func (b *broker) handleMsg(mc paho.Client, msg paho.Message) { if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil { b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err)) } + case service: + b.logger.Info(fmt.Sprintf("Execute command for uuid %s and command string %s", uuid, cmdStr)) + if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil { + b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err)) + } } } From 8c79f70cbe1dde3aca71d90de7c5c397ef9112a6 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Mon, 3 Feb 2020 18:55:05 +0100 Subject: [PATCH 06/26] dont use pointers Signed-off-by: Mirko Teodorovic --- internal/app/agent/api/logging.go | 2 +- internal/app/agent/api/metrics.go | 2 +- internal/app/agent/service.go | 17 ++++++++++++----- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/internal/app/agent/api/logging.go b/internal/app/agent/api/logging.go index 50da672b..100b8e66 100644 --- a/internal/app/agent/api/logging.go +++ b/internal/app/agent/api/logging.go @@ -99,7 +99,7 @@ func (lm loggingMiddleware) ServiceConfig(uuid, cmdStr string) (err error) { return lm.svc.ServiceConfig(uuid, cmdStr) } -func (lm loggingMiddleware) Services() []*services.Service { +func (lm loggingMiddleware) Services() []services.Service { defer func(begin time.Time) { message := fmt.Sprintf("Method services took %s to complete", time.Since(begin)) lm.logger.Info(fmt.Sprintf("%s without errors.", message)) diff --git a/internal/app/agent/api/metrics.go b/internal/app/agent/api/metrics.go index 09d46e3d..73a2cf0b 100644 --- a/internal/app/agent/api/metrics.go +++ b/internal/app/agent/api/metrics.go @@ -76,7 +76,7 @@ func (ms *metricsMiddleware) Config() config.Config { return ms.svc.Config() } -func (ms *metricsMiddleware) Services() []*services.Service { +func (ms *metricsMiddleware) Services() []services.Service { defer func(begin time.Time) { ms.counter.With("method", "services").Add(1) ms.latency.With("method", "services").Observe(time.Since(begin).Seconds()) diff --git a/internal/app/agent/service.go b/internal/app/agent/service.go index 18ba9ad6..2ba22b14 100644 --- a/internal/app/agent/service.go +++ b/internal/app/agent/service.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "os/exec" + "sort" "strings" paho "github.com/eclipse/paho.mqtt.golang" @@ -71,7 +72,7 @@ type Service interface { ServiceConfig(uuid, cmdStr string) error // Services returns service list - Services() []*services.Service + Services() []services.Service // Publish message Publish(string, string) error @@ -250,10 +251,16 @@ func (a *agent) Config() config.Config { return *a.config } -func (a *agent) Services() []*services.Service { - services := [](*services.Service){} - for _, s := range a.servs { - services = append(services, s) +func (a *agent) Services() []services.Service { + services := [](services.Service){} + keys := []string{} + for k := range a.servs { + keys = append(keys, k) + } + sort.Strings(keys) + for _, key := range keys { + service := a.servs[key] + services = append(services, *service) } return services } From d80ff32dce59fd0d799d709efea195ca758a77e6 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Tue, 4 Feb 2020 11:51:50 +0100 Subject: [PATCH 07/26] fix comments Signed-off-by: Mirko Teodorovic --- internal/app/agent/service.go | 53 +++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/internal/app/agent/service.go b/internal/app/agent/service.go index 2ba22b14..ceb11c20 100644 --- a/internal/app/agent/service.go +++ b/internal/app/agent/service.go @@ -151,13 +151,15 @@ func (a *agent) Execute(uuid, cmd string) (string, error) { return string(payload), nil } -func (a *agent) Control(uuid, cmdStr string) (err error) { +func (a *agent) Control(uuid, cmdStr string) error { cmdArgs := strings.Split(strings.Replace(cmdStr, " ", "", -1), ",") if len(cmdArgs) < 2 { return errInvalidCommand } - resp := "" + var resp string + var err error + cmd := cmdArgs[0] switch cmd { case "edgex-operation": @@ -176,7 +178,7 @@ func (a *agent) Control(uuid, cmdStr string) (err error) { return err } - return a.processResponse(resp, uuid, cmd) + return a.processResponse(uuid, cmd, resp) } // Message for this command @@ -186,18 +188,20 @@ func (a *agent) Control(uuid, cmdStr string) (err error) { // Example of creation: // b, _ := toml.Marshal(cfg) // config_file_content := base64.StdEncoding.EncodeToString(b) -func (a *agent) ServiceConfig(uuid, cmdStr string) (err error) { +func (a *agent) ServiceConfig(uuid, cmdStr string) error { cmdArgs := strings.Split(strings.Replace(cmdStr, " ", "", -1), ",") if len(cmdArgs) < 1 { return errInvalidCommand } - services := []byte{} resp := "" cmd := cmdType(cmdArgs[0]) switch cmd { case view: - services, err = json.Marshal(a.Services()) + services, err := json.Marshal(a.Services()) + if err != nil { + return err + } resp = string(services) case save: if len(cmdArgs) < 4 { @@ -206,15 +210,15 @@ func (a *agent) ServiceConfig(uuid, cmdStr string) (err error) { service := cmdArgs[1] fileName := cmdArgs[2] fileCont := cmdArgs[3] - err = a.saveConfig(service, fileName, fileCont) - } - if err != nil { - return err + err := a.saveConfig(service, fileName, fileCont) + if err != nil { + return err + } } - return a.processResponse(resp, uuid, string(cmd)) + return a.processResponse(uuid, string(cmd), resp) } -func (a *agent) processResponse(resp, uuid, cmd string) (err error) { +func (a *agent) processResponse(uuid, cmd, resp string) (err error) { payload, err := encodeSenML(uuid, cmd, resp) if err != nil { return err @@ -225,21 +229,28 @@ func (a *agent) processResponse(resp, uuid, cmd string) (err error) { return nil } -func (a *agent) saveConfig(service, fileName, fileCont string) (err error) { +func (a *agent) saveConfig(service, fileName, fileCont string) error { switch service { case "export": - content := []byte{} - content, err = base64.StdEncoding.DecodeString(fileCont) + var content []byte + content, err := base64.StdEncoding.DecodeString(fileCont) + if err != nil { + return err + } + c := &export.Config{} - c.ReadBytes([]byte(content)) + if err := c.ReadBytes([]byte(content)); err != nil { + return err + } c.File = fileName - err = c.Save() + if err := c.Save(); err != nil { + return err + } + default: - err = errNoSuchService - } - if err != nil { - return err + return errNoSuchService } + return a.nats.Publish(fmt.Sprintf("%s.%s.%s", Commands, service, Config), []byte("")) } From f23785b0267262ddec8a1f213cbc5106cc0272d5 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Tue, 4 Feb 2020 12:00:41 +0100 Subject: [PATCH 08/26] fix errror return Signed-off-by: Mirko Teodorovic --- internal/app/agent/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/app/agent/service.go b/internal/app/agent/service.go index ceb11c20..2ae27a68 100644 --- a/internal/app/agent/service.go +++ b/internal/app/agent/service.go @@ -218,7 +218,7 @@ func (a *agent) ServiceConfig(uuid, cmdStr string) error { return a.processResponse(uuid, string(cmd), resp) } -func (a *agent) processResponse(uuid, cmd, resp string) (err error) { +func (a *agent) processResponse(uuid, cmd, resp string) error { payload, err := encodeSenML(uuid, cmd, resp) if err != nil { return err From 0ee05f6b111cdf2cffffef056605805c19b5ce78 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Tue, 4 Feb 2020 12:02:14 +0100 Subject: [PATCH 09/26] inline function Signed-off-by: Mirko Teodorovic --- internal/app/agent/service.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/app/agent/service.go b/internal/app/agent/service.go index 2ae27a68..e6079cfd 100644 --- a/internal/app/agent/service.go +++ b/internal/app/agent/service.go @@ -210,8 +210,7 @@ func (a *agent) ServiceConfig(uuid, cmdStr string) error { service := cmdArgs[1] fileName := cmdArgs[2] fileCont := cmdArgs[3] - err := a.saveConfig(service, fileName, fileCont) - if err != nil { + if err := a.saveConfig(service, fileName, fileCont); err != nil { return err } } From d2db7f20a492bfb4760a56c99e3e9adbfbcf679c Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Tue, 4 Feb 2020 14:15:04 +0100 Subject: [PATCH 10/26] add export config Signed-off-by: Mirko Teodorovic --- internal/app/agent/service.go | 2 -- internal/pkg/bootstrap/bootstrap.go | 13 ++++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/internal/app/agent/service.go b/internal/app/agent/service.go index e6079cfd..d65ceecb 100644 --- a/internal/app/agent/service.go +++ b/internal/app/agent/service.go @@ -231,12 +231,10 @@ func (a *agent) processResponse(uuid, cmd, resp string) error { func (a *agent) saveConfig(service, fileName, fileCont string) error { switch service { case "export": - var content []byte content, err := base64.StdEncoding.DecodeString(fileCont) if err != nil { return err } - c := &export.Config{} if err := c.ReadBytes([]byte(content)); err != nil { return err diff --git a/internal/pkg/bootstrap/bootstrap.go b/internal/pkg/bootstrap/bootstrap.go index 4841bb71..4f6cee05 100644 --- a/internal/pkg/bootstrap/bootstrap.go +++ b/internal/pkg/bootstrap/bootstrap.go @@ -14,6 +14,7 @@ import ( "github.com/mainflux/agent/internal/app/agent" "github.com/mainflux/agent/internal/pkg/config" + export "github.com/mainflux/export/pkg/config" log "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/things" ) @@ -36,11 +37,12 @@ type deviceConfig struct { } type infraConfig struct { - LogLevel string `json:"log_level"` - HTTPPort string `json:"http_port"` - MqttURL string `json:"mqtt_url"` - EdgexURL string `json:"edgex_url"` - NatsURL string `json:"nats_url"` + LogLevel string `json:"log_level"` + HTTPPort string `json:"http_port"` + MqttURL string `json:"mqtt_url"` + EdgexURL string `json:"edgex_url"` + NatsURL string `json:"nats_url"` + ExportConfig export.Config `json:"export_config" mapstructure:"export_config"` } // Bootstrap - Retrieve device config @@ -77,6 +79,7 @@ func Bootstrap(cfg Config, logger log.Logger, file string) error { cfg.ID, cfg.URL)) ic := infraConfig{} + fmt.Println(string(dc.Content)) if err := json.Unmarshal([]byte(dc.Content), &ic); err != nil { return err } From 53fd7f47cfc388bca30cfb274c93944ca2eb7171 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Tue, 4 Feb 2020 14:27:06 +0100 Subject: [PATCH 11/26] small changes Signed-off-by: Mirko Teodorovic --- internal/app/agent/service.go | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/internal/app/agent/service.go b/internal/app/agent/service.go index e6079cfd..e4f9996d 100644 --- a/internal/app/agent/service.go +++ b/internal/app/agent/service.go @@ -22,16 +22,14 @@ import ( "github.com/nats-io/go-nats" ) -type cmdType string - const ( Path = "./config.toml" Hearbeat = "heartbeat.*" Commands = "commands" Config = "config" - view cmdType = "view" - save cmdType = "save" + view = "view" + save = "save" ) var ( @@ -86,7 +84,7 @@ type agent struct { edgexClient edgex.Client logger log.Logger nats *nats.Conn - servs map[string]*services.Service + svcs map[string]*services.Service } // New returns agent service implementation. @@ -97,7 +95,7 @@ func New(mc paho.Client, cfg *config.Config, ec edgex.Client, nc *nats.Conn, log config: cfg, nats: nc, logger: logger, - servs: make(map[string]*services.Service), + svcs: make(map[string]*services.Service), } _, err := ag.nats.Subscribe(Hearbeat, func(msg *nats.Msg) { @@ -107,16 +105,16 @@ func New(mc paho.Client, cfg *config.Config, ec edgex.Client, nc *nats.Conn, log ag.logger.Error(fmt.Sprintf("Failed: Subject has incorrect length %s" + sub)) return } - servname := tok[1] + svcname := tok[1] // Service name is extracted from the subtopic // if there is multiple instances of the same service // we will have to add another distinction - if _, ok := ag.servs[servname]; !ok { - serv := services.NewService(servname) - ag.servs[servname] = serv - ag.logger.Info(fmt.Sprintf("Services '%s' registered", servname)) + if _, ok := ag.svcs[svcname]; !ok { + svc := services.NewService(svcname) + ag.svcs[svcname] = svc + ag.logger.Info(fmt.Sprintf("Services '%s' registered", svcname)) } - serv := ag.servs[servname] + serv := ag.svcs[svcname] serv.Update() }) @@ -194,7 +192,7 @@ func (a *agent) ServiceConfig(uuid, cmdStr string) error { return errInvalidCommand } resp := "" - cmd := cmdType(cmdArgs[0]) + cmd := cmdArgs[0] switch cmd { case view: @@ -214,7 +212,7 @@ func (a *agent) ServiceConfig(uuid, cmdStr string) error { return err } } - return a.processResponse(uuid, string(cmd), resp) + return a.processResponse(uuid, cmd, resp) } func (a *agent) processResponse(uuid, cmd, resp string) error { @@ -231,7 +229,6 @@ func (a *agent) processResponse(uuid, cmd, resp string) error { func (a *agent) saveConfig(service, fileName, fileCont string) error { switch service { case "export": - var content []byte content, err := base64.StdEncoding.DecodeString(fileCont) if err != nil { return err @@ -264,12 +261,12 @@ func (a *agent) Config() config.Config { func (a *agent) Services() []services.Service { services := [](services.Service){} keys := []string{} - for k := range a.servs { + for k := range a.svcs { keys = append(keys, k) } sort.Strings(keys) for _, key := range keys { - service := a.servs[key] + service := a.svcs[key] services = append(services, *service) } return services From 19cd7fbe19671c5920ade704d72dea99abe16a42 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Tue, 4 Feb 2020 14:44:32 +0100 Subject: [PATCH 12/26] remove cmdType type Signed-off-by: Mirko Teodorovic --- internal/pkg/mqtt/sub.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/internal/pkg/mqtt/sub.go b/internal/pkg/mqtt/sub.go index 1e881a6d..2c24ae79 100644 --- a/internal/pkg/mqtt/sub.go +++ b/internal/pkg/mqtt/sub.go @@ -17,17 +17,15 @@ import ( paho "github.com/eclipse/paho.mqtt.golang" ) -type cmdType string - const ( reqTopic = "req" servTopic = "services" commands = "commands" - control cmdType = "control" - exec cmdType = "exec" - config cmdType = "config" - service cmdType = "service" + control = "control" + exec = "exec" + config = "config" + service = "service" ) var channelPartRegExp = regexp.MustCompile(`^channels/([\w\-]+)/messages/services(/[^?]*)?(\?.*)?$`) @@ -103,7 +101,7 @@ func (b *broker) handleMsg(mc paho.Client, msg paho.Message) { return } - cmdType := cmdType(sm.Records[0].Name) + cmdType := (sm.Records[0].Name) cmdStr := *sm.Records[0].StringValue uuid := strings.TrimSuffix(sm.Records[0].BaseName, ":") From d7b0201cfe02254fc2f0fdfbbd5e5961beff5a1b Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Tue, 4 Feb 2020 16:18:27 +0100 Subject: [PATCH 13/26] remove pointers Signed-off-by: Mirko Teodorovic --- internal/app/agent/api/logging.go | 3 +-- internal/app/agent/api/metrics.go | 3 +-- internal/app/agent/service.go | 19 ++++++++++++++----- internal/app/agent/services/service.go | 8 +++----- 4 files changed, 19 insertions(+), 14 deletions(-) diff --git a/internal/app/agent/api/logging.go b/internal/app/agent/api/logging.go index 100b8e66..15d0486b 100644 --- a/internal/app/agent/api/logging.go +++ b/internal/app/agent/api/logging.go @@ -8,7 +8,6 @@ import ( "time" "github.com/mainflux/agent/internal/app/agent" - "github.com/mainflux/agent/internal/app/agent/services" "github.com/mainflux/agent/internal/pkg/config" log "github.com/mainflux/mainflux/logger" ) @@ -99,7 +98,7 @@ func (lm loggingMiddleware) ServiceConfig(uuid, cmdStr string) (err error) { return lm.svc.ServiceConfig(uuid, cmdStr) } -func (lm loggingMiddleware) Services() []services.Service { +func (lm loggingMiddleware) Services() []interface{} { defer func(begin time.Time) { message := fmt.Sprintf("Method services took %s to complete", time.Since(begin)) lm.logger.Info(fmt.Sprintf("%s without errors.", message)) diff --git a/internal/app/agent/api/metrics.go b/internal/app/agent/api/metrics.go index 73a2cf0b..4e59cc01 100644 --- a/internal/app/agent/api/metrics.go +++ b/internal/app/agent/api/metrics.go @@ -10,7 +10,6 @@ import ( "github.com/go-kit/kit/metrics" "github.com/mainflux/agent/internal/app/agent" - "github.com/mainflux/agent/internal/app/agent/services" "github.com/mainflux/agent/internal/pkg/config" ) @@ -76,7 +75,7 @@ func (ms *metricsMiddleware) Config() config.Config { return ms.svc.Config() } -func (ms *metricsMiddleware) Services() []services.Service { +func (ms *metricsMiddleware) Services() []interface{} { defer func(begin time.Time) { ms.counter.With("method", "services").Add(1) ms.latency.With("method", "services").Observe(time.Since(begin).Seconds()) diff --git a/internal/app/agent/service.go b/internal/app/agent/service.go index e4f9996d..2c30d23f 100644 --- a/internal/app/agent/service.go +++ b/internal/app/agent/service.go @@ -10,6 +10,7 @@ import ( "os/exec" "sort" "strings" + "time" paho "github.com/eclipse/paho.mqtt.golang" "github.com/mainflux/agent/internal/app/agent/services" @@ -70,7 +71,7 @@ type Service interface { ServiceConfig(uuid, cmdStr string) error // Services returns service list - Services() []services.Service + Services() []interface{} // Publish message Publish(string, string) error @@ -258,16 +259,24 @@ func (a *agent) Config() config.Config { return *a.config } -func (a *agent) Services() []services.Service { - services := [](services.Service){} +func (a *agent) Services() []interface{} { + services := [](interface{}){} keys := []string{} for k := range a.svcs { keys = append(keys, k) } sort.Strings(keys) for _, key := range keys { - service := a.svcs[key] - services = append(services, *service) + service := struct { + Name string + LastSeen time.Time + Status string + }{ + Name: a.svcs[key].Name, + LastSeen: a.svcs[key].LastSeen, + Status: a.svcs[key].Status, + } + services = append(services, service) } return services } diff --git a/internal/app/agent/services/service.go b/internal/app/agent/services/service.go index 8d665342..5ad6a786 100644 --- a/internal/app/agent/services/service.go +++ b/internal/app/agent/services/service.go @@ -5,20 +5,18 @@ import ( "time" ) -type Status string - const ( timeout = 3 interval = 10000 - Online Status = "online" - Offline Status = "offline" + Online = "online" + Offline = "offline" ) type Service struct { Name string LastSeen time.Time - Status Status + Status string counter int done chan bool From 3d13c2d8abcab1c2889cd2c7b2d305eb6f5ffc5e Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Tue, 4 Feb 2020 17:06:38 +0100 Subject: [PATCH 14/26] add support for writing export config Signed-off-by: Mirko Teodorovic --- internal/pkg/bootstrap/bootstrap.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/pkg/bootstrap/bootstrap.go b/internal/pkg/bootstrap/bootstrap.go index 4f6cee05..b23cd706 100644 --- a/internal/pkg/bootstrap/bootstrap.go +++ b/internal/pkg/bootstrap/bootstrap.go @@ -42,7 +42,7 @@ type infraConfig struct { MqttURL string `json:"mqtt_url"` EdgexURL string `json:"edgex_url"` NatsURL string `json:"nats_url"` - ExportConfig export.Config `json:"export_config" mapstructure:"export_config"` + ExportConfig export.Config `json:"export_config"` } // Bootstrap - Retrieve device config @@ -83,6 +83,10 @@ func Bootstrap(cfg Config, logger log.Logger, file string) error { if err := json.Unmarshal([]byte(dc.Content), &ic); err != nil { return err } + econf := &ic.ExportConfig + if econf != nil { + econf.Save() + } if len(dc.MainfluxChannels) < 2 { return agent.ErrMalformedEntity From 95b45009a738f8f3dd98790570dea083336f5481 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Tue, 4 Feb 2020 17:21:29 +0100 Subject: [PATCH 15/26] remove parenthesis Signed-off-by: Mirko Teodorovic --- internal/pkg/mqtt/sub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/mqtt/sub.go b/internal/pkg/mqtt/sub.go index 2c24ae79..819def3d 100644 --- a/internal/pkg/mqtt/sub.go +++ b/internal/pkg/mqtt/sub.go @@ -101,7 +101,7 @@ func (b *broker) handleMsg(mc paho.Client, msg paho.Message) { return } - cmdType := (sm.Records[0].Name) + cmdType := sm.Records[0].Name cmdStr := *sm.Records[0].StringValue uuid := strings.TrimSuffix(sm.Records[0].BaseName, ":") From 167e7888c3b808dd2ef81c15e6776e11b1730bfa Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Tue, 4 Feb 2020 17:38:48 +0100 Subject: [PATCH 16/26] remove println Signed-off-by: Mirko Teodorovic --- internal/pkg/bootstrap/bootstrap.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/pkg/bootstrap/bootstrap.go b/internal/pkg/bootstrap/bootstrap.go index b23cd706..48b11552 100644 --- a/internal/pkg/bootstrap/bootstrap.go +++ b/internal/pkg/bootstrap/bootstrap.go @@ -79,7 +79,6 @@ func Bootstrap(cfg Config, logger log.Logger, file string) error { cfg.ID, cfg.URL)) ic := infraConfig{} - fmt.Println(string(dc.Content)) if err := json.Unmarshal([]byte(dc.Content), &ic); err != nil { return err } From 22484cb1dec4bd45c9ae8bd894d921013bd61719 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Tue, 4 Feb 2020 22:31:26 +0100 Subject: [PATCH 17/26] NOISSUE - add view services via mqtt (#19) * add view services via mqtt Signed-off-by: Mirko Teodorovic * small fix Signed-off-by: Mirko Teodorovic * minor changes Signed-off-by: Mirko Teodorovic * update comment Signed-off-by: Mirko Teodorovic * add different endpoint for services view Signed-off-by: Mirko Teodorovic * dont use pointers Signed-off-by: Mirko Teodorovic * fix comments Signed-off-by: Mirko Teodorovic * fix errror return Signed-off-by: Mirko Teodorovic * inline function Signed-off-by: Mirko Teodorovic * small changes Signed-off-by: Mirko Teodorovic * remove cmdType type Signed-off-by: Mirko Teodorovic * remove pointers Signed-off-by: Mirko Teodorovic * remove parenthesis Signed-off-by: Mirko Teodorovic * add service info type Signed-off-by: Mirko Teodorovic --- internal/app/agent/api/logging.go | 3 +- internal/app/agent/api/metrics.go | 3 +- internal/app/agent/service.go | 125 +++++++++++++++++++------ internal/app/agent/services/service.go | 8 +- internal/pkg/mqtt/sub.go | 16 ++-- 5 files changed, 110 insertions(+), 45 deletions(-) diff --git a/internal/app/agent/api/logging.go b/internal/app/agent/api/logging.go index e552aab5..60547798 100644 --- a/internal/app/agent/api/logging.go +++ b/internal/app/agent/api/logging.go @@ -8,7 +8,6 @@ import ( "time" "github.com/mainflux/agent/internal/app/agent" - "github.com/mainflux/agent/internal/app/agent/services" "github.com/mainflux/agent/internal/pkg/config" log "github.com/mainflux/mainflux/logger" ) @@ -99,7 +98,7 @@ func (lm loggingMiddleware) ServiceConfig(uuid, cmdStr string) (err error) { return lm.svc.ServiceConfig(uuid, cmdStr) } -func (lm loggingMiddleware) Services() map[string]*services.Service { +func (lm loggingMiddleware) Services() []agent.ServiceInfo { defer func(begin time.Time) { message := fmt.Sprintf("Method services took %s to complete", time.Since(begin)) lm.logger.Info(fmt.Sprintf("%s without errors.", message)) diff --git a/internal/app/agent/api/metrics.go b/internal/app/agent/api/metrics.go index f6414305..9a361feb 100644 --- a/internal/app/agent/api/metrics.go +++ b/internal/app/agent/api/metrics.go @@ -10,7 +10,6 @@ import ( "github.com/go-kit/kit/metrics" "github.com/mainflux/agent/internal/app/agent" - "github.com/mainflux/agent/internal/app/agent/services" "github.com/mainflux/agent/internal/pkg/config" ) @@ -76,7 +75,7 @@ func (ms *metricsMiddleware) Config() config.Config { return ms.svc.Config() } -func (ms *metricsMiddleware) Services() map[string]*services.Service { +func (ms *metricsMiddleware) Services() []agent.ServiceInfo { defer func(begin time.Time) { ms.counter.With("method", "services").Add(1) ms.latency.With("method", "services").Observe(time.Since(begin).Seconds()) diff --git a/internal/app/agent/service.go b/internal/app/agent/service.go index 3fad9dc8..50b6c762 100644 --- a/internal/app/agent/service.go +++ b/internal/app/agent/service.go @@ -5,9 +5,12 @@ package agent import ( "encoding/base64" + "encoding/json" "fmt" "os/exec" + "sort" "strings" + "time" paho "github.com/eclipse/paho.mqtt.golang" "github.com/mainflux/agent/internal/app/agent/services" @@ -25,6 +28,9 @@ const ( Hearbeat = "heartbeat.*" Commands = "commands" Config = "config" + + view = "view" + save = "save" ) var ( @@ -42,6 +48,9 @@ var ( // errNatsSubscribing indicates problem with sub to topic for heartbeat errNatsSubscribing = errors.New("failed to subscribe to heartbeat topic") + + // errNoSuchService indicates service not supported + errNoSuchService = errors.New("no such service") ) // Service specifies API for publishing messages and subscribing to topics. @@ -62,7 +71,7 @@ type Service interface { ServiceConfig(uuid, cmdStr string) error // Services returns service list - Services() map[string]*services.Service + Services() []ServiceInfo // Publish message Publish(string, string) error @@ -70,13 +79,19 @@ type Service interface { var _ Service = (*agent)(nil) +type ServiceInfo struct { + Name string + LastSeen time.Time + Status string +} + type agent struct { mqttClient paho.Client config *config.Config edgexClient edgex.Client logger log.Logger nats *nats.Conn - servs map[string]*services.Service + svcs map[string]*services.Service } // New returns agent service implementation. @@ -87,7 +102,7 @@ func New(mc paho.Client, cfg *config.Config, ec edgex.Client, nc *nats.Conn, log config: cfg, nats: nc, logger: logger, - servs: make(map[string]*services.Service), + svcs: make(map[string]*services.Service), } _, err := ag.nats.Subscribe(Hearbeat, func(msg *nats.Msg) { @@ -97,16 +112,16 @@ func New(mc paho.Client, cfg *config.Config, ec edgex.Client, nc *nats.Conn, log ag.logger.Error(fmt.Sprintf("Failed: Subject has incorrect length %s" + sub)) return } - servname := tok[1] + svcname := tok[1] // Service name is extracted from the subtopic // if there is multiple instances of the same service // we will have to add another distinction - if _, ok := ag.servs[servname]; !ok { - serv := services.NewService(servname) - ag.servs[servname] = serv - ag.logger.Info(fmt.Sprintf("Services '%s' registered", servname)) + if _, ok := ag.svcs[svcname]; !ok { + svc := services.NewService(svcname) + ag.svcs[svcname] = svc + ag.logger.Info(fmt.Sprintf("Services '%s' registered", svcname)) } - serv := ag.servs[servname] + serv := ag.svcs[svcname] serv.Update() }) @@ -168,41 +183,77 @@ func (a *agent) Control(uuid, cmdStr string) error { return err } - payload, err := encodeSenML(uuid, cmd, resp) - if err != nil { - return err - } - - if err := a.Publish(a.config.Agent.Channels.Control, string(payload)); err != nil { - return err - } - - return nil + return a.processResponse(uuid, cmd, resp) } // Message for this command -// "[{"bn":"1:", "n":"config", "vs":"export, /configs/export/config.toml, config_file_content"}]" +// [{"bn":"1:", "n":"services", "vs":"view"}] +// [{"bn":"1:", "n":"config", "vs":"save, export, filename, filecontent"}] // config_file_content is base64 encoded marshaled structure representing service conf // Example of creation: // b, _ := toml.Marshal(cfg) // config_file_content := base64.StdEncoding.EncodeToString(b) func (a *agent) ServiceConfig(uuid, cmdStr string) error { cmdArgs := strings.Split(strings.Replace(cmdStr, " ", "", -1), ",") - if len(cmdArgs) < 3 { + if len(cmdArgs) < 1 { return errInvalidCommand } + resp := "" + cmd := cmdArgs[0] - service := cmdArgs[0] - fileName := cmdArgs[1] - fileCont, err := base64.StdEncoding.DecodeString(cmdArgs[2]) + switch cmd { + case view: + services, err := json.Marshal(a.Services()) + if err != nil { + return err + } + resp = string(services) + case save: + if len(cmdArgs) < 4 { + return errInvalidCommand + } + service := cmdArgs[1] + fileName := cmdArgs[2] + fileCont := cmdArgs[3] + if err := a.saveConfig(service, fileName, fileCont); err != nil { + return err + } + } + return a.processResponse(uuid, cmd, resp) +} + +func (a *agent) processResponse(uuid, cmd, resp string) error { + payload, err := encodeSenML(uuid, cmd, resp) if err != nil { return err } - c := &export.Config{} + if err := a.Publish(a.config.Agent.Channels.Control, string(payload)); err != nil { + return err + } + return nil +} + +func (a *agent) saveConfig(service, fileName, fileCont string) error { + switch service { + case "export": + content, err := base64.StdEncoding.DecodeString(fileCont) + if err != nil { + return err + } + + c := &export.Config{} + if err := c.ReadBytes([]byte(content)); err != nil { + return err + } + c.File = fileName + if err := c.Save(); err != nil { + return err + } + + default: + return errNoSuchService + } - c.ReadBytes([]byte(fileCont)) - c.File = fileName - c.Save() return a.nats.Publish(fmt.Sprintf("%s.%s.%s", Commands, service, Config), []byte("")) } @@ -214,8 +265,22 @@ func (a *agent) Config() config.Config { return *a.config } -func (a *agent) Services() map[string]*services.Service { - return a.servs +func (a *agent) Services() []ServiceInfo { + services := []ServiceInfo{} + keys := []string{} + for k := range a.svcs { + keys = append(keys, k) + } + sort.Strings(keys) + for _, key := range keys { + service := ServiceInfo{ + Name: a.svcs[key].Name, + LastSeen: a.svcs[key].LastSeen, + Status: a.svcs[key].Status, + } + services = append(services, service) + } + return services } func (a *agent) Publish(crtlChan, payload string) error { diff --git a/internal/app/agent/services/service.go b/internal/app/agent/services/service.go index 8d665342..5ad6a786 100644 --- a/internal/app/agent/services/service.go +++ b/internal/app/agent/services/service.go @@ -5,20 +5,18 @@ import ( "time" ) -type Status string - const ( timeout = 3 interval = 10000 - Online Status = "online" - Offline Status = "offline" + Online = "online" + Offline = "offline" ) type Service struct { Name string LastSeen time.Time - Status Status + Status string counter int done chan bool diff --git a/internal/pkg/mqtt/sub.go b/internal/pkg/mqtt/sub.go index 51c4afbb..819def3d 100644 --- a/internal/pkg/mqtt/sub.go +++ b/internal/pkg/mqtt/sub.go @@ -17,16 +17,15 @@ import ( paho "github.com/eclipse/paho.mqtt.golang" ) -type cmdType string - const ( reqTopic = "req" servTopic = "services" commands = "commands" - control cmdType = "control" - exec cmdType = "exec" - config cmdType = "config" + control = "control" + exec = "exec" + config = "config" + service = "service" ) var channelPartRegExp = regexp.MustCompile(`^channels/([\w\-]+)/messages/services(/[^?]*)?(\?.*)?$`) @@ -102,7 +101,7 @@ func (b *broker) handleMsg(mc paho.Client, msg paho.Message) { return } - cmdType := cmdType(sm.Records[0].Name) + cmdType := sm.Records[0].Name cmdStr := *sm.Records[0].StringValue uuid := strings.TrimSuffix(sm.Records[0].BaseName, ":") @@ -122,6 +121,11 @@ func (b *broker) handleMsg(mc paho.Client, msg paho.Message) { if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil { b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err)) } + case service: + b.logger.Info(fmt.Sprintf("Execute command for uuid %s and command string %s", uuid, cmdStr)) + if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil { + b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err)) + } } } From 31f2a26d01f1d9477a475f931186e3f5fedd34e3 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Thu, 6 Feb 2020 15:15:33 +0100 Subject: [PATCH 18/26] add default file Signed-off-by: Mirko Teodorovic --- internal/pkg/bootstrap/bootstrap.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/internal/pkg/bootstrap/bootstrap.go b/internal/pkg/bootstrap/bootstrap.go index 48b11552..82193b3b 100644 --- a/internal/pkg/bootstrap/bootstrap.go +++ b/internal/pkg/bootstrap/bootstrap.go @@ -45,6 +45,8 @@ type infraConfig struct { ExportConfig export.Config `json:"export_config"` } +const exportConfigFile = "export.toml" + // Bootstrap - Retrieve device config func Bootstrap(cfg Config, logger log.Logger, file string) error { retries, err := strconv.ParseUint(cfg.Retries, 10, 64) @@ -84,7 +86,13 @@ func Bootstrap(cfg Config, logger log.Logger, file string) error { } econf := &ic.ExportConfig if econf != nil { - econf.Save() + if econf.File == "" { + econf.File = exportConfigFile + } + logger.Info(fmt.Sprintf("Saving export config file %s", econf.File)) + if err := econf.Save(); err != nil { + logger.Error(fmt.Sprintf("Failed to save export config file %s", err)) + } } if len(dc.MainfluxChannels) < 2 { From 6db736d166435d8a03c656a72dda3d462709def8 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Thu, 6 Feb 2020 16:56:43 +0100 Subject: [PATCH 19/26] add checking err Signed-off-by: Mirko Teodorovic --- internal/pkg/mqtt/sub.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/pkg/mqtt/sub.go b/internal/pkg/mqtt/sub.go index 819def3d..68e241ef 100644 --- a/internal/pkg/mqtt/sub.go +++ b/internal/pkg/mqtt/sub.go @@ -117,12 +117,12 @@ func (b *broker) handleMsg(mc paho.Client, msg paho.Message) { b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err)) } case config: - b.logger.Info(fmt.Sprintf("Execute command for uuid %s and command string %s", uuid, cmdStr)) + b.logger.Info(fmt.Sprintf("Config service for uuid %s and command string %s", uuid, cmdStr)) if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil { b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err)) } case service: - b.logger.Info(fmt.Sprintf("Execute command for uuid %s and command string %s", uuid, cmdStr)) + b.logger.Info(fmt.Sprintf("List services for uuid %s and command string %s", uuid, cmdStr)) if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil { b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err)) } From c8bcea34c4ede0f2bc0c3fc293545ded811b9f57 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Fri, 7 Feb 2020 14:15:37 +0100 Subject: [PATCH 20/26] Enable mtls communication with mqtt broker (#15) * enable mtls Signed-off-by: Mirko Teodorovic * enable mtls Signed-off-by: Mirko Teodorovic * fix config endpoint Signed-off-by: Mirko Teodorovic * fix double broker connection Signed-off-by: Mirko Teodorovic * fix disconnect problem Signed-off-by: Mirko Teodorovic * killing the white lines Signed-off-by: Mirko Teodorovic * small corrections Signed-off-by: Mirko Teodorovic * remove some types and typos Signed-off-by: Mirko Teodorovic * fix logger usage Signed-off-by: Mirko Teodorovic * mqtt is moved to conn Signed-off-by: Mirko Teodorovic * resolve comments Signed-off-by: Mirko Teodorovic * resolve comments Signed-off-by: Mirko Teodorovic * kill whit line Signed-off-by: Mirko Teodorovic --- cmd/main.go | 170 ++++++++++++++++----- go.sum | 1 + internal/app/agent/api/common.go | 17 ++- internal/app/agent/api/endpoints.go | 36 +---- internal/app/agent/api/requests.go | 4 +- internal/app/agent/api/responses.go | 5 - internal/pkg/bootstrap/bootstrap.go | 12 +- internal/pkg/config/config.go | 23 +-- internal/pkg/{mqtt/sub.go => conn/conn.go} | 20 +-- 9 files changed, 181 insertions(+), 107 deletions(-) rename internal/pkg/{mqtt/sub.go => conn/conn.go} (83%) diff --git a/cmd/main.go b/cmd/main.go index 2864c472..e035f476 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,20 +1,23 @@ package main import ( + "crypto/tls" + "crypto/x509" "fmt" + "io/ioutil" "log" "net/http" "os" "os/signal" "syscall" - paho "github.com/eclipse/paho.mqtt.golang" + mqtt "github.com/eclipse/paho.mqtt.golang" kitprometheus "github.com/go-kit/kit/metrics/prometheus" "github.com/mainflux/agent/internal/app/agent" "github.com/mainflux/agent/internal/app/agent/api" "github.com/mainflux/agent/internal/pkg/bootstrap" "github.com/mainflux/agent/internal/pkg/config" - "github.com/mainflux/agent/internal/pkg/mqtt" + "github.com/mainflux/agent/internal/pkg/conn" "github.com/mainflux/agent/pkg/edgex" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/logger" @@ -32,11 +35,19 @@ const ( defLogLevel = "info" defEdgexURL = "http://localhost:48090/api/v1/" defMqttURL = "localhost:1883" - defThingID = "2dce1d65-73b4-4020-bfe3-403d851386e7" - defThingKey = "1ff0d0f0-ea04-4fbb-83c4-c10b110bf566" defCtrlChan = "f36c3733-95a3-481c-a314-4125e03d8993" defDataChan = "ea353dac-0298-4fbb-9e5d-501e3699949c" defEncryption = "false" + defMqttUsername = "" + defMqttPassword = "" + defMqttChannel = "" + defMqttSkipTLSVer = "true" + defMqttMTLS = "false" + defMqttCA = "ca.crt" + defMqttQoS = "0" + defMqttRetain = false + defMqttCert = "thing.cert" + defMqttPrivKey = "thing.key" defConfigFile = "config.toml" defNatsURL = nats.DefaultURL @@ -56,6 +67,16 @@ const ( envDataChan = "MF_AGENT_DATA_CHANNEL" envEncryption = "MF_AGENT_ENCRYPTION" envNatsURL = "MF_AGENT_NATS_URL" + + envMqttUsername = "MF_AGENT_MQTT_USERNAME" + envMqttPassword = "MF_AGENT_MQTT_PASSWORD" + envMqttSkipTLSVer = "MF_AGENT_MQTT_SKIP_TLS" + envMqttMTLS = "MF_AGENT_MQTT_MTLS" + envMqttCA = "MF_AGENT_MQTT_CA" + envMqttQoS = "MF_AGENT_MQTT_QOS" + envMqttRetain = "MF_AGENT_MQTT_RETAIN" + envMqttCert = "MF_AGENT_MQTT_CLIENT_CERT" + envMqttPrivKey = "MF_AGENT_MQTT_CLIENT_PK" ) func main() { @@ -66,21 +87,28 @@ func main() { cfg, err := loadConfig(logger) if err != nil { - log.Fatalf(fmt.Sprintf("Failed to load config: %s", err.Error())) + logger.Error(fmt.Sprintf("Failed to load config: %s", err.Error())) + os.Exit(1) } nc, err := nats.Connect(cfg.Agent.Server.NatsURL) if err != nil { - log.Fatalf(fmt.Sprintf("Failed to connect to NATS: %s %s", err, cfg.Agent.Server.NatsURL)) + logger.Error(fmt.Sprintf("Failed to connect to NATS: %s %s", err, cfg.Agent.Server.NatsURL)) + os.Exit(1) } defer nc.Close() - mqttClient := connectToMQTTBroker(cfg.Agent.MQTT.URL, cfg.Agent.Thing.ID, cfg.Agent.Thing.Key, logger) + mqttClient, err := connectToMQTTBroker(cfg.Agent.MQTT, logger) + if err != nil { + logger.Error(err.Error()) + os.Exit(1) + } edgexClient := edgex.NewClient(cfg.Agent.Edgex.URL, logger) svc, err := agent.New(mqttClient, &cfg, edgexClient, nc, logger) if err != nil { - log.Fatalf(fmt.Sprintf("Error in agent service: %s", err.Error())) + logger.Error(fmt.Sprintf("Error in agent service: %s", err.Error())) + os.Exit(1) } svc = api.LoggingMiddleware(svc, logger) @@ -138,56 +166,85 @@ func loadConfig(logger logger.Logger) (config.Config, error) { NatsURL: mainflux.Env(envNatsURL, defNatsURL), Port: mainflux.Env(envLogLevel, defLogLevel), } - tc := config.ThingConf{ - ID: mainflux.Env(envThingID, defThingID), - Key: mainflux.Env(envThingKey, defThingKey), - } cc := config.ChanConf{ Control: mainflux.Env(envCtrlChan, defCtrlChan), Data: mainflux.Env(envDataChan, defDataChan), } ec := config.EdgexConf{URL: mainflux.Env(envEdgexURL, defEdgexURL)} lc := config.LogConf{Level: mainflux.Env(envLogLevel, defLogLevel)} - mc := config.MQTTConf{URL: mainflux.Env(envMqttURL, defMqttURL)} - - c := config.New(sc, tc, cc, ec, lc, mc, file) + mc := config.MQTTConf{ + URL: mainflux.Env(envMqttURL, defMqttURL), + Username: mainflux.Env(envMqttUsername, defMqttUsername), + Password: mainflux.Env(envMqttPassword, defMqttPassword), + } + c := config.New(sc, cc, ec, lc, mc, file) if err := c.Read(); err != nil { logger.Error(fmt.Sprintf("Failed to read config: %s", err)) return config.Config{}, err } + mc, err := loadCertificate(c.Agent.MQTT) + if err != nil { + logger.Error(fmt.Sprintf("Failed to set up mtls certs %s", err)) + } + c.Agent.MQTT = mc + return *c, nil } -func connectToMQTTBroker(mqttURL, thingID, thingKey string, logger logger.Logger) paho.Client { - clientID := fmt.Sprintf("agent-%s", thingID) - opts := paho.NewClientOptions() - opts.AddBroker(mqttURL) - opts.SetClientID(clientID) - opts.SetUsername(thingID) - opts.SetPassword(thingKey) - opts.SetCleanSession(true) - opts.SetAutoReconnect(true) - opts.SetOnConnectHandler(func(c paho.Client) { - logger.Info("Connected to MQTT broker") - }) - opts.SetConnectionLostHandler(func(c paho.Client, err error) { - logger.Error(fmt.Sprintf("MQTT connection lost: %s", err.Error())) - os.Exit(1) - }) +func connectToMQTTBroker(conf config.MQTTConf, logger logger.Logger) (mqtt.Client, error) { + name := fmt.Sprintf("agent-%s", conf.Username) + conn := func(client mqtt.Client) { + logger.Info(fmt.Sprintf("Client %s connected", name)) + } - client := paho.NewClient(opts) - if token := client.Connect(); token.Wait() && token.Error() != nil { - logger.Error(fmt.Sprintf("Failed to connect to MQTT broker: %s", token.Error())) - os.Exit(1) + lost := func(client mqtt.Client, err error) { + logger.Info(fmt.Sprintf("Client %s disconnected", name)) + } + + opts := mqtt.NewClientOptions(). + AddBroker(conf.URL). + SetClientID(name). + SetCleanSession(true). + SetAutoReconnect(true). + SetOnConnectHandler(conn). + SetConnectionLostHandler(lost) + + if conf.Username != "" && conf.Password != "" { + opts.SetUsername(conf.Username) + opts.SetPassword(conf.Password) } - return client + if conf.MTLS { + cfg := &tls.Config{ + InsecureSkipVerify: conf.SkipTLSVer, + } + + if conf.CA != nil { + cfg.RootCAs = x509.NewCertPool() + cfg.RootCAs.AppendCertsFromPEM(conf.CA) + } + if conf.Cert.Certificate != nil { + cfg.Certificates = []tls.Certificate{conf.Cert} + } + + cfg.BuildNameToCertificate() + opts.SetTLSConfig(cfg) + opts.SetProtocolVersion(4) + } + client := mqtt.NewClient(opts) + token := client.Connect() + token.Wait() + + if token.Error() != nil { + return nil, token.Error() + } + return client, nil } -func subscribeToMQTTBroker(svc agent.Service, mc paho.Client, ctrlChan string, nc *nats.Conn, logger logger.Logger) { - broker := mqtt.NewBroker(svc, mc, nc, logger) +func subscribeToMQTTBroker(svc agent.Service, mc mqtt.Client, ctrlChan string, nc *nats.Conn, logger logger.Logger) { + broker := conn.NewBroker(svc, mc, nc, logger) topic := fmt.Sprintf("channels/%s/messages", ctrlChan) if err := broker.Subscribe(topic); err != nil { logger.Error(fmt.Sprintf("Failed to subscribe to MQTT broker: %s", err.Error())) @@ -195,3 +252,40 @@ func subscribeToMQTTBroker(svc agent.Service, mc paho.Client, ctrlChan string, n } logger.Info("Subscribed to MQTT broker") } + +func loadCertificate(cfg config.MQTTConf) (config.MQTTConf, error) { + c := cfg + caByte := []byte{} + cert := tls.Certificate{} + if !cfg.MTLS { + return c, nil + } + caFile, err := os.Open(cfg.CAPath) + defer caFile.Close() + if err != nil { + return c, err + } + caByte, err = ioutil.ReadAll(caFile) + if err != nil { + return c, err + } + clientCert, err := os.Open(cfg.CertPath) + defer clientCert.Close() + if err != nil { + return c, err + } + cc, _ := ioutil.ReadAll(clientCert) + privKey, err := os.Open(cfg.PrivKeyPath) + defer clientCert.Close() + if err != nil { + return c, err + } + pk, _ := ioutil.ReadAll((privKey)) + cert, err = tls.X509KeyPair([]byte(cc), []byte(pk)) + if err != nil { + return c, err + } + cfg.Cert = cert + cfg.CA = caByte + return c, nil +} diff --git a/go.sum b/go.sum index b35f7204..1ad12d15 100644 --- a/go.sum +++ b/go.sum @@ -250,6 +250,7 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 h1:58fnuSXlxZmFdJyvtTFVmVhcMLU6v5fEb/ok4wyqtNU= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= diff --git a/internal/app/agent/api/common.go b/internal/app/agent/api/common.go index 914deebf..c56c1720 100644 --- a/internal/app/agent/api/common.go +++ b/internal/app/agent/api/common.go @@ -7,11 +7,6 @@ type serverConf struct { port string `json:"port"` } -type thingConf struct { - id string `json:"id"` - key string `json:"key"` -} - type chanConf struct { control string `json:"control"` data string `json:"data"` @@ -26,13 +21,21 @@ type logConf struct { } type mqttConf struct { - url string `json:"url"` + url string `json:"url"` + username string `json:"username"` + password string `json:"json"` + mtls bool `json:"mtls"` + skipTLSVer bool `json:"skip_tls_ver"` + retain bool `json:"retain"` + QoS int `json:"qos"` + caPath string `json:"ca_path"` + certPath string `json:"cert_path"` + privKeyPath string `json:"priv_key_path"` } // Config struct of Mainflux Agent type agentConf struct { server serverConf `json:"server"` - thing thingConf `json:"thing"` channels chanConf `json:"channels"` edgex edgexConf `json:"edgex"` log logConf `json:"log"` diff --git a/internal/app/agent/api/endpoints.go b/internal/app/agent/api/endpoints.go index 95bdc936..968aed17 100644 --- a/internal/app/agent/api/endpoints.go +++ b/internal/app/agent/api/endpoints.go @@ -70,16 +70,15 @@ func addConfigEndpoint(svc agent.Service) endpoint.Endpoint { Control: req.agent.channels.control, Data: req.agent.channels.data, } - tc := config.ThingConf{ - ID: req.agent.thing.id, - Key: req.agent.thing.key, - } ec := config.EdgexConf{URL: req.agent.edgex.url} lc := config.LogConf{Level: req.agent.log.level} - mc := config.MQTTConf{URL: req.agent.mqtt.url} + mc := config.MQTTConf{ + URL: req.agent.mqtt.url, + Username: req.agent.mqtt.username, + Password: req.agent.mqtt.password, + } a := config.AgentConf{ Server: sc, - Thing: tc, Channels: cc, Edgex: ec, Log: lc, @@ -100,30 +99,7 @@ func addConfigEndpoint(svc agent.Service) endpoint.Endpoint { func viewConfigEndpoint(svc agent.Service) endpoint.Endpoint { return func(_ context.Context, request interface{}) (interface{}, error) { c := svc.Config() - - sc := serverConf{port: c.Agent.Server.Port} - cc := chanConf{ - control: c.Agent.Channels.Control, - data: c.Agent.Channels.Data, - } - tc := thingConf{ - id: c.Agent.Thing.ID, - key: c.Agent.Thing.Key, - } - ec := edgexConf{url: c.Agent.Edgex.URL} - lc := logConf{level: c.Agent.Log.Level} - mc := mqttConf{url: c.Agent.MQTT.URL} - a := agentConf{ - server: sc, - thing: tc, - channels: cc, - edgex: ec, - log: lc, - mqtt: mc, - } - res := configRes{agent: a} - - return res, nil + return c, nil } } diff --git a/internal/app/agent/api/requests.go b/internal/app/agent/api/requests.go index fa2c4daa..81e5f2d6 100644 --- a/internal/app/agent/api/requests.go +++ b/internal/app/agent/api/requests.go @@ -41,8 +41,8 @@ type addConfigReq struct { func (req addConfigReq) validate() error { if req.agent.server.port == "" || - req.agent.thing.id == "" || - req.agent.thing.key == "" || + req.agent.mqtt.username == "" || + req.agent.mqtt.password == "" || req.agent.channels.control == "" || req.agent.channels.data == "" || req.agent.log.level == "" || diff --git a/internal/app/agent/api/responses.go b/internal/app/agent/api/responses.go index d6146907..6bf4ae41 100644 --- a/internal/app/agent/api/responses.go +++ b/internal/app/agent/api/responses.go @@ -13,8 +13,3 @@ type execRes struct { Name string `json:"n"` Value string `json:"vs"` } - -type configRes struct { - agent agentConf - file string -} diff --git a/internal/pkg/bootstrap/bootstrap.go b/internal/pkg/bootstrap/bootstrap.go index 4841bb71..cef64940 100644 --- a/internal/pkg/bootstrap/bootstrap.go +++ b/internal/pkg/bootstrap/bootstrap.go @@ -97,19 +97,19 @@ func Bootstrap(cfg Config, logger log.Logger, file string) error { NatsURL: ic.NatsURL, } - tc := config.ThingConf{ - ID: dc.MainfluxID, - Key: dc.MainfluxKey, - } cc := config.ChanConf{ Control: ctrlChan, Data: dataChan, } ec := config.EdgexConf{URL: ic.EdgexURL} lc := config.LogConf{Level: ic.LogLevel} - mc := config.MQTTConf{URL: ic.MqttURL} + mc := config.MQTTConf{ + URL: ic.MqttURL, + Password: dc.MainfluxKey, + Username: dc.MainfluxID, + } - c := config.New(sc, tc, cc, ec, lc, mc, file) + c := config.New(sc, cc, ec, lc, mc, file) return c.Save() } diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index 562c73bd..9da78c65 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -4,6 +4,7 @@ package config import ( + "crypto/tls" "fmt" "io/ioutil" @@ -15,11 +16,6 @@ type ServerConf struct { NatsURL string `toml:"nats_url"` } -type ThingConf struct { - ID string `toml:"id"` - Key string `toml:"key"` -} - type ChanConf struct { Control string `toml:"control"` Data string `toml:"data"` @@ -34,13 +30,23 @@ type LogConf struct { } type MQTTConf struct { - URL string `toml:"url"` + URL string `json:"url" toml:"url"` + Username string `json:"username" toml:"username" mapstructure:"username"` + Password string `json:"password" toml:"password" mapstructure:"password"` + MTLS bool `json:"mtls" toml:"mtls" mapstructure:"mtls"` + SkipTLSVer bool `json:"skip_tls_ver" toml:"skip_tls_ver" mapstructure:"skip_tls_ver"` + Retain bool `json:"retain" toml:"retain" mapstructure:"retain"` + QoS int `json:"qos" toml:"qos" mapstructure:"qos"` + CAPath string `json:"ca_path" toml:"ca_path" mapstructure:"ca_path"` + CertPath string `json:"cert_path" toml:"cert_path" mapstructure:"cert_path"` + PrivKeyPath string `json:"priv_key_path" toml:"priv_key_path" mapstructure:"priv_key_path"` + CA []byte `json:"-" toml:"-"` + Cert tls.Certificate `json:"-" toml:"-"` } // Config struct of Mainflux Agent type AgentConf struct { Server ServerConf `toml:"server"` - Thing ThingConf `toml:"thing"` Channels ChanConf `toml:"channels"` Edgex EdgexConf `toml:"edgex"` Log LogConf `toml:"log"` @@ -52,10 +58,9 @@ type Config struct { File string } -func New(sc ServerConf, tc ThingConf, cc ChanConf, ec EdgexConf, lc LogConf, mc MQTTConf, file string) *Config { +func New(sc ServerConf, cc ChanConf, ec EdgexConf, lc LogConf, mc MQTTConf, file string) *Config { ac := AgentConf{ Server: sc, - Thing: tc, Channels: cc, Edgex: ec, Log: lc, diff --git a/internal/pkg/mqtt/sub.go b/internal/pkg/conn/conn.go similarity index 83% rename from internal/pkg/mqtt/sub.go rename to internal/pkg/conn/conn.go index 819def3d..2450a2bf 100644 --- a/internal/pkg/mqtt/sub.go +++ b/internal/pkg/conn/conn.go @@ -1,7 +1,7 @@ // Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 -package mqtt +package conn import ( "fmt" @@ -14,7 +14,7 @@ import ( "github.com/nats-io/go-nats" "robpike.io/filter" - paho "github.com/eclipse/paho.mqtt.golang" + mqtt "github.com/eclipse/paho.mqtt.golang" ) const ( @@ -40,13 +40,13 @@ type MqttBroker interface { type broker struct { svc agent.Service - client paho.Client + client mqtt.Client logger logger.Logger nats *nats.Conn } // NewBroker returns new MQTT broker instance. -func NewBroker(svc agent.Service, client paho.Client, nats *nats.Conn, log logger.Logger) MqttBroker { +func NewBroker(svc agent.Service, client mqtt.Client, nats *nats.Conn, log logger.Logger) MqttBroker { return &broker{ svc: svc, client: client, @@ -73,7 +73,7 @@ func (b *broker) Subscribe(topic string) error { } // handleNatsMsg triggered when new message is received on MQTT broker -func (b *broker) handleNatsMsg(mc paho.Client, msg paho.Message) { +func (b *broker) handleNatsMsg(mc mqtt.Client, msg mqtt.Message) { if topic := extractNatsTopic(msg.Topic()); topic != "" { b.nats.Publish(topic, msg.Payload()) } @@ -94,7 +94,7 @@ func extractNatsTopic(topic string) string { } // handleMsg triggered when new message is received on MQTT broker -func (b *broker) handleMsg(mc paho.Client, msg paho.Message) { +func (b *broker) handleMsg(mc mqtt.Client, msg mqtt.Message) { sm, err := senml.Decode(msg.Payload(), senml.JSON) if err != nil { b.logger.Warn(fmt.Sprintf("SenML decode failed: %s", err)) @@ -117,14 +117,14 @@ func (b *broker) handleMsg(mc paho.Client, msg paho.Message) { b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err)) } case config: - b.logger.Info(fmt.Sprintf("Execute command for uuid %s and command string %s", uuid, cmdStr)) + b.logger.Info(fmt.Sprintf("Config service for uuid %s and command string %s", uuid, cmdStr)) if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil { - b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err)) + b.logger.Warn(fmt.Sprintf("Config service operation failed: %s", err)) } case service: - b.logger.Info(fmt.Sprintf("Execute command for uuid %s and command string %s", uuid, cmdStr)) + b.logger.Info(fmt.Sprintf("Services view for uuid %s and command string %s", uuid, cmdStr)) if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil { - b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err)) + b.logger.Warn(fmt.Sprintf("Services view operation failed: %s", err)) } } From b7010e3f3e38118192d4151d29fa31936bd51c66 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Fri, 7 Feb 2020 16:02:20 +0100 Subject: [PATCH 21/26] update gitignore Signed-off-by: Mirko Teodorovic --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 6565ee3d..496eadc5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ build -cmd/config.toml \ No newline at end of file +cmd/config.toml +.vscode From 947fcaaba9eb1c7411f4de731ab63ae6f1874fa8 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Fri, 7 Feb 2020 16:40:29 +0100 Subject: [PATCH 22/26] dont export vars Signed-off-by: Mirko Teodorovic --- internal/app/agent/services/service.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/app/agent/services/service.go b/internal/app/agent/services/service.go index 5ad6a786..c02f9fd4 100644 --- a/internal/app/agent/services/service.go +++ b/internal/app/agent/services/service.go @@ -9,8 +9,8 @@ const ( timeout = 3 interval = 10000 - Online = "online" - Offline = "offline" + online = "online" + offline = "offline" ) type Service struct { @@ -27,7 +27,7 @@ type Service struct { func NewService(name string) *Service { ticker := time.NewTicker(interval * time.Millisecond) done := make(chan bool) - s := Service{Name: name, Status: Online, done: done, counter: timeout, ticker: ticker} + s := Service{Name: name, Status: online, done: done, counter: timeout, ticker: ticker} s.Listen() return &s } @@ -42,7 +42,7 @@ func (s *Service) Listen() { s.mu.Lock() s.counter = s.counter - 1 if s.counter == 0 { - s.Status = Offline + s.Status = offline s.counter = timeout } s.mu.Unlock() @@ -56,5 +56,5 @@ func (s *Service) Update() { s.mu.Lock() defer s.mu.Unlock() s.counter = timeout - s.Status = Online + s.Status = online } From adfcc6b585afdb8fc98c3fe3f9fd2aed896a6738 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Sun, 9 Feb 2020 17:32:47 +0100 Subject: [PATCH 23/26] remove vscode Signed-off-by: Mirko Teodorovic --- .gitignore | 1 - 1 file changed, 1 deletion(-) diff --git a/.gitignore b/.gitignore index 496eadc5..2b1c8246 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,2 @@ build cmd/config.toml -.vscode From cafade7e1d12ab404c1ac994eb4a72856cf1290e Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Sun, 9 Feb 2020 22:07:44 +0100 Subject: [PATCH 24/26] add export const Signed-off-by: Mirko Teodorovic --- internal/app/agent/service.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/app/agent/service.go b/internal/app/agent/service.go index 0517666c..41710fe5 100644 --- a/internal/app/agent/service.go +++ b/internal/app/agent/service.go @@ -16,7 +16,7 @@ import ( "github.com/mainflux/agent/internal/app/agent/services" "github.com/mainflux/agent/internal/pkg/config" "github.com/mainflux/agent/pkg/edgex" - export "github.com/mainflux/export/pkg/config" + exp "github.com/mainflux/export/pkg/config" "github.com/mainflux/mainflux/errors" log "github.com/mainflux/mainflux/logger" "github.com/mainflux/senml" @@ -31,6 +31,8 @@ const ( view = "view" save = "save" + + export = "export" ) var ( @@ -234,12 +236,12 @@ func (a *agent) processResponse(uuid, cmd, resp string) error { func (a *agent) saveConfig(service, fileName, fileCont string) error { switch service { - case "export": + case export: content, err := base64.StdEncoding.DecodeString(fileCont) if err != nil { return err } - c := &export.Config{} + c := &exp.Config{} if err := c.ReadBytes([]byte(content)); err != nil { return err } From 4169112b31756121dd7f93648f88d155c75fb717 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Mon, 10 Feb 2020 10:41:01 +0100 Subject: [PATCH 25/26] add len check for senml payload Signed-off-by: Mirko Teodorovic --- internal/pkg/conn/conn.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/pkg/conn/conn.go b/internal/pkg/conn/conn.go index efc0ca13..1e33ea3b 100644 --- a/internal/pkg/conn/conn.go +++ b/internal/pkg/conn/conn.go @@ -101,6 +101,10 @@ func (b *broker) handleMsg(mc mqtt.Client, msg mqtt.Message) { return } + if len(sm.Records) == 0 { + b.logger.Error(fmt.Sprintf("SenML payload empty: `%s`", string(msg.Payload()))) + return + } cmdType := sm.Records[0].Name cmdStr := *sm.Records[0].StringValue uuid := strings.TrimSuffix(sm.Records[0].BaseName, ":") From a73a81a1169b4f4f80dc490fe359b6bcbc1081dd Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Mon, 10 Feb 2020 10:43:19 +0100 Subject: [PATCH 26/26] change constant exportConfigFile Signed-off-by: Mirko Teodorovic --- internal/pkg/bootstrap/bootstrap.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/pkg/bootstrap/bootstrap.go b/internal/pkg/bootstrap/bootstrap.go index 0cfc4005..123bf5b6 100644 --- a/internal/pkg/bootstrap/bootstrap.go +++ b/internal/pkg/bootstrap/bootstrap.go @@ -19,6 +19,8 @@ import ( "github.com/mainflux/mainflux/things" ) +const exportConfigFile = "/configs/export/config.toml" + // Config represents the parameters for boostraping type Config struct { URL string @@ -45,8 +47,6 @@ type infraConfig struct { ExportConfig export.Config `json:"export_config"` } -const exportConfigFile = "export.toml" - // Bootstrap - Retrieve device config func Bootstrap(cfg Config, logger log.Logger, file string) error { retries, err := strconv.ParseUint(cfg.Retries, 10, 64)