Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOISSUE - Export config from content #21

Merged
merged 28 commits into from
Feb 10, 2020
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions internal/app/agent/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/mainflux/agent/internal/app/agent"
"github.com/mainflux/agent/internal/app/agent/services"
"github.com/mainflux/agent/internal/pkg/config"
log "github.com/mainflux/mainflux/logger"
)
Expand Down Expand Up @@ -99,7 +98,7 @@ func (lm loggingMiddleware) ServiceConfig(uuid, cmdStr string) (err error) {
return lm.svc.ServiceConfig(uuid, cmdStr)
}

func (lm loggingMiddleware) Services() map[string]*services.Service {
func (lm loggingMiddleware) Services() []interface{} {
defer func(begin time.Time) {
message := fmt.Sprintf("Method services took %s to complete", time.Since(begin))
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
Expand Down
3 changes: 1 addition & 2 deletions internal/app/agent/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/go-kit/kit/metrics"
"github.com/mainflux/agent/internal/app/agent"
"github.com/mainflux/agent/internal/app/agent/services"
"github.com/mainflux/agent/internal/pkg/config"
)

Expand Down Expand Up @@ -76,7 +75,7 @@ func (ms *metricsMiddleware) Config() config.Config {
return ms.svc.Config()
}

func (ms *metricsMiddleware) Services() map[string]*services.Service {
func (ms *metricsMiddleware) Services() []interface{} {
defer func(begin time.Time) {
ms.counter.With("method", "services").Add(1)
ms.latency.With("method", "services").Observe(time.Since(begin).Seconds())
Expand Down
122 changes: 92 additions & 30 deletions internal/app/agent/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ package agent

import (
"encoding/base64"
"encoding/json"
"fmt"
"os/exec"
"sort"
"strings"
"time"

paho "github.com/eclipse/paho.mqtt.golang"
"github.com/mainflux/agent/internal/app/agent/services"
Expand All @@ -25,6 +28,9 @@ const (
Hearbeat = "heartbeat.*"
Commands = "commands"
Config = "config"

view = "view"
save = "save"
)

var (
Expand All @@ -42,6 +48,9 @@ var (

// errNatsSubscribing indicates problem with sub to topic for heartbeat
errNatsSubscribing = errors.New("failed to subscribe to heartbeat topic")

// errNoSuchService indicates service not supported
errNoSuchService = errors.New("no such service")
)

// Service specifies API for publishing messages and subscribing to topics.
Expand All @@ -62,7 +71,7 @@ type Service interface {
ServiceConfig(uuid, cmdStr string) error

// Services returns service list
Services() map[string]*services.Service
Services() []interface{}

// Publish message
Publish(string, string) error
Expand All @@ -76,7 +85,7 @@ type agent struct {
edgexClient edgex.Client
logger log.Logger
nats *nats.Conn
servs map[string]*services.Service
svcs map[string]*services.Service
}

// New returns agent service implementation.
Expand All @@ -87,7 +96,7 @@ func New(mc paho.Client, cfg *config.Config, ec edgex.Client, nc *nats.Conn, log
config: cfg,
nats: nc,
logger: logger,
servs: make(map[string]*services.Service),
svcs: make(map[string]*services.Service),
}

_, err := ag.nats.Subscribe(Hearbeat, func(msg *nats.Msg) {
Expand All @@ -97,16 +106,16 @@ func New(mc paho.Client, cfg *config.Config, ec edgex.Client, nc *nats.Conn, log
ag.logger.Error(fmt.Sprintf("Failed: Subject has incorrect length %s" + sub))
return
}
servname := tok[1]
svcname := tok[1]
// Service name is extracted from the subtopic
// if there is multiple instances of the same service
// we will have to add another distinction
if _, ok := ag.servs[servname]; !ok {
serv := services.NewService(servname)
ag.servs[servname] = serv
ag.logger.Info(fmt.Sprintf("Services '%s' registered", servname))
if _, ok := ag.svcs[svcname]; !ok {
svc := services.NewService(svcname)
ag.svcs[svcname] = svc
ag.logger.Info(fmt.Sprintf("Services '%s' registered", svcname))
}
serv := ag.servs[servname]
serv := ag.svcs[svcname]
serv.Update()
})

Expand Down Expand Up @@ -168,41 +177,76 @@ func (a *agent) Control(uuid, cmdStr string) error {
return err
}

payload, err := encodeSenML(uuid, cmd, resp)
if err != nil {
return err
}

if err := a.Publish(a.config.Agent.Channels.Control, string(payload)); err != nil {
return err
}

return nil
return a.processResponse(uuid, cmd, resp)
}

// Message for this command
// "[{"bn":"1:", "n":"config", "vs":"export, /configs/export/config.toml, config_file_content"}]"
// [{"bn":"1:", "n":"services", "vs":"view"}]
// [{"bn":"1:", "n":"config", "vs":"save, export, filename, filecontent"}]
// config_file_content is base64 encoded marshaled structure representing service conf
// Example of creation:
dborovcanin marked this conversation as resolved.
Show resolved Hide resolved
// b, _ := toml.Marshal(cfg)
// config_file_content := base64.StdEncoding.EncodeToString(b)
func (a *agent) ServiceConfig(uuid, cmdStr string) error {
cmdArgs := strings.Split(strings.Replace(cmdStr, " ", "", -1), ",")
if len(cmdArgs) < 3 {
if len(cmdArgs) < 1 {
return errInvalidCommand
}
resp := ""
cmd := cmdArgs[0]

switch cmd {
case view:
services, err := json.Marshal(a.Services())
if err != nil {
return err
}
resp = string(services)
case save:
if len(cmdArgs) < 4 {
return errInvalidCommand
}
service := cmdArgs[1]
fileName := cmdArgs[2]
fileCont := cmdArgs[3]
if err := a.saveConfig(service, fileName, fileCont); err != nil {
return err
}
}
return a.processResponse(uuid, cmd, resp)
}

service := cmdArgs[0]
fileName := cmdArgs[1]
fileCont, err := base64.StdEncoding.DecodeString(cmdArgs[2])
func (a *agent) processResponse(uuid, cmd, resp string) error {
payload, err := encodeSenML(uuid, cmd, resp)
if err != nil {
return err
}
c := &export.Config{}
if err := a.Publish(a.config.Agent.Channels.Control, string(payload)); err != nil {
return err
}
return nil
}

func (a *agent) saveConfig(service, fileName, fileCont string) error {
switch service {
case "export":
mteodor marked this conversation as resolved.
Show resolved Hide resolved
content, err := base64.StdEncoding.DecodeString(fileCont)
if err != nil {
return err
}
c := &export.Config{}
if err := c.ReadBytes([]byte(content)); err != nil {
return err
}
c.File = fileName
if err := c.Save(); err != nil {
return err
}

default:
return errNoSuchService
}

c.ReadBytes([]byte(fileCont))
c.File = fileName
c.Save()
return a.nats.Publish(fmt.Sprintf("%s.%s.%s", Commands, service, Config), []byte(""))
}

Expand All @@ -214,8 +258,26 @@ func (a *agent) Config() config.Config {
return *a.config
}

func (a *agent) Services() map[string]*services.Service {
return a.servs
func (a *agent) Services() []interface{} {
services := [](interface{}){}
keys := []string{}
for k := range a.svcs {
keys = append(keys, k)
}
sort.Strings(keys)
for _, key := range keys {
service := struct {
Name string
LastSeen time.Time
Status string
}{
Name: a.svcs[key].Name,
LastSeen: a.svcs[key].LastSeen,
Status: a.svcs[key].Status,
}
services = append(services, service)
}
return services
}

func (a *agent) Publish(crtlChan, payload string) error {
Expand Down
8 changes: 3 additions & 5 deletions internal/app/agent/services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,18 @@ import (
"time"
)

type Status string

const (
timeout = 3
interval = 10000

Online Status = "online"
Offline Status = "offline"
Online = "online"
mteodor marked this conversation as resolved.
Show resolved Hide resolved
Offline = "offline"
)

type Service struct {
Name string
LastSeen time.Time
Status Status
Status string

counter int
done chan bool
Expand Down
16 changes: 11 additions & 5 deletions internal/pkg/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/mainflux/agent/internal/app/agent"
"github.com/mainflux/agent/internal/pkg/config"
export "github.com/mainflux/export/pkg/config"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/things"
)
Expand All @@ -36,11 +37,12 @@ type deviceConfig struct {
}

type infraConfig struct {
LogLevel string `json:"log_level"`
HTTPPort string `json:"http_port"`
MqttURL string `json:"mqtt_url"`
EdgexURL string `json:"edgex_url"`
NatsURL string `json:"nats_url"`
LogLevel string `json:"log_level"`
mteodor marked this conversation as resolved.
Show resolved Hide resolved
HTTPPort string `json:"http_port"`
MqttURL string `json:"mqtt_url"`
EdgexURL string `json:"edgex_url"`
NatsURL string `json:"nats_url"`
ExportConfig export.Config `json:"export_config"`
}

// Bootstrap - Retrieve device config
Expand Down Expand Up @@ -80,6 +82,10 @@ func Bootstrap(cfg Config, logger log.Logger, file string) error {
if err := json.Unmarshal([]byte(dc.Content), &ic); err != nil {
return err
}
econf := &ic.ExportConfig
if econf != nil {
econf.Save()
}

if len(dc.MainfluxChannels) < 2 {
return agent.ErrMalformedEntity
Expand Down
16 changes: 10 additions & 6 deletions internal/pkg/mqtt/sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@ import (
paho "github.com/eclipse/paho.mqtt.golang"
)

type cmdType string

const (
reqTopic = "req"
servTopic = "services"
commands = "commands"

control cmdType = "control"
exec cmdType = "exec"
config cmdType = "config"
control = "control"
exec = "exec"
config = "config"
service = "service"
)

var channelPartRegExp = regexp.MustCompile(`^channels/([\w\-]+)/messages/services(/[^?]*)?(\?.*)?$`)
Expand Down Expand Up @@ -102,7 +101,7 @@ func (b *broker) handleMsg(mc paho.Client, msg paho.Message) {
return
}

cmdType := cmdType(sm.Records[0].Name)
cmdType := sm.Records[0].Name
cmdStr := *sm.Records[0].StringValue
uuid := strings.TrimSuffix(sm.Records[0].BaseName, ":")

Expand All @@ -122,6 +121,11 @@ func (b *broker) handleMsg(mc paho.Client, msg paho.Message) {
if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil {
b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err))
}
case service:
b.logger.Info(fmt.Sprintf("Execute command for uuid %s and command string %s", uuid, cmdStr))
if err := b.svc.ServiceConfig(uuid, cmdStr); err != nil {
b.logger.Warn(fmt.Sprintf("Execute operation failed: %s", err))
}
}

}