Press "Enter" to skip to content

Mqtt服务搭建

Mqtt服务搭建

背景:物联网项目中, 很多时候要求设备与后台实现双向通信,因为设备时常处于内网或离线,此时Mqtt就派上用场了

实现原理:emqx作为消息中间件,连接后台和设备的发布和订阅端,后台发布某个topic的消息,设备订阅某个topic的消息。后台也可以订阅某个topic的消息,设备也可以发布某个topic的消息,topic作为点对点的消息通道

示例:一台云服务器向六台mini服务器发送采样指令,同时订阅六台mini服务器上报的数据

Step1:我们在云服务器上使用docker-compose搭建emqx服务【我们使用的是5.0.26版本】

docker-compose.yaml

version: "3.7"
services:
  emqx:
    image: emqx:5.0.26
    container_name: "emqx"
    environment:
    - "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"
    - "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"
    healthcheck:
      test: ["CMD", "/opt/emqx/bin/emqx_ctl", "status"]
      interval: 5s
      timeout: 25s
      retries: 5
    ports:
      - "8083:8083"
      - "8084:8084"
      - "8883:8883"
      - "18083:18083"
      - "1883:1883"
      - "18081:8081"
    networks:
      - www
    logging:
      options:
        max-size: "2M" # 单个文件大小为200k
        max-file: "10" # 最多10个文件
    restart: always
networks:
  www:
    external: true

然后执行docker-compose up -d 就搭建好了【默认账号密码:admin/public】

1756274462852

ip:18083是emqx管理端的页面

ip:1883是emqx的mqtt的连接地址

1756274569757

Step2: 设置客户端连接的账号密码已经秘钥

  • 由于mqtt连接需要使用账号密码,所以需要设置【在侧边栏选择 客户端认证-内置数据库-用户管理-点击+】

    1756274725015

  • mqtt虽然是长连接,但由于某些情况,例如设备网络波动,连接超时等原因导致设备未能连上mqtt,后台发布的topic需要客户端在线才能订阅到,因此需要创建秘钥,在下发指令时,先判断客户端是否在线

    1756275010425

    示例代码

    func (wqc WaterQualityConfig) CheckWaterDeviceConnect(ClientID string) (Connected bool) {
    	ip := Common{}.GetLocalIP()
    	url := "http://" + ip + ":18083/api/v5/clients/" + ClientID
    
    	// 用户名和密码
    	username := Key
    	password := Secret
    	// 拼接 Basic Auth
    	auth := username + ":" + password
    	basicAuth := "Basic " + base64.StdEncoding.EncodeToString([]byte(auth))
    	req, err := http.NewRequest("GET", url, nil)
    	// 添加 Authorization 头
    	req.Header.Add("Authorization", basicAuth)
    	logger.Info(context.Background(), "CheckWaterDeviceConnect", logger.String("ClientID:", ClientID), logger.String("url:", url), logger.String("basicAuth:", basicAuth))
    	// 发起请求
    	client := &http.Client{}
    	resp, err := client.Do(req)
    	if err != nil {
    		fmt.Println("请求失败:", err)
    		return
    	}
    	defer resp.Body.Close()
    	// 根据 HTTP 响应状态码判断设备是否在线
    	if resp.StatusCode == http.StatusOK { // 200
    		// 设备在线,API 返回了客户端信息
    		body, _ := ioutil.ReadAll(resp.Body)
    		fmt.Println("设备在线,响应内容:", string(body))
    		return true
    	} else if resp.StatusCode == http.StatusNotFound { // 404
    		// 设备不在线
    		fmt.Println("设备不在线(未找到):", ClientID)
    		return false
    	} else {
    		// 其它状态码,比如 401(未授权)、500 等
    		body, _ := ioutil.ReadAll(resp.Body)
    		fmt.Printf("查询设备状态失败,状态码: %d, 响应: %s\n", resp.StatusCode, string(body))
    		return false
    	}
    }
    

Step3: 在项目中配置mqtt的发布和订阅参数

