Mqtt服务搭建
背景:物联网项目中, 很多时候要求设备与后台实现双向通信,因为设备时常处于内网或离线,此时Mqtt就派上用场了
实现原理:emqx作为消息中间件,连接后台和设备的发布和订阅端,后台发布某个topic的消息,设备订阅某个topic的消息。后台也可以订阅某个topic的消息,设备也可以发布某个topic的消息,topic作为点对点的消息通道
示例:一台云服务器向六台mini服务器发送采样指令,同时订阅六台mini服务器上报的数据
Step1:我们在云服务器上使用docker-compose搭建emqx服务【我们使用的是5.0.26版本】
docker-compose.yaml
version: "3.7"
services:
emqx:
image: emqx:5.0.26
container_name: "emqx"
environment:
- "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"
- "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"
healthcheck:
test: ["CMD", "/opt/emqx/bin/emqx_ctl", "status"]
interval: 5s
timeout: 25s
retries: 5
ports:
- "8083:8083"
- "8084:8084"
- "8883:8883"
- "18083:18083"
- "1883:1883"
- "18081:8081"
networks:
- www
logging:
options:
max-size: "2M" # 单个文件大小为200k
max-file: "10" # 最多10个文件
restart: always
networks:
www:
external: true
然后执行docker-compose up -d 就搭建好了【默认账号密码:admin/public】

ip:18083是emqx管理端的页面
ip:1883是emqx的mqtt的连接地址

Step2: 设置客户端连接的账号密码已经秘钥
-
由于mqtt连接需要使用账号密码,所以需要设置【在侧边栏选择 客户端认证-内置数据库-用户管理-点击+】

-
mqtt虽然是长连接,但由于某些情况,例如设备网络波动,连接超时等原因导致设备未能连上mqtt,后台发布的topic需要客户端在线才能订阅到,因此需要创建秘钥,在下发指令时,先判断客户端是否在线

