Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unable to Receive Messages on HiveMQ Broker #689

Open
neilor-mendes opened this issue Aug 29, 2024 · 1 comment
Open

Unable to Receive Messages on HiveMQ Broker #689

neilor-mendes opened this issue Aug 29, 2024 · 1 comment

Comments

@neilor-mendes
Copy link

neilor-mendes commented Aug 29, 2024

Hi everyone,

I’m encountering an issue with my code that’s supposed to connect to a HiveMQ Broker. While the connection seems to be established successfully (as indicated by the keepAlive and ping logs), I'm not receiving any messages sent to the topic.

Here’s what I’ve done so far:

  • Successfully connected to the broker.
  • Set up the SetDefaultPublishHandler.

I’m unsure what might be missing or incorrectly configured. Any guidance or suggestions would be greatly appreciated!

Thank you in advance for your help.

Version

go 1.23.0

require (
        github.com/eclipse/paho.mqtt.golang v1.5.0
        github.com/golang/glog v1.2.2
)

require (
        github.com/golang/snappy v0.0.4 // indirect
        github.com/gorilla/websocket v1.5.3 // indirect
        github.com/klauspost/compress v1.13.6 // indirect
        github.com/montanaflynn/stats v0.7.1 // indirect
        github.com/pelletier/go-toml v1.9.5 // indirect
        github.com/xdg-go/pbkdf2 v1.0.0 // indirect
        github.com/xdg-go/scram v1.1.2 // indirect
        github.com/xdg-go/stringprep v1.0.4 // indirect
        github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
        go.mongodb.org/mongo-driver v1.16.1 // indirect
        golang.org/x/crypto v0.25.0 // indirect
        golang.org/x/net v0.27.0 // indirect
        golang.org/x/sync v0.7.0 // indirect
        golang.org/x/text v0.16.0 // indirect
)

Code

package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)
	signal.Notify(sig, syscall.SIGTERM)

	mqtt.ERROR = log.New(os.Stdout, "[ERROR] ", 0)
	mqtt.CRITICAL = log.New(os.Stdout, "[CRIT] ", 0)
	mqtt.WARN = log.New(os.Stdout, "[WARN]  ", 0)
	mqtt.DEBUG = log.New(os.Stdout, "[DEBUG] ", 0)

	var knt int = 0

	// Load CA certificate
	certpool := x509.NewCertPool()
	pemCerts, err := os.ReadFile("mqttutil/cacert.pem")

	if err != nil {
		log.Fatalf("Error loading CA certificate: %v", err)
	}

	if !certpool.AppendCertsFromPEM(pemCerts) {
		log.Fatalf("Failed to append CA certificate")
	}

	tlsConfig := &tls.Config{
		RootCAs:            certpool,
		ClientAuth:         tls.NoClientCert,
		ClientCAs:          nil,
		InsecureSkipVerify: true,
	}

	opts := mqtt.NewClientOptions().
		AddBroker("ssl://broker.hivemq.com:8883").
		SetClientID("mqtt2mongo").
		SetUsername("username").
		SetPassword("password").
		SetTLSConfig(tlsConfig).
		SetAutoReconnect(true).
		SetConnectRetry(true).
		SetOrderMatters(false).
		SetCleanSession(true)

	opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
		fmt.Printf("MSG: %s\n", msg.Payload())
		text := fmt.Sprintf("this is result msg #%d!", knt)
		knt++
		token := client.Publish("nn/result", 0, false, text)
		token.Wait()
	})

	topic := "nn/result"

	opts.OnConnect = func(c mqtt.Client) {
		if token := c.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
			panic(token.Error())
		}
	}

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	} else {
		fmt.Printf("Connected to server\n")
	}

	<-sig
	fmt.Println("signal caught - exiting")
	defer client.Unsubscribe("nn/result")
	defer client.Disconnect(250)
	fmt.Println("shutdown complete")
}

Debug Log

[DEBUG] [client]   Connect()
[DEBUG] [store]    memorystore initialized
[DEBUG] [client]   about to write new connect msg
[DEBUG] [client]   socket connected to broker
[DEBUG] [client]   Using MQTT 3.1.1 protocol
[DEBUG] [net]      connect started
[DEBUG] [net]      received connack
[DEBUG] [client]   startCommsWorkers called
[DEBUG] [client]   client is connected/reconnected
[DEBUG] [net]      incoming started
[DEBUG] [net]      startIncomingComms started
[DEBUG] [net]      outgoing started
[DEBUG] [net]      startComms started
[DEBUG] [client]   startCommsWorkers done
[WARN]  [store]    memorystore wiped
Connected to server
[DEBUG] [client]   enter Subscribe
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [pinger]   keepalive starting
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncomingComms: inboundFromStore complete
[DEBUG] [client]   exit startClient
[DEBUG] [client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 1 topics: [nn/result]
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [client]   sending subscribe message, topic: nn/result
[DEBUG] [client]   exit Subscribe
[DEBUG] [net]      obound priority msg to write, type *packets.SubscribePacket
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[WARN]  [store]    memorystore del: message 1 not found
[DEBUG] [net]      startIncomingComms: received suback, id: 1
[DEBUG] [net]      startIncomingComms: granted qoss [0]
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [pinger]   ping check 4.9984945
[DEBUG] [pinger]   ping check 9.9990185
[DEBUG] [pinger]   ping check 14.9984701
[DEBUG] [pinger]   ping check 19.9986013
[DEBUG] [pinger]   ping check 24.9985122
[DEBUG] [pinger]   ping check 29.998588
[DEBUG] [pinger]   ping check 34.9987725
[DEBUG] [pinger]   keepalive sending ping
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[DEBUG] [net]      startIncomingComms: received pingresp
...
@MattBrittan
Copy link
Contributor

  • Downloaded the CA Cert
  • Compiled/Ran your code
  • Used MQTTX to connect to broker.hivemq.com ands published a message to the nn/result topic

The message was received in your code as expected. Also, as expected, this led to an infinite loop where the client published a message to nn/result every time it receives a message (and hencde receives another message).

You don't indicate how/when you published a message on the topic in the above so all I can really say is that your code works as expected for me. The logs show a normal connection and subscription but no messages received (which would seem to indicate the broker did not send any messages on the specified topic).

Please note that this area is really intended for bugs; the readme suggests places to ask for more general help (Stackoverflow generally results in a quick answer when a detailed question is provided).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants