• Golang MQTT的使用 实现发布订阅


    Golang MQTT的使用 实现发布订阅

    Eclipse Paho MQTT Go Client 为 Eclipse Paho 项目下的 Go 语言版客户端库,该库能够连接到 MQTT Broker 以发布消息,订阅主题并接收已发布的消息,支持完全异步的操作模式。

    官方源代码地址: github.com/eclipse/paho.mqtt.golang

    Go应用使用mqtt通信协议的时候, 是作为client端使用的, server端自然需要一个服务来承载, 有很多软件提供MQTT协议支持, 比如mosquitto mqtt, emqx, smqtt, rabbitmq mqtt, pulsar mop mqtt等等.

    MQTT服务安装

    Centos离线安装RabbitMQ并开启MQTT

    Linux安装mosquitto mqtt几种方式

    emqx broker安装

    docker安装SMQTT

    Golang使用MQTT

    #下载MQTT依赖包
    go get github.com/eclipse/paho.mqtt.golang
    
    • 1
    • 2

    Go 语言的 Paho MQTT 连接 EMQX Broker例子,并进行消息收发

    package main
    
    import (
    	"fmt"
    	uuid "github.com/satori/go.uuid"
    	"os"
    	"strings"
    	"time"
    
    	"github.com/eclipse/paho.mqtt.golang"
    )
    
    var fallbackFun mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    	//打印收到的topic和内容
    	fmt.Printf("topic: %s\n", msg.Topic())
    	fmt.Printf("msg: %s\n", msg.Payload())
    	//收到的topic $test/msg/liang/get
    
    	//处理发布的topic  $abcd/msg/liang/set
    	split := strings.Split(msg.Topic(), "/")
    	liang := split[2]
    
    	setTopic := "$abcd/msg/" + liang + "/set"
    
    	//处理要发送的数据
    	data := "{\"msg\":\"hello\"}"
    
    	// 处理完之后 发布消息 $abcd/msg/liang/set
    	client.Publish(setTopic, 0, false, data)
    }
    
    func main() {
    	clientId := uuid.NewV4()
    	opts := mqtt.NewClientOptions().AddBroker("tcp://192.168.1.8:1883").SetClientID(fmt.Sprintf("%s", clientId))
    
    	opts.SetKeepAlive(60 * time.Second)
    	// 设置消息回调处理函数 fallbackFun
    	opts.SetDefaultPublishHandler(fallbackFun)
    	opts.SetPingTimeout(1 * time.Second)
    
    	c := mqtt.NewClient(opts)
    	if token := c.Connect(); token.Wait() && token.Error() != nil {
    		panic(token.Error())
    	}
    
    	// 订阅主题 $test/msg/#
    	if token := c.Subscribe("$test/msg/#", 0, nil); token.Wait() && token.Error() != nil {
    		fmt.Println(token.Error())
    		os.Exit(1)
    	}
    
    	select {
    
    	}
    
    	// 断开连接
    	c.Disconnect(250)
    	time.Sleep(1 * time.Second)
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    参考链接:
    https://www.emqx.io/docs/zh/v4.4/development/go.html#mqtt-go-%E4%BD%BF%E7%94%A8%E7%A4%BA%E4%BE%8B

    https://blog.justwe.site/post/tcp-mqtt/

  • 相关阅读:
    Codeforces Round 860
    分布式版本控制工具 Git 的使用方式
    跑步耳机哪种好,推荐五款在运动中值得佩戴的耳机推荐
    Flutter高仿微信-第53篇-群聊-删除并退出
    网络安全笔记 -- 文件上传2(内容逻辑数组绕过、中间件漏洞绕过、WAF绕过)
    音频编解码,利用FAAC来实现AAC编码
    01- Java概述
    Emacs之高亮显示超过80个字符部分(一百三十)
    CSS 3之美化表格样式
    呕心整理的常用热门API大全
  • 原文地址:https://blog.csdn.net/yinjl123456/article/details/127558839