is_open_mqtt_pub: 0 // 是否开启mqtt发布
is_open_mqtt_sub: 1 // 是否开启mqtt订阅
// 订阅的配置
mqtt_sub:
  - host: "ip" // 部署emqx的服务器ip
    port: "1883" // emqx的默认连接端口
    username: "username" // 在emqx管理端中客户端认证默认-内置数据库中添加的账号
    password: "password" // 在emqx管理端中客户端认证默认-内置数据库中添加的密码
    clean_session: false // 重连时是否保存topic
    auto_reconnect: true // 是否自动重连
    client_id: "53000101qishuo01" // 唯一id【不能重复,否则会一直重连】
    topic: "nbi/lj-qishuo/53000101qishuo01/#" // 订阅的topic
    qos: 1 // 订阅质量【0:只发一次 1:确保能收到一次 2:可能会重复】
    pro: 3 // 自定义版本
    tls: "" // 证书
// 发布的配置
mqtt_pub:
  host: "117.173.165.135"
  port: "1883"
  username: "erlang-server"
  password: "erlang-server-pwd"
  clean_session: false
  auto_reconnect: true
  client_id: "53000101qishuo01-pub"
  qos: 1
  pro: 3
  tls: ""

Step4: 配置项目启动,直接上代码

pub.go

package pubmqtt