示例代码
func (wqc WaterQualityConfig) CheckWaterDeviceConnect(ClientID string) (Connected bool) { ip := Common{}.GetLocalIP() url := "http://" + ip + ":18083/api/v5/clients/" + ClientID // 用户名和密码 username := Key password := Secret // 拼接 Basic Auth auth := username + ":" + password basicAuth := "Basic " + base64.StdEncoding.EncodeToString([]byte(auth)) req, err := http.NewRequest("GET", url, nil) // 添加 Authorization 头 req.Header.Add("Authorization", basicAuth) logger.Info(context.Background(), "CheckWaterDeviceConnect", logger.String("ClientID:", ClientID), logger.String("url:", url), logger.String("basicAuth:", basicAuth)) // 发起请求 client := &http.Client{} resp, err := client.Do(req) if err != nil { fmt.Println("请求失败:", err) return } defer resp.Body.Close() // 根据 HTTP 响应状态码判断设备是否在线 if resp.StatusCode == http.StatusOK { // 200 // 设备在线,API 返回了客户端信息 body, _ := ioutil.ReadAll(resp.Body) fmt.Println("设备在线,响应内容:", string(body)) return true } else if resp.StatusCode == http.StatusNotFound { // 404 // 设备不在线 fmt.Println("设备不在线(未找到):", ClientID) return false } else { // 其它状态码,比如 401(未授权)、500 等 body, _ := ioutil.ReadAll(resp.Body) fmt.Printf("查询设备状态失败,状态码: %d, 响应: %s\n", resp.StatusCode, string(body)) return false } }
Step3: 在项目中配置mqtt的发布和订阅参数
is_open_mqtt_pub: 0 // 是否开启mqtt发布
is_open_mqtt_sub: 1 // 是否开启mqtt订阅
// 订阅的配置
mqtt_sub:
- host: "ip" // 部署emqx的服务器ip
port: "1883" // emqx的默认连接端口
username: "username" // 在emqx管理端中客户端认证默认-内置数据库中添加的账号
password: "password" // 在emqx管理端中客户端认证默认-内置数据库中添加的密码
clean_session: false // 重连时是否保存topic
auto_reconnect: true // 是否自动重连
client_id: "53000101qishuo01" // 唯一id【不能重复,否则会一直重连】
topic: "nbi/lj-qishuo/53000101qishuo01/#" // 订阅的topic
qos: 1 // 订阅质量【0:只发一次 1:确保能收到一次 2:可能会重复】
pro: 3 // 自定义版本
tls: "" // 证书
// 发布的配置
mqtt_pub:
host: "117.173.165.135"
port: "1883"
username: "erlang-server"
password: "erlang-server-pwd"
clean_session: false
auto_reconnect: true
client_id: "53000101qishuo01-pub"
qos: 1
pro: 3
tls: ""
Step4: 配置项目启动,直接上代码
pub.go
package pubmqtt
import (
"context"
"crypto/md5"
"crypto/tls"
"crypto/x509"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math"
"math/rand"
"time"
"brewing-bigdata/config"
"git.dev.nongbotech.cn/nbi-gopkg/logger"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
var (
client mqtt.Client
Connected bool
)
// MQTTInit Mqtt初始化
func MQttInit(opt config.MqttOption) error {
var err error
if opt.Host != "" && opt.Port != "" {
go func() {
for {
client, err = connect(opt)
if err != nil {
client = nil
logger.Error(context.Background(), "pubmqtt init err")
time.Sleep(time.Second * 10)
} else {
logger.Info(context.Background(), "pubmqtt init success")
break
}
}
}()
}
return err
}
func Pubdata(deviceSn, data interface{}) {
payload, _ := json.Marshal(data)
ShuizhiDevice := []string{
"53000101qishuo01",
"53000101qishuo02",
"53000101qishuo03",
"53000101qishuo04",
"53000101qishuo05",
"53000101qishuo06",
}
for _, device := range ShuizhiDevice {
if device == deviceSn {
logger.Info(context.Background(), "pubmqtt publish data", logger.String("topic", "nbi/lj-qishuo/"+device+"/order"), logger.Any("data", string(payload)))
client.Publish("nbi/lj-qishuo/"+device+"/order", 0x00, false, payload)
}
}
}
// connect 订阅 mqtt 代理
func connect(option config.MqttOption) (mqtt.Client, error) {
// 创建一个客户端选项配置类型附带默认的配置值
cliOpt := mqtt.NewClientOptions()
// 判断mqtt服务器使用的是ssl或tcp协议,并设置连接的mqtt服务器url
if option.TLS != "" { // 证书文件
cfg, err := newTLSConfig(option.TLS)
if err != nil {
return nil, err
}
// 配置host并设置ssl证书
cliOpt = cliOpt.AddBroker("ssl://" + option.Host + ":" + option.Port).SetTLSConfig(cfg)
} else {
cliOpt = cliOpt.AddBroker(option.Host + ":" + option.Port)
}
// 配置客户端的用户名,密码,客户端id
cliOpt.Username = option.Username
cliOpt.Password = option.Password
cliOpt.ClientID = option.ClientID
// 如果客户端id为空,则生成客户端id,根据MQTT v3.1规范,客户端ID不得超过23个字符。
if option.ClientID == "" {
rand.Seed(time.Now().UnixNano())
hs := md5.New()
hs.Write([]byte(option.Username))
hs.Write([]byte(option.Password))
hs.Write([]byte(fmt.Sprint(time.Now().UnixNano())))
hs.Write([]byte(fmt.Sprint(rand.Intn(math.MaxInt64))))
cliOpt.ClientID = hex.EncodeToString(hs.Sum(nil))
}
// 协议版本
if option.Pro > 0 {
cliOpt.ProtocolVersion = option.Pro
}
// 不保存会话信息,断开重连后是一个新的会话
cliOpt.CleanSession = option.CleanSession
// 是否自动重连
cliOpt.AutoReconnect = true
// 初始化连接时或自动重连时的调用函数
cliOpt.OnConnect = func(client mqtt.Client) {
Connected = true
// 打印订阅成功日志
logger.Info(context.Background(), "subscribe succeed", logger.Any("option", option))
}
// 初始化连接丢失回调
cliOpt.OnConnectionLost = func(client mqtt.Client, e error) {
Connected = false
}
client := mqtt.NewClient(cliOpt)
token := client.Connect()
if !token.WaitTimeout(time.Second * 15) {
return nil, errors.New("connect timeout")
}
if err := token.Error(); err != nil {
logger.Warn(context.Background(), "connect error", logger.String("err", err.Error()), logger.Any("option", option))
return nil, fmt.Errorf("connect error,%s", err.Error())
}
return client, nil
}
// newTLSConfig mqtt tls 连接相关
func newTLSConfig(f string) (*tls.Config, error) {
certs := x509.NewCertPool()
pemData, err := ioutil.ReadFile(f)
if err != nil {
return nil, err
}
if !certs.AppendCertsFromPEM(pemData) {
return nil, fmt.Errorf("append certs from pem err")
}
return &tls.Config{
RootCAs: certs,
ClientAuth: tls.NoClientCert,
ClientCAs: nil,
InsecureSkipVerify: true,
}, nil
}
sub.go
package subscribe
import (
"context"
"crypto/md5"
"crypto/tls"
"crypto/x509"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math"
"math/rand"
"strings"
"sync"
"time"
"git.dev.nongbotech.cn/nbi-gopkg/helper"
"git.dev.nongbotech.cn/nbi-gopkg/logger"
"brewing-bigdata/config"
"brewing-bigdata/internal/app/handler"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
var EnableMQTTCache bool
var (
isSubscribed bool
subscribeMu sync.Mutex // 避免并发问题
)
func MqttInit(mqtts []config.MqttOption) {
for _, opt := range mqtts {
// mqtt协议消息订阅
// 循环订阅,10秒尝试
go func(opt config.MqttOption) {
for {
// 启用mqtt缓存处理方式,将不再订阅
client, err := Connect(opt, !EnableMQTTCache)
if err != nil {
logger.Error(context.Background(), "connection err", logger.String("error", err.Error()))
time.Sleep(time.Second * 10)
} else {
// 保存client到map
rd := client.OptionsReader()
brokerHash := helper.Md5(rd.Servers()[0].Host + rd.Servers()[0].Port() + rd.Username())
config.MQTTClientHashMap[brokerHash] = client
logger.Info(context.Background(), "mqtt client hash map", logger.Any("common.MQTTClientHashMap", config.MQTTClientHashMap))
logger.Info(context.Background(), "connection succeeded", logger.Any("option", opt))
break
}
}
}(opt)
}
}
// Subscribe 订阅 mqtt 代理
// MQTT v3.1.1 An application may connect to an MQTT server using:
// A plain TCP socket
// or A secure SSL/TLS socket
// or A websocket
//
// MQTTCache 启用msgCache handler
// EnableSub 启用订阅
func Connect(option config.MqttOption, EnableSub bool) (mqtt.Client, error) {
logger.Info(context.Background(), "mqtt connect begin", logger.Any("option", option))
// 创建一个客户端选项配置类型附带默认的配置值
cliOpt := mqtt.NewClientOptions()
// 判断mqtt服务器使用的是ssl或tcp协议,并设置连接的mqtt服务器url
if option.TLS != "" {
cfg, err := newTLSConfig(option.TLS)
if err != nil {
return nil, err
}
// 配置host并设置ssl证书
cliOpt = cliOpt.AddBroker("ssl://" + option.Host + ":" + option.Port).SetTLSConfig(cfg)
} else {
cliOpt = cliOpt.AddBroker(option.Host + ":" + option.Port)
}
// 配置客户端的用户名,密码,客户端id
cliOpt.Username = option.Username
cliOpt.Password = option.Password
cliOpt.ClientID = option.ClientID
// 如果客户端id为空,则生成客户端id,根据MQTT v3.1规范,客户端ID不得超过23个字符。
if option.ClientID == "" {
rand.Seed(time.Now().UnixNano())
hs := md5.New()
hs.Write([]byte(option.Username))
hs.Write([]byte(option.Password))
hs.Write([]byte(fmt.Sprint(time.Now().UnixNano())))
hs.Write([]byte(fmt.Sprint(rand.Intn(math.MaxInt64))))
cliOpt.ClientID = hex.EncodeToString(hs.Sum(nil))
}
// 协议版本
if option.Pro > 0 {
cliOpt.ProtocolVersion = option.Pro
}
// 不保存会话信息,断开重连后是一个新的会话
cliOpt.CleanSession = option.CleanSession
// 是否自动重连
cliOpt.AutoReconnect = true
// 初始化连接时或自动重连时的调用函数
cliOpt.OnConnect = func(client mqtt.Client) {
// 判断当前的连接类型,设置相应的处理函数
// 如果已经订阅过了,就不再重复订阅
subscribeMu.Lock()
defer subscribeMu.Unlock()
// 如果已经订阅过,直接返回
if isSubscribed {
logger.Info(context.Background(), "Already subscribed, skip re-subscribe on reconnect")
return
}
var msgHandler mqtt.MessageHandler
msgHandler = handler.HandleMsg
topics := strings.Split(option.Topic, ",")
topicMap := map[string]byte{}
for _, t := range topics {
topicMap[t] = option.Qos
}
var token mqtt.Token
// 消息订阅
token = client.Subscribe(
option.Topic,
option.Qos,
func(client mqtt.Client, message mqtt.Message) {
go msgHandler(client, message)
})
// 判断是否超时
if !token.WaitTimeout(time.Second * 30) {
logger.Info(context.Background(), "subscribe timeout", logger.Any("option", option))
return
}
// 判断是否发生错误
if err := token.Error(); err != nil {
logger.Info(context.Background(), "subscribe error", logger.String("err", err.Error()), logger.Any("option", option))
return
}
// 打印订阅成功日志
logger.Info(context.Background(), "subscribe succeed", logger.Any("option", option), logger.Any("token", token))
isSubscribed = true // 标记已订阅
}
// 初始化连接丢失回调
cliOpt.OnConnectionLost = func(client mqtt.Client, e error) {
}
client := mqtt.NewClient(cliOpt)
token := client.Connect()
if !token.WaitTimeout(time.Second * 15) {
return nil, errors.New("connect timeout")
}
if err := token.Error(); err != nil {
logger.Warn(context.Background(), "connect error", logger.String("err", err.Error()), logger.Any("option", option))
return nil, fmt.Errorf("connect error,%s", err.Error())
}
return client, nil
}
// newTLSConfig mqtt tls 连接相关
func newTLSConfig(f string) (*tls.Config, error) {
certs := x509.NewCertPool()
pemData, err := ioutil.ReadFile(f)
if err != nil {
return nil, err
}
if !certs.AppendCertsFromPEM(pemData) {
return nil, fmt.Errorf("append certs from pem err")
}
return &tls.Config{
RootCAs: certs,
ClientAuth: tls.NoClientCert,
ClientCAs: nil,
InsecureSkipVerify: true,
}, nil
}
func HandleMsg(client mqtt.Client, message mqtt.Message) {
spanCtx := logger.Start(context.Background(), "HandleMsg")
defer logger.End(spanCtx)
fmt.Println("收到了来自服务器的消息")
topic, payload := DataPrepare(spanCtx, client, message)
fmt.Println("topic:", topic, "message: ", payload, "qos:", message.Qos())
payloadMap := make(map[string]interface{})
json.Unmarshal(payload, &payloadMap)
if _, ok := payloadMap["remain_times"]; ok {
}
logger.Info(spanCtx, "HandleMsg", logger.String("topic", topic), logger.Any("payload", payloadMap))
}
func DataPrepare(ctx context.Context, cli mqtt.Client, msg mqtt.Message) (
topic string,
payload []byte,
) {
spanCtx := logger.Start(ctx, "LoraDataPrepare")
defer logger.End(spanCtx)
topic = msg.Topic()
payload = msg.Payload()
return
}
handler.go
package handler
import (
"brewing-bigdata/internal/app/logic"
"brewing-bigdata/internal/common"
"brewing-bigdata/internal/schema"
"context"
"encoding/json"
"git.dev.nongbotech.cn/nbi-gopkg/logger"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
func HandleMsg(client mqtt.Client, message mqtt.Message) {
spanCtx := logger.Start(context.Background(), "HandleMsg")
defer logger.End(spanCtx)
topic, payload := DataPrepare(spanCtx, client, message)
payloadMap := make(map[string]interface{})
json.Unmarshal(payload, &payloadMap)
logger.Info(spanCtx, "来自服务器消息-HandleMsg", logger.String("topic", topic), logger.Any("payloadMap", payloadMap))
IP := "117.173.165.135"
var Admin common.UserInfo
Admin.CompanyID = "1699432555659652"
if _, ok := payloadMap["remain_times"]; ok { // 修改剩余次数
logger.Info(spanCtx, "modify remain times")
var Params schema.ModifyWaterDeviceRemainTimes
// id := payloadMap["id"].(string)
deviceSn := payloadMap["device_sn"].(string)
name := payloadMap["name"].(string)
remainTimes := payloadMap["remain_times"].(float64)
Params.DeviceSn = deviceSn
Params.Name = name
Params.RemainTimes = int64(remainTimes)
res, err := logic.WaterQualityConfig{}.ModifyWaterDeviceRemainTimes(Params, 1)
logger.Info(spanCtx, "modify remain times", logger.Any("params", Params), logger.Any("res", res), logger.Any("err", err))
} else if _, ok := payloadMap["code"]; ok { // 立即采样
logger.Info(spanCtx, "sample now")
id := payloadMap["id"].(string)
deviceSn := payloadMap["device_sn"].(string)
code := payloadMap["code"].(string)
cate := payloadMap["cate"].(float64)
commandTime := payloadMap["command_time"].(float64)
commandID := payloadMap["command_id"].(string)
req := payloadMap["req"].(string)
var Params schema.QishuoSampleNowParams
Params.ID = id
Params.DeviceSn = deviceSn
Params.Code = code
Params.Cate = int8(cate)
Params.CommandTime = int64(commandTime)
Params.CommandID = commandID
Params.Req = req
data, err := logic.WaterQualityConfig{}.QishuoSampleNow(context.Background(), Params, Admin, IP, 1)
logger.Info(spanCtx, "sample now", logger.Any("params", Params), logger.Any("data", data), logger.Any("err", err))
} else { // 其它
logger.Info(spanCtx, "other message")
}
logger.Info(spanCtx, "HandleMsg", logger.String("topic", topic), logger.Any("payload", payloadMap))
}
func DataPrepare(ctx context.Context, cli mqtt.Client, msg mqtt.Message) (
topic string,
payload []byte,
) {
spanCtx := logger.Start(ctx, "LoraDataPrepare")
defer logger.End(spanCtx)
topic = msg.Topic()
payload = msg.Payload()
return
}
Step5: 项目初始化
mqtt.go
package config
import (
"context"
"path/filepath"
"runtime"
"git.dev.nongbotech.cn/nbi-gopkg/logger"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/spf13/viper"
)
type MqttOption struct {
Host string `json:"host" mapstructure:"host"` // mqtt服务器url
Port string `json:"port" mapstructure:"port"` // mqtt服务器端口
HTTPHost string `json:"http_host" mapstructure:"http_host"`
HTTPPort string `json:"http_port" mapstructure:"http_port"`
Username string `json:"username" mapstructure:"username"`
Password string `json:"password" mapstructure:"password"`
ClientID string `json:"client_id" mapstructure:"client_id"`
CleanSession bool `json:"clean_session" mapstructure:"clean_session"`
AutoReconnect bool `json:"auto_reconnect" mapstructure:"auto_reconnect"`
Topic string `json:"topic" mapstructure:"topic"`
Pro uint `json:"pro" mapstructure:"pro"`
Qos byte `json:"qos" mapstructure:"qos"`
Handler string `json:"handler" mapstructure:"handler"` // lora | zeta
TLS string `json:"tls" mapstructure:"tls"`
}
var MQTTClientHashMap map[string]mqtt.Client = make(map[string]mqtt.Client)
var Conf struct {
// IsOpenMqtt int64 `mapstructure:"is_open_mqtt"` // 是否开启mqtt服务,0关闭 1开启
IsOpenMqttPub int64 `mapstructure:"is_open_mqtt_pub"` // 是否开启mqtt_pub服务,0关闭 1开启
IsOpenMqttSub int64 `mapstructure:"is_open_mqtt_sub"` // 是否开启mqtt_sub服务,0关闭 1开启
EnableMQTTCache bool `mapstructure:"enable_mqtt_cache"` // 是否启用mqtt消息缓存功能
MqttOps []MqttOption `mapstructure:"mqtt_sub"` // mqtt-brokers订阅
Distribution MqttOption `mapstructure:"mqtt_pub"` // 发布订阅
}
// ParseConf 配置解析
func ParseConf() {
// 获取当前路径,主要用于单元测试
_, Dir, _, _ := runtime.Caller(0)
currentDir := filepath.Dir(Dir)
// viper 解析配置
vp := viper.New()
vp.SetConfigType("yaml")
vp.SetConfigName("app")
vp.AddConfigPath("./")
vp.AddConfigPath("./etc")
vp.AddConfigPath(currentDir + "/../etc")
err := vp.ReadInConfig()
if err != nil {
panic("parse conf error")
}
vp.Unmarshal(&Conf)
logger.Info(context.Background(), "parse conf success", logger.Any("Conf", Conf))
}
boot.go
package boot
import (
"brewing-bigdata/internal/app/pubmqtt"
"brewing-bigdata/internal/app/subscribe"
"context"
mqttConf "brewing-bigdata/config"
"git.dev.nongbotech.cn/nbi-gopkg/logger"
)
// 项目运行的入口处,调用这个函数即可初始化mqtt服务
func APPInit() {
// IsOpenMqtt := mqttConf.Conf.IsOpenMqtt // 是否开启mqtt服务
IsOpenMqttPub := mqttConf.Conf.IsOpenMqttPub // 是否开启mqtt_pub服务
IsOpenMqttSub := mqttConf.Conf.IsOpenMqttSub // 是否开启mqtt_sub服务
if IsOpenMqttPub == 1 { // 发布初始化
logger.Info(context.Background(), "mqttConf.Conf.Pub", logger.Any("pubconf: ", mqttConf.Conf.Distribution))
pubmqtt.MQttInit(mqttConf.Conf.Distribution) // 发布
}
if IsOpenMqttSub == 1 { // 订阅初始化
logger.Info(context.Background(), "mqttConf.Conf.Sub", logger.Any("subconf: ", mqttConf.Conf.MqttOps))
subscribe.MqttInit(mqttConf.Conf.MqttOps) // 订阅
}
if IsOpenMqttPub == 0 && IsOpenMqttSub == 0 {
logger.Info(context.Background(), "mqttConf is not nedd open", logger.Any("pubconf: ", mqttConf.Conf.IsOpenMqttPub), logger.Any("subconf: ", mqttConf.Conf.IsOpenMqttSub))
}
}
此时就搭建好了mqtt服务了,服务端调用pubmqtt.Pubdata()即可向指定的设备发送消息了