This module contains the common code for all MQTT listeners as well as the SparkplugB™ related proto files, generated Python files and helper classes. Some preparation work is required to set up the infrastructure and environment for MQTT as well as SparkplugB; this is described in the sections below.
The SparkplugB topic namespace follows this structure:

```
spBv1.0/<group_id>/<message_type>/<edge_node_id>/[<device_id>]
```

where:
- `<group_id>`: provides a logical grouping of MQTT EoN nodes
- `<message_type>`: indicates how to handle the MQTT payload of the message. The following message_type elements are defined for the SparkplugB™ Topic Namespace:
- NBIRTH: Birth certificate for MQTT EoN nodes.
- NDEATH: Death certificate for MQTT EoN nodes.
- DBIRTH: Birth certificate for Devices.
- DDEATH: Death certificate for Devices.
- NDATA: Node data message.
- DDATA: Device data message.
- NCMD: Node command message.
- DCMD: Device command message.
- STATE: Critical application state message.
- `<edge_node_id>`: uniquely identifies the MQTT EoN node within the factory context.
- `[<device_id>]`: optional; identifies a device attached (physically or logically) to the MQTT EoN node.

Please refer to the SparkplugB specification for the detailed definition of these message types.
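The topic structure above can be parsed and validated with a small helper. The sketch below is illustrative only; the function name and return shape are assumptions and not part of this module:

```python
# Illustrative sketch: split a SparkplugB topic into its components.
# The function name and return shape are assumptions, not part of this module.
SPB_MESSAGE_TYPES = {
    "NBIRTH", "NDEATH", "DBIRTH", "DDEATH",
    "NDATA", "DDATA", "NCMD", "DCMD", "STATE",
}

def parse_spb_topic(topic: str) -> dict:
    parts = topic.split("/")
    if len(parts) < 4 or parts[0] != "spBv1.0":
        raise ValueError(f"Not a SparkplugB topic: {topic}")
    group_id, message_type, edge_node_id = parts[1], parts[2], parts[3]
    if message_type not in SPB_MESSAGE_TYPES:
        raise ValueError(f"Unknown message_type: {message_type}")
    return {
        "group_id": group_id,
        "message_type": message_type,
        "edge_node_id": edge_node_id,
        # <device_id> is optional in the namespace
        "device_id": parts[4] if len(parts) > 4 else None,
    }

# Example:
# parse_spb_topic("spBv1.0/factory1/DDATA/conveyor-01/sensor-07")
# -> {"group_id": "factory1", "message_type": "DDATA",
#     "edge_node_id": "conveyor-01", "device_id": "sensor-07"}
```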
- Step 1: Download or install protoc; refer to the Protocol Buffers documentation for installing on Linux/macOS or using a pre-compiled version. This project currently uses Protocol Buffers v26.1 with the pre-compiled binaries for linux-x86_64 and win64. For other platforms, please replace these with the appropriate pre-compiled runtime or compile the runtime directly.
- Step 2: Copy the SparkplugB protocol buffer specification from the Eclipse Tahu project to the folder `./sparkplug_b`
- Step 3: Compile the SparkplugB protocol buffer specification into Python classes with the following command

```bash
# Execute on Linux
./protobuf/bin/protoc -I ./sparkplug_b/ --python_out=./src/uns_sparkplugb/generated --pyi_out=./src/uns_sparkplugb/generated ./sparkplug_b/sparkplug_b.proto
```

```powershell
# Execute on Windows
.\protobuf\bin\protoc.exe -I .\sparkplug_b\ --python_out=.\src\uns_sparkplugb\generated --pyi_out=.\src\uns_sparkplugb\generated .\sparkplug_b\sparkplug_b.proto
```
The executables and the generated code are checked into the repository:

- The `protoc` executable for Linux and Windows
- The `sparkplug_b.proto` file
- The Python files generated from the `sparkplug_b.proto` specification
The MQTT cluster from EMQX is easily set up on Kubernetes. There are other options, such as running in Docker or directly via the executable, but I chose the K8s setup to leverage the benefits of scaling, failover and other orchestration features. Before proceeding, ensure that you have set up your K8s cluster as described in 01_k8scluster. Also familiarize yourself with the K8s storage class Mayastor.
```bash
microk8s helm3 repo add emqx https://repos.emqx.io/charts
microk8s helm3 repo update
microk8s helm3 search repo emqx

# This command needs to be executed to have persistence available to the MQTT instances
# Select the storage class available to your cluster
microk8s kubectl apply -f - <<EOF
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: mayastor-2
parameters:
  repl: '2'
  protocol: 'nvmf'
  ioTimeout: '60'
  local: 'true'
provisioner: io.openebs.csi-mayastor
volumeBindingMode: WaitForFirstConsumer
EOF
```
Important Note: Validate that you have the correct persistent storage class. We are using Mayastor, which was installed as an addon.

```bash
# Command to view the available storage classes
kubectl get sc
```
Using helm, install the MQTT cluster on the edge. I chose to have each cluster in its own namespace.
```bash
# Install the Edge version of EMQX, which has a smaller footprint and is developed specifically for the edge
# microk8s helm3 install uns-emqx-edge emqx/emqx --namespace <FACTORY NAME> --set image.repository=emqx/emqx-edge --set service.type=LoadBalancer --create-namespace --wait
# e.g.
microk8s helm3 install uns-emqx-edge emqx/emqx --namespace factory1 --set image.repository=emqx/emqx-edge --set service.type=LoadBalancer --create-namespace --wait

# Upgrade the installation with a newer version, e.g.
microk8s helm3 repo update
microk8s helm3 upgrade --install uns-emqx-edge emqx/emqx --namespace factory1 --set image.repository=emqx/emqx-edge --set service.type=LoadBalancer --create-namespace --wait
```
Normally you would use a cloud-hosted MQTT service for the enterprise broker. I chose instead to install the enterprise cluster of EMQX on K8s.
```bash
# Install the central cluster of EMQX brokers at the corporate instance / cloud
microk8s helm3 install uns-emqx-corp emqx/emqx \
    --namespace enterprise \
    --set persistence.enabled=true \
    --set persistence.size=100M \
    --set persistence.storageClass=mayastor-2 \
    --set service.type=LoadBalancer \
    --create-namespace \
    --wait
```
The guide for setting up the MQTT bridge via the dashboard is provided here. A simplified guide is given here.
```sql
SELECT * from '#'
```
By using the payload template `${payload}` we ensure that exactly the same message, with no additional attributes, is sent over the bridge; otherwise EMQX adds a number of additional attributes and pushes the entire original message into the attribute `payload`.
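To make the difference concrete, the sketch below contrasts a raw pass-through with an illustrative wrapped message. The envelope field names here are assumptions for illustration, not EMQX's exact rule-engine schema:

```python
import json

original = '{"temperature": 23.5}'  # message published by the client

# With payload template ${payload}: the bridge forwards the bytes unchanged.
bridged_with_template = original

# Without the template (illustrative only; field names are assumptions,
# not EMQX's exact schema): the message is wrapped in an envelope and the
# original bytes end up under a "payload" attribute.
bridged_without_template = json.dumps({
    "topic": "spBv1.0/factory1/DDATA/node1",
    "qos": 1,
    "payload": original,
})

# A downstream subscriber expecting the original JSON only works in the
# first case; in the second it must first unwrap the envelope.
assert json.loads(bridged_with_template) == {"temperature": 23.5}
assert json.loads(bridged_without_template)["payload"] == original
```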
Since we have deployed on K8s, concerns like SSL/TLS should ideally be handled at the ingress controller.
For further hardening options, such as ACLs and additional authentication methods, EMQX provides extensive documentation and guides for securing the cluster.
Important Note: The installation exposes the dashboard with the standard username and credentials. Remember to update the default user as well as the password after the system is deployed.
The following commands are executed with `02_mqtt-cluster` as the current folder. This has been tested on Linux (bash), Windows (PowerShell) and macOS (zsh).
```bash
python -m pip install --upgrade pip
pip install poetry

# Ensure that the poetry shell is activated
poetry shell
python -m pip install --upgrade pip poetry
poetry install
```
When importing this folder into VSCode, remember to do the following steps the first time:

1. Open a terminal in VSCode
2. Activate the poetry shell and install the dependencies:

   ```bash
   poetry shell
   poetry install
   ```

3. Select the correct Python interpreter in VSCode (it should automatically detect the poetry virtual environment)
The tests for this module are executed after the poetry setup is done:
```bash
# Run all tests excluding integration tests
poetry run pytest -m "not integrationtest" test/

# Run all tests
poetry run pytest test/
```
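The `integrationtest` marker used above has to be registered with pytest. A minimal sketch of how such a marker is declared follows; the actual configuration in this repository may differ:

```toml
# pyproject.toml (illustrative fragment)
[tool.pytest.ini_options]
markers = [
    "integrationtest: tests which need a running MQTT broker",
]
```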
- Eclipse Sparkplug B Specification
- Cirrus Link Sparkplug B MQTT Tutorials
- Github Eclipse Tahu project
- Google Protocol Buffers Project
- MQTTv3.1 appears not to be supported by EMQX. While testing client code using `paho.mqtt.client` against broker.emqx.io, the connection was never established and neither the `on_connect()` nor the `on_connect_fail()` callbacks were invoked. Since most clients use either MQTTv3.1.1 or MQTTv5.0, this should not be a problem. In local testing and implementations I have chosen to go with MQTT 5.
- The plugins to intercept messages from EMQX (which is probably the more efficient mechanism) in order to persist them are available only in the enterprise version, not in the community edition. As a workaround, I created an MQTT client which subscribes to `#` and allows subsequent processing.
- Currently the configuration of the MQTT bridge is a manual step via the EMQX dashboard. This needs to be automated via code.
- Need to study and understand which Kubernetes storage class is better suited for this UNS use case.
- The proto files were not being compiled correctly with Protobuf version 3.20.0 and higher. After raising the issue, it turned out that from this version onwards the additional parameter `--pyi_out` must be provided.
- The protoc executable for Linux is for the x86_64 architecture and needs execute rights in order to run and compile the sparkplug_b.proto specification. The protoc executable for Windows is for a 64-bit processor. For other architectures, please download the appropriate pre-compiled version of the Protobuf v26.1 release.
- Need to understand how to handle the metric types DataSet and Template.
Copyright (c) 2016-2022 Eclipse Foundation. This software or document includes material copied from or derived from the Sparkplug Specification