MQTT Go 客户端库

    1. import (
    2. "fmt"
    3. "log"
    4. "os"
    5. "time"
    6. "github.com/eclipse/paho.mqtt.golang"
    7. )
    8. var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    9. fmt.Printf("TOPIC: %s\n", msg.Topic())
    10. fmt.Printf("MSG: %s\n", msg.Payload())
    11. }
    12. func main() {
    13. mqtt.ERROR = log.New(os.Stdout, "", 0)
    14. opts := mqtt.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883").SetClientID("emqx_test_client")
    15. opts.SetKeepAlive(60 * time.Second)
    16. // 设置消息回调处理函数
    17. opts.SetDefaultPublishHandler(f)
    18. opts.SetPingTimeout(1 * time.Second)
    19. c := mqtt.NewClient(opts)
    20. if token := c.Connect(); token.Wait() && token.Error() != nil {
    21. panic(token.Error())
    22. }
    23. // 订阅主题
    24. if token := c.Subscribe("testtopic/#", 0, nil); token.Wait() && token.Error() != nil {
    25. }
    26. // 发布消息
    27. token := c.Publish("testtopic/1", 0, false, "Hello World")
    28. token.Wait()
    29. time.Sleep(6 * time.Second)
    30. // 取消订阅
    31. if token := c.Unsubscribe("testtopic/#"); token.Wait() && token.Error() != nil {
    32. fmt.Println(token.Error())
    33. os.Exit(1)
    34. }
    35. // 断开连接
    36. c.Disconnect(250)
    37. }

    Paho Golang MQTT 5.0 支持