import (
	"context"
	"crypto/md5"
	"crypto/tls"
	"crypto/x509"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"math"
	"math/rand"
	"time"

	"brewing-bigdata/config"

	"git.dev.nongbotech.cn/nbi-gopkg/logger"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

var (
	client    mqtt.Client
	Connected bool
)

// MQTTInit  Mqtt初始化
func MQttInit(opt config.MqttOption) error {
	var err error
	if opt.Host != "" && opt.Port != "" {
		go func() {
			for {
				client, err = connect(opt)
				if err != nil {
					client = nil
					logger.Error(context.Background(), "pubmqtt init err")
					time.Sleep(time.Second * 10)
				} else {
					logger.Info(context.Background(), "pubmqtt init success")
					break
				}
			}
		}()
	}
	return err
}

func Pubdata(deviceSn, data interface{}) {
	payload, _ := json.Marshal(data)
	ShuizhiDevice := []string{
		"53000101qishuo01",
		"53000101qishuo02",
		"53000101qishuo03",
		"53000101qishuo04",
		"53000101qishuo05",
		"53000101qishuo06",
	}
	for _, device := range ShuizhiDevice {
		if device == deviceSn {
			logger.Info(context.Background(), "pubmqtt publish data", logger.String("topic", "nbi/lj-qishuo/"+device+"/order"), logger.Any("data", string(payload)))
			client.Publish("nbi/lj-qishuo/"+device+"/order", 0x00, false, payload)
		}
	}
}

// connect 订阅 mqtt 代理
func connect(option config.MqttOption) (mqtt.Client, error) {
	// 创建一个客户端选项配置类型附带默认的配置值
	cliOpt := mqtt.NewClientOptions()
	// 判断mqtt服务器使用的是ssl或tcp协议,并设置连接的mqtt服务器url
	if option.TLS != "" { // 证书文件
		cfg, err := newTLSConfig(option.TLS)
		if err != nil {
			return nil, err
		}
		// 配置host并设置ssl证书
		cliOpt = cliOpt.AddBroker("ssl://" + option.Host + ":" + option.Port).SetTLSConfig(cfg)
	} else {
		cliOpt = cliOpt.AddBroker(option.Host + ":" + option.Port)
	}
	// 配置客户端的用户名,密码,客户端id
	cliOpt.Username = option.Username
	cliOpt.Password = option.Password
	cliOpt.ClientID = option.ClientID
	// 如果客户端id为空,则生成客户端id,根据MQTT v3.1规范,客户端ID不得超过23个字符。
	if option.ClientID == "" {
		rand.Seed(time.Now().UnixNano())
		hs := md5.New()
		hs.Write([]byte(option.Username))
		hs.Write([]byte(option.Password))
		hs.Write([]byte(fmt.Sprint(time.Now().UnixNano())))
		hs.Write([]byte(fmt.Sprint(rand.Intn(math.MaxInt64))))
		cliOpt.ClientID = hex.EncodeToString(hs.Sum(nil))
	}
	// 协议版本
	if option.Pro > 0 {
		cliOpt.ProtocolVersion = option.Pro
	}
	// 不保存会话信息,断开重连后是一个新的会话
	cliOpt.CleanSession = option.CleanSession
	// 是否自动重连
	cliOpt.AutoReconnect = true
	// 初始化连接时或自动重连时的调用函数
	cliOpt.OnConnect = func(client mqtt.Client) {
		Connected = true
		// 打印订阅成功日志
	logger.Info(context.Background(), "subscribe succeed", logger.Any("option", option))
	}
	// 初始化连接丢失回调
	cliOpt.OnConnectionLost = func(client mqtt.Client, e error) {
		Connected = false
	}
	client := mqtt.NewClient(cliOpt)
	token := client.Connect()
	if !token.WaitTimeout(time.Second * 15) {
		return nil, errors.New("connect timeout")
	}
	if err := token.Error(); err != nil {
		logger.Warn(context.Background(), "connect error", logger.String("err", err.Error()), logger.Any("option", option))
		return nil, fmt.Errorf("connect error,%s", err.Error())
	}
	return client, nil
}

// newTLSConfig mqtt tls 连接相关
func newTLSConfig(f string) (*tls.Config, error) {
	certs := x509.NewCertPool()

	pemData, err := ioutil.ReadFile(f)
	if err != nil {
		return nil, err
	}
	if !certs.AppendCertsFromPEM(pemData) {
		return nil, fmt.Errorf("append certs from pem err")
	}
	return &tls.Config{
		RootCAs:            certs,
		ClientAuth:         tls.NoClientCert,
		ClientCAs:          nil,
		InsecureSkipVerify: true,
	}, nil
}

sub.go

package subscribe

import (
	"context"
	"crypto/md5"
	"crypto/tls"
	"crypto/x509"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"math"
	"math/rand"
	"strings"
	"sync"
	"time"

	"git.dev.nongbotech.cn/nbi-gopkg/helper"
	"git.dev.nongbotech.cn/nbi-gopkg/logger"

	"brewing-bigdata/config"
	"brewing-bigdata/internal/app/handler"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

var EnableMQTTCache bool
var (
	isSubscribed bool
	subscribeMu  sync.Mutex // 避免并发问题
)

func MqttInit(mqtts []config.MqttOption) {
	for _, opt := range mqtts {
		// mqtt协议消息订阅
		// 循环订阅,10秒尝试
		go func(opt config.MqttOption) {
			for {
				// 启用mqtt缓存处理方式,将不再订阅
				client, err := Connect(opt, !EnableMQTTCache)
				if err != nil {
					logger.Error(context.Background(), "connection err", logger.String("error", err.Error()))
					time.Sleep(time.Second * 10)
				} else {
					// 保存client到map
					rd := client.OptionsReader()
					brokerHash := helper.Md5(rd.Servers()[0].Host + rd.Servers()[0].Port() + rd.Username())
					config.MQTTClientHashMap[brokerHash] = client

					logger.Info(context.Background(), "mqtt client hash map", logger.Any("common.MQTTClientHashMap", config.MQTTClientHashMap))
					logger.Info(context.Background(), "connection succeeded", logger.Any("option", opt))
					break
				}
			}
		}(opt)
	}
}

// Subscribe 订阅 mqtt 代理
// MQTT v3.1.1 An application may connect to an MQTT server using:
// A plain TCP socket
// or A secure SSL/TLS socket
// or A websocket
//
// MQTTCache 启用msgCache handler
// EnableSub 启用订阅
func Connect(option config.MqttOption, EnableSub bool) (mqtt.Client, error) {
	logger.Info(context.Background(), "mqtt connect begin", logger.Any("option", option))
	// 创建一个客户端选项配置类型附带默认的配置值
	cliOpt := mqtt.NewClientOptions()
	// 判断mqtt服务器使用的是ssl或tcp协议,并设置连接的mqtt服务器url
	if option.TLS != "" {
		cfg, err := newTLSConfig(option.TLS)
		if err != nil {
			return nil, err
		}
		// 配置host并设置ssl证书
		cliOpt = cliOpt.AddBroker("ssl://" + option.Host + ":" + option.Port).SetTLSConfig(cfg)
	} else {
		cliOpt = cliOpt.AddBroker(option.Host + ":" + option.Port)
	}
	// 配置客户端的用户名,密码,客户端id
	cliOpt.Username = option.Username
	cliOpt.Password = option.Password
	cliOpt.ClientID = option.ClientID
	// 如果客户端id为空,则生成客户端id,根据MQTT v3.1规范,客户端ID不得超过23个字符。
	if option.ClientID == "" {
		rand.Seed(time.Now().UnixNano())
		hs := md5.New()
		hs.Write([]byte(option.Username))
		hs.Write([]byte(option.Password))
		hs.Write([]byte(fmt.Sprint(time.Now().UnixNano())))
		hs.Write([]byte(fmt.Sprint(rand.Intn(math.MaxInt64))))
		cliOpt.ClientID = hex.EncodeToString(hs.Sum(nil))
	}
	// 协议版本
	if option.Pro > 0 {
		cliOpt.ProtocolVersion = option.Pro
	}
	// 不保存会话信息,断开重连后是一个新的会话
	cliOpt.CleanSession = option.CleanSession
	// 是否自动重连
	cliOpt.AutoReconnect = true
	// 初始化连接时或自动重连时的调用函数
	cliOpt.OnConnect = func(client mqtt.Client) {
		// 判断当前的连接类型,设置相应的处理函数
		// 如果已经订阅过了,就不再重复订阅
		subscribeMu.Lock()
		defer subscribeMu.Unlock()
		// 如果已经订阅过,直接返回
		if isSubscribed {
			logger.Info(context.Background(), "Already subscribed, skip re-subscribe on reconnect")
			return
		}
		var msgHandler mqtt.MessageHandler
		msgHandler = handler.HandleMsg
		topics := strings.Split(option.Topic, ",")
		topicMap := map[string]byte{}
		for _, t := range topics {
			topicMap[t] = option.Qos
		}
		var token mqtt.Token
		// 消息订阅
		token = client.Subscribe(
			option.Topic,
			option.Qos,
			func(client mqtt.Client, message mqtt.Message) {
				go msgHandler(client, message)
			})
		// 判断是否超时
		if !token.WaitTimeout(time.Second * 30) {
			logger.Info(context.Background(), "subscribe timeout", logger.Any("option", option))
			return
		}
		// 判断是否发生错误
		if err := token.Error(); err != nil {
			logger.Info(context.Background(), "subscribe error", logger.String("err", err.Error()), logger.Any("option", option))
			return
		}
		// 打印订阅成功日志
		logger.Info(context.Background(), "subscribe succeed", logger.Any("option", option), logger.Any("token", token))
		isSubscribed = true // 标记已订阅
	}
	// 初始化连接丢失回调
	cliOpt.OnConnectionLost = func(client mqtt.Client, e error) {
	}
	client := mqtt.NewClient(cliOpt)
	token := client.Connect()
	if !token.WaitTimeout(time.Second * 15) {
		return nil, errors.New("connect timeout")
	}
	if err := token.Error(); err != nil {
		logger.Warn(context.Background(), "connect error", logger.String("err", err.Error()), logger.Any("option", option))
		return nil, fmt.Errorf("connect error,%s", err.Error())
	}
	return client, nil
}

// newTLSConfig mqtt tls 连接相关
func newTLSConfig(f string) (*tls.Config, error) {
	certs := x509.NewCertPool()

	pemData, err := ioutil.ReadFile(f)
	if err != nil {
		return nil, err
	}
	if !certs.AppendCertsFromPEM(pemData) {
		return nil, fmt.Errorf("append certs from pem err")
	}
	return &tls.Config{
		RootCAs:            certs,
		ClientAuth:         tls.NoClientCert,
		ClientCAs:          nil,
		InsecureSkipVerify: true,
	}, nil
}

func HandleMsg(client mqtt.Client, message mqtt.Message) {
	spanCtx := logger.Start(context.Background(), "HandleMsg")
	defer logger.End(spanCtx)
	fmt.Println("收到了来自服务器的消息")
	topic, payload := DataPrepare(spanCtx, client, message)
	fmt.Println("topic:", topic, "message: ", payload, "qos:", message.Qos())
	payloadMap := make(map[string]interface{})
	json.Unmarshal(payload, &payloadMap)
	if _, ok := payloadMap["remain_times"]; ok {
	}
	logger.Info(spanCtx, "HandleMsg", logger.String("topic", topic), logger.Any("payload", payloadMap))
}

func DataPrepare(ctx context.Context, cli mqtt.Client, msg mqtt.Message) (
	topic string,
	payload []byte,
) {
	spanCtx := logger.Start(ctx, "LoraDataPrepare")
	defer logger.End(spanCtx)

	topic = msg.Topic()
	payload = msg.Payload()

	return
}

handler.go

package handler

import (
	"brewing-bigdata/internal/app/logic"
	"brewing-bigdata/internal/common"
	"brewing-bigdata/internal/schema"
	"context"
	"encoding/json"

	"git.dev.nongbotech.cn/nbi-gopkg/logger"
	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func HandleMsg(client mqtt.Client, message mqtt.Message) {
	spanCtx := logger.Start(context.Background(), "HandleMsg")
	defer logger.End(spanCtx)
	topic, payload := DataPrepare(spanCtx, client, message)
	payloadMap := make(map[string]interface{})
	json.Unmarshal(payload, &payloadMap)
	logger.Info(spanCtx, "来自服务器消息-HandleMsg", logger.String("topic", topic), logger.Any("payloadMap", payloadMap))
	IP := "117.173.165.135"
	var Admin common.UserInfo
	Admin.CompanyID = "1699432555659652"
	if _, ok := payloadMap["remain_times"]; ok { // 修改剩余次数
		logger.Info(spanCtx, "modify remain times")
		var Params schema.ModifyWaterDeviceRemainTimes
		// id := payloadMap["id"].(string)
		deviceSn := payloadMap["device_sn"].(string)
		name := payloadMap["name"].(string)
		remainTimes := payloadMap["remain_times"].(float64)
		Params.DeviceSn = deviceSn
		Params.Name = name
		Params.RemainTimes = int64(remainTimes)
		res, err := logic.WaterQualityConfig{}.ModifyWaterDeviceRemainTimes(Params, 1)
		logger.Info(spanCtx, "modify remain times", logger.Any("params", Params), logger.Any("res", res), logger.Any("err", err))
	} else if _, ok := payloadMap["code"]; ok { // 立即采样
		logger.Info(spanCtx, "sample now")
		id := payloadMap["id"].(string)
		deviceSn := payloadMap["device_sn"].(string)
		code := payloadMap["code"].(string)
		cate := payloadMap["cate"].(float64)
		commandTime := payloadMap["command_time"].(float64)
		commandID := payloadMap["command_id"].(string)
		req := payloadMap["req"].(string)
		var Params schema.QishuoSampleNowParams
		Params.ID = id
		Params.DeviceSn = deviceSn
		Params.Code = code
		Params.Cate = int8(cate)
		Params.CommandTime = int64(commandTime)
		Params.CommandID = commandID
		Params.Req = req
		data, err := logic.WaterQualityConfig{}.QishuoSampleNow(context.Background(), Params, Admin, IP, 1)
		logger.Info(spanCtx, "sample now", logger.Any("params", Params), logger.Any("data", data), logger.Any("err", err))
	} else { // 其它
		logger.Info(spanCtx, "other message")
	}
	logger.Info(spanCtx, "HandleMsg", logger.String("topic", topic), logger.Any("payload", payloadMap))
}

func DataPrepare(ctx context.Context, cli mqtt.Client, msg mqtt.Message) (
	topic string,
	payload []byte,
) {
	spanCtx := logger.Start(ctx, "LoraDataPrepare")
	defer logger.End(spanCtx)

	topic = msg.Topic()
	payload = msg.Payload()

	return
}

Step5: 项目初始化

mqtt.go

package config

import (
	"context"
	"path/filepath"
	"runtime"

	"git.dev.nongbotech.cn/nbi-gopkg/logger"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/spf13/viper"
)

type MqttOption struct {
	Host          string `json:"host" mapstructure:"host"` // mqtt服务器url
	Port          string `json:"port" mapstructure:"port"` // mqtt服务器端口
	HTTPHost      string `json:"http_host" mapstructure:"http_host"`
	HTTPPort      string `json:"http_port" mapstructure:"http_port"`
	Username      string `json:"username" mapstructure:"username"`
	Password      string `json:"password" mapstructure:"password"`
	ClientID      string `json:"client_id" mapstructure:"client_id"`
	CleanSession  bool   `json:"clean_session"  mapstructure:"clean_session"`
	AutoReconnect bool   `json:"auto_reconnect" mapstructure:"auto_reconnect"`
	Topic         string `json:"topic" mapstructure:"topic"`
	Pro           uint   `json:"pro" mapstructure:"pro"`
	Qos           byte   `json:"qos" mapstructure:"qos"`
	Handler       string `json:"handler" mapstructure:"handler"` // lora | zeta
	TLS           string `json:"tls" mapstructure:"tls"`
}

var MQTTClientHashMap map[string]mqtt.Client = make(map[string]mqtt.Client)

var Conf struct {
	// IsOpenMqtt      int64        `mapstructure:"is_open_mqtt"`      // 是否开启mqtt服务,0关闭 1开启
	IsOpenMqttPub   int64        `mapstructure:"is_open_mqtt_pub"`  // 是否开启mqtt_pub服务,0关闭 1开启
	IsOpenMqttSub   int64        `mapstructure:"is_open_mqtt_sub"`  // 是否开启mqtt_sub服务,0关闭 1开启
	EnableMQTTCache bool         `mapstructure:"enable_mqtt_cache"` // 是否启用mqtt消息缓存功能
	MqttOps         []MqttOption `mapstructure:"mqtt_sub"`          // mqtt-brokers订阅
	Distribution    MqttOption   `mapstructure:"mqtt_pub"`          // 发布订阅
}

// ParseConf 配置解析
func ParseConf() {
	// 获取当前路径,主要用于单元测试
	_, Dir, _, _ := runtime.Caller(0)
	currentDir := filepath.Dir(Dir)
	// viper 解析配置
	vp := viper.New()
	vp.SetConfigType("yaml")
	vp.SetConfigName("app")
	vp.AddConfigPath("./")
	vp.AddConfigPath("./etc")
	vp.AddConfigPath(currentDir + "/../etc")
	err := vp.ReadInConfig()
	if err != nil {
		panic("parse conf error")
	}
	vp.Unmarshal(&Conf)
	logger.Info(context.Background(), "parse conf success", logger.Any("Conf", Conf))
}

boot.go

package boot

import (
	"brewing-bigdata/internal/app/pubmqtt"
	"brewing-bigdata/internal/app/subscribe"
	"context"

	mqttConf "brewing-bigdata/config"

	"git.dev.nongbotech.cn/nbi-gopkg/logger"
)
// 项目运行的入口处,调用这个函数即可初始化mqtt服务
func APPInit() {
	// IsOpenMqtt := mqttConf.Conf.IsOpenMqtt       // 是否开启mqtt服务
	IsOpenMqttPub := mqttConf.Conf.IsOpenMqttPub // 是否开启mqtt_pub服务
	IsOpenMqttSub := mqttConf.Conf.IsOpenMqttSub // 是否开启mqtt_sub服务
	if IsOpenMqttPub == 1 {                      // 发布初始化
		logger.Info(context.Background(), "mqttConf.Conf.Pub", logger.Any("pubconf: ", mqttConf.Conf.Distribution))
		pubmqtt.MQttInit(mqttConf.Conf.Distribution) // 发布
	}
	if IsOpenMqttSub == 1 { // 订阅初始化
		logger.Info(context.Background(), "mqttConf.Conf.Sub", logger.Any("subconf: ", mqttConf.Conf.MqttOps))
		subscribe.MqttInit(mqttConf.Conf.MqttOps) // 订阅
	}
	if IsOpenMqttPub == 0 && IsOpenMqttSub == 0 {
		logger.Info(context.Background(), "mqttConf is not nedd open", logger.Any("pubconf: ", mqttConf.Conf.IsOpenMqttPub), logger.Any("subconf: ", mqttConf.Conf.IsOpenMqttSub))
	}
}

此时就搭建好了mqtt服务了,服务端调用pubmqtt.Pubdata()即可向指定的设备发送消息了