各位朋友大家好,本人为重庆工商大学物联网工程大二在校学生,欢迎各位与我交流。同时,我们在学校创立了一个创意科技团队,CO团队,平时做一些物联网相关的小创意项目。希望大家了解一下我们,也欢迎各位与我联系。我的邮箱是
chnhawk@foxmail.com
前言:本篇主要记录一下我个人使用mqtt协议上OneNet云时踩过的一些坑。之前使用过mosquito在自己的服务器上搭建的mqtt转发服务,但是和OneNet的使用还是有一些不一样的地方。前面记录的大都是开发的过程和一些小心得,后面会附上自己的测试代码。对于大佬来说可能这算不上什么干货,但是对于高校学生或者初学者来讲,还没有较多的开发经验,故作此篇文章,记录一下上云的心得。本文存在有错误以及不正规的地方,还恳请各位批评斧正!
一、Android设备使用MQTT协议接入OneNet平台
1. 用到的MQTT库:eclipse.paho
直接在app的gradle内dependencies添加依赖
compile 'org.eclipse.paho
rg.eclipse.paho.client.mqttv3:1.1.0'
compile 'org.eclipse.paho
rg.eclipse.paho.android.service:1.1.1'
注意:使用eclipse.paho一定记得要在AndroidManifest.xml中添加服务
<serviceandroid:name="org.eclipse.paho.android.service.MqttService"/>
否则没有办法正常使用
2. 连接上OneNet的服务器
需要设置
clientId为deviceId,即创建设备时的设备号
userName为productId,即创建产品的产品号
password为APIkey或者authInfo(即自己设置鉴权信息字符串)
因为APIkey太长不好记忆,所以我选择鉴权
1) 首先配置连接设置
MqttConnectOptions option = newMqttConnectOptions();
option.setUserName(productId);
option.setPassword(authInfo.toCharArray());
option.setCleanSession(false);
2) 调用mqttclient的连接方法
MqttAndroidClient mqttClient = newMqttAndroidClient(mContext, ("tcp://" + serverIP + ":" +serverPort), deviceId);
mqttClient.connet(option);
//OneNet的ip为183.230.40.39
//mqtt端口为6002
这样基本上就连接上OneNet的服务器了,ConnAck包这些是paho库帮我们完成的接收确认,我们只需要调用连接的这个函数即可,非常方便。
ps.当初看开发文档的时候一开始没有看到设置用户名这些,因为照着接入流程做,看到了服务器的ip和端口就直接跑去连接了,发现一直连接不上。直到再次看了一遍开发文档,拖到文档下面的常见问题才发现还需要设置这些。感觉这些重要的设置内容应该写在前面比较好,第一次没注意看,很尴尬。
3.发布及订阅自己的topic
这个很简单,之前用自己的服务器也是这样做的,
1) 订阅:
mqttClient.subscribe("topic",0); //参数一为topic字符串, 参数二为QoS级别
2) 发布:
MqttMessage msg = new MqttMessage();
msg.setPayload(payload.getBytes()); //发布内容
msg.setQos(1); //设置发布级别
mqttClient.publish(topic, msg); //发布
我们的项目是以topic来区别设备的
譬如,Android应用就订阅一个topic为Android/id,id为编号
底层设备就订阅一个topic为Device/id
然后发送指令直接向对应的topic发送就行
ps.一开始接入OneNet我也就是这么做的,使用的自己的topic来通信,完全把OneNet平台当成一个broker(转发器)了,到后面才发现我们简直是用高射炮打了蚊子,根本没有使用正确的姿势来使用OneNet平台。
4.使用mqtt上传数据点/获取数据
OneNet为每一个产品提供了数据流,在平台上记录并且可以生成数据的曲线,要想让我们通过mqtt协议上传的数据能够保存在数据流上,就得向$dp这个topic发送一串规定格式的数据。
我翻阅了一下OneNet的mqtt协议手册,没能搞清楚,翻车了。试了几次向$dp这个topic发送数据,在设备数据界面看都没有成功更新到数据,最让人疑惑的是,一旦发送了数据,设备就会掉线,需要重连。不太清楚OneNet平台是不是有什么发布数据就让你强制下线的机制。并且因为平台不允许订阅$打头的数据,所以也不清楚数据成功发送出去了没有。
起初我以为是OneNet服务器的问题,不过其他的topic还是可以正常的使用,并且换用平台提供的官方客户端,又可以成功发送且对数据进行更新。
1.png (0 Bytes, 下载次数: 0)
下载附件
OneNet提供的mqtt测试客户端
昨天 21:51 上传
(使用OneNet提供的mqtt测试程序上传数据点,同时可以看到平台上显示设备的数据更新了)
1.png (0 Bytes, 下载次数: 0)
下载附件
数据点显示更新
昨天 21:52 上传
于是我在想是不是发送的格式还是不正确,就发现了这个
1.png (0 Bytes, 下载次数: 0)
下载附件
测试客户端的调试窗口显示刚刚发布的内容
昨天 21:53 上传
这个是官方软件的调试输出窗口,可以看到刚刚发布出去的payload为上图所示的一串16进制数据。于是我尝试将这段数据复制到mqtt.fx软件并进行发布。
ps.mqttfx是一款非常方便的测试mqtt的软件。不过在win7系统下经常容易出现窗口消失的bug,实质上是窗口跑到屏幕外面去了。可以通过在状态栏-右键-移动(M)-使用方向键让窗口回到屏幕内。
2.png (0 Bytes, 下载次数: 0)
下载附件
移动mqtt.fx窗口
昨天 21:55 上传
使用mqtt.fx进行测试
3.png (0 Bytes, 下载次数: 0)
下载附件
通过mqtt.fx发布消息
昨天 21:55 上传
经过测试,很遗憾还是没有成功,这让我有些郁闷。
于是决定还是通过wireshark软件对OneNet平台提供的客户端进行抓包,看看他究竟发送的是什么。
1.png (0 Bytes, 下载次数: 0)
下载附件
OneNet平台提供的客户端发送的数据
昨天 21:57 上传
可以看到确实是向$dp这个topic发送了消息,7b是”{”转为16进制,那么前面的01 00 5e就是协议文档里面提到的发送类型、长度高位、长度低位了。
我之前发送的也确实是按照这个协议走的啊,为什么不成功呢?
于是再来抓一下我之前程序发送的内容(这里丢人了,捂脸)
2.png (0 Bytes, 下载次数: 0)
下载附件
我自己发送的数据
昨天 21:57 上传
附上我之前的代码:
3.png (0 Bytes, 下载次数: 0)
下载附件
错误的代码示范
昨天 21:57 上传
这样问题就显而易见了。
协议中的格式讲的是发送16进制的01 00 length,而通过我之前编写的这段代码发送出去的是字符串”01 00length”,同理,之前测试通过mqttfx发送出去的就更不对了。
故赶紧修改代码为:
4.png (0 Bytes, 下载次数: 0)
下载附件
修改之后的代码
昨天 21:57 上传
这样就能成功地将数据库上传到OneNet平台了。
总结一下,在这一步翻车的原因还是对mqtt协议理解的不够充分,错把16进制字符串当做16进制在发送。做物联网项目,会经常在16进制数组和16进制字符串之间来回切换,一定要弄清楚,不然像这样因为这样一个小问题而耗费大量时间去处理、测试就很不划算了。
下面附上自己编写的基于paho的mqtt使用帮助类:
- import android.content.Context;
- import android.util.Log;
- import org.eclipse.paho.android.service.MqttAndroidClient;
- import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
- import org.eclipse.paho.client.mqttv3.MqttCallback;
- import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
- import org.eclipse.paho.client.mqttv3.MqttException;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import java.util.ArrayList;
- /**
- * Created by CHNhawk on 2017/3/7
- * 重庆工商大学 物联网工程 熊廷宇
- * update on 2017/5/16
- * 添加了消息队列转发以及MsgHandler接口
- * 支持更好地用于多Activity
- *
- */
- public class MqttUtil {
- //region 网络设定
- private static String serverIP = "183.230.40.39"; // 服务器ip
- private static int serverPort = 6002; // 服务器端口
- private static String deviceId = "填设备id"; // 终端id
- private static String userId = "填应用id"; // 服务器登陆名
- private static String password = "填鉴权或apiKey"; // 服务器登陆密码
- //endregion
- //region 消息类型
- public static final int MQTT_CONNECTED = 1000; // mqtt连接成功
- public static final int MQTT_CONNECTFAIL = 1001; // mqtt连接失败
- public static final int MQTT_DISCONNECT = 1002; // mqtt断开连接
- public static final int MQTT_SUBSCRIBED = 1010; // 订阅成功
- public static final int MQTT_SUBSCRIBEFAIL = 1011; // 订阅失败
- public static final int MQTT_MSG_TEST = 2001; // 接收到TEST消息
- public static final int MQTT_PUBLISHED = 2010; // 发布成功
- public static final int MQTT_PUBLISHFAIL = 2011; // 发布失败
- //endregion
- //region MQTT客户端服务
- private static MqttUtil instance; // 单例对象
- private MqttAndroidClient mqttClient; // mqtt客户端
- private MqttConnectOptions option; // mqtt设置
- private MqttCallback clientCallback; // 客户端回调
- private Context mContext; // 上下文
- private ArrayList<MsgHandler> listenerList = new ArrayList<>(); // 消息接收者
- //endregion
- //封闭构造函数
- private MqttUtil(){}
- //获取单例
- public static MqttUtil getInstance(){
- if(instance == null){
- instance = new MqttUtil();
- //deviceId = android.os.Build.SERIAL + (int)(Math.random() * 10);
- }
- return instance;
- }
- //初始化连接
- public void initMqtt(Context context) {
- mContext = context;
- connect();
- }
- //重连
- public void reConnect() {
- //尝试重连
- connect();
- }
- //发布消息
- public void publish(String topic, String payload) {
- MqttMessage msg = new MqttMessage();
- msg.setPayload(payload.getBytes());
- msg.setQos(1);
- if (mqttClient != null){
- if(mqttClient.isConnected()) {
- try {
- mqttClient.publish(topic, msg);
- DispachEvent(MQTT_PUBLISHED);
- } catch (MqttException e) {
- //发布失败
- Log.e("mqtt", "发布失败" + e);
- DispachEvent(MQTT_PUBLISHFAIL);
- }
- } else {
- Log.e("mqtt", "网络未连接 - 尝试重新连接" );
- connect();
- DispachEvent(MQTT_PUBLISHFAIL);
- }
- } else {
- Log.e("mqtt", "客户端初始化失败" );
- DispachEvent(MQTT_PUBLISHFAIL);
- }
- }
- //订阅主题
- public void subscribe(String topic){
- try {
- //订阅消息
- mqttClient.subscribe(topic, 0);
- DispachEvent(MQTT_SUBSCRIBED);
- } catch (MqttException e) {
- DispachEvent(MQTT_SUBSCRIBEFAIL);
- Log.e("mqtt", "订阅错误:" + e);
- }
- }
- //返回是否连接
- public boolean isConnected(){
- return mqttClient.isConnected();
- }
- //获取本机deviceId
- public String getDeviceId(){
- return deviceId;
- }
- //连接
- private void connect() {
- if (mqttClient == null) {
- option = new MqttConnectOptions();
- option.setUserName(userId);
- option.setPassword(password.toCharArray());
- option.setCleanSession(false);
- //设置回调
- clientCallback = new MqttCallback() {
- @Override
- public void connectionLost(Throwable cause) {
- //断开连接
- DispachEvent(MQTT_DISCONNECT);
- }
- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- //接收到消息
- Log.v("mqtt", "接收到信息:" + topic);
- DispachMessage(topic, message);
- }
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- //publish成功后调用
- Log.v("mqtt","发送成功");
- }
- };
- try {
- mqttClient = new MqttAndroidClient(mContext, ("tcp://" + serverIP + ":" + serverPort), deviceId);
- mqttClient.setCallback(clientCallback);
- } catch (Exception e) {
- Log.e("mqtt", "启动服务错误:" + e);
- }
- }
- if (!mqttClient.isConnected()) {
- //匿名连接线程
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- int count = 0; //重试次数
- while(count < 5 && !mqttClient.isConnected()) {
- mqttClient.connect(option);
- Thread.sleep(1000);
- count ++;
- }
- //连接成功
- if(mqttClient.isConnected()) {
- DispachEvent(MQTT_CONNECTED);
- Log.v("mqtt", "连接成功");
- } else {
- Log.e("mqtt", "连接网络错误");
- DispachEvent(MQTT_CONNECTFAIL);
- }
- } catch (Exception e) {
- Log.e("mqtt", "连接错误:" + e);
- DispachEvent(MQTT_CONNECTFAIL);
- }
- }
- }).start();
- }
- }
- //region 消息转发部分
- //添加接收者
- public void addListener(MsgHandler msgHandler){
- if(!listenerList.contains(msgHandler)) {
- listenerList.add(msgHandler);
- }
- }
- //移除接收者
- public void removeListener(MsgHandler msgHandler){
- listenerList.remove(msgHandler);
- }
- //移除所有接收者
- public void removeAll(){
- listenerList.clear();
- }
- //发送消息
- public void DispachMessage(String type, Object data){
- if(listenerList.isEmpty()) {
- Log.v("mqtt", "没有消息接收者:" + type);
- return;
- }
- Log.v("mqtt", "发送消息:" + type);
- for (MsgHandler msgHandler : listenerList)
- {
- msgHandler.onMessage(type, data);
- }
- }
- //发送事件
- public void DispachEvent(int event){
- if(listenerList.isEmpty()) {
- Log.v("mqtt", "没有消息接收者:");
- return;
- }
- Log.v("mqtt", "派发事件:" + event);
- for (MsgHandler msgHandler : listenerList)
- {
- msgHandler.onEvent(event);
- }
- }
- //endregion
- }
复制代码
消息接口如下
- /**
- * Created by CHNhawk on 2017/5/16.
- * 重庆工商大学 物联网工程 熊廷宇
- * 自定义的消息接口
- */
- public interface MsgHandler
- {
- /**
- * 消息
- * @param type 消息类型
- * @param data 数据
- */
- void onMessage(String type, Object data);
- /**
- * 事件
- * @param event x
- */
- void onEvent(int event);
- }
复制代码
这样就可以很方便的使用mqtt通过Android设备连接上OneNet服务器了。
二、C#使用MQTT协议接入OneNet平台
讲完了Android,再来简单介绍一下其他平台或者应用接入OneNet平台的方式吧。
网上虽然有大量关于mqtt的文章,不过我再复述一遍,增加一下自己的印象也是极好的。
这里说一句,C#用来编写.NET,同时也可以用在Unity引擎中,再导出到各个平台,C#也可以用来写Android程序,哈哈,也是非常方便的!
这里附上一张我使用Unity+mqtt实现的具有房间聊天功能的安卓对战小游戏,游戏可以同步玩家数据,实现登陆、同一房间聊天的功能。为了练习,网络通信部分我完全使用的mqtt协议,测试联机的效果还不错。
1.png (0 Bytes, 下载次数: 0)
下载附件
自制的mqtt小游戏mcio
昨天 22:02 上传
我们知道untiy用于AR以及VR项目也是很火的,而物联网+AR/VR这个组合也是很火爆,下面我就介绍一下unity怎么使用C#中的mqtt连接上OneNet平台。纯C#的.NET应用同理~
首先需要的mqtt库,M2Mqtt
这个网上可以下载到源码,在unity中我们需要把它导出为dll文件来调用。
注意,导出的时候记得选择3.5版的框架,否则Unity不兼容。
然后将导出的dll文件移动到项目的\Assets\Plugins文件夹下,注意这里只能放在这个文件夹下,不然没有用。
然后我们就可以愉快地使用m2mqtt了。
1. 导入包
using uPLibrary.Networking.M2Mqtt;
usinguPLibrary.Networking.M2Mqtt.Messages;
2. 创建mqtt客户端对象并且进行连接
MqttClient mqttClient;
//设置ip、端口
mqttClient = new MqttClient(“183.230.40.39”,6002, false, null);
mqttClient. Connect(clientId, username,password);
//订阅主题
mqttClient.Subscribe(new string[] {topic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE });
//取消订阅
mqttClient.Unsubscribe(new string[] {topic });
//发布信息
mqttClient.Publish(topic,Encoding.UTF8.GetBytes(payload));
下面附上自己编写的辅助类:
- using UnityEngine;
- using System.Collections;
- using System.Net;
- using System.Text;
- using uPLibrary.Networking.M2Mqtt;
- using uPLibrary.Networking.M2Mqtt.Messages;
- using System;
- // mqtt网络通讯模块
- public class MqttNetwork
- {
- private static MqttNetwork instance; //单例的网络连接对象
- private MqttClient mqttClient; //mqtt客户端
- private EventManager manager; //事件管理
- private MqttNetwork()
- {
- //构造函数
- manager = EventManager.GetInstance();
- //initNetWork();
- }
- public static MqttNetwork GetInstance()
- {
- //获取实例
- if (instance == null)
- {
- instance = new MqttNetwork();
- }
- return instance;
- }
- public void initNetWork()
- {
- mqttClient = new MqttClient(“183.230.40.39”, 6002, false, null);
- //注册服务器返回信息接受函数
- mqttClient.MqttMsgPublishReceived += client_MqttMsgPublishReceived;
- //注册服务器断开函数
- mqttClient.ConnectionClosed += client_ConnectionClosed;
- }
- public void subsribeTopics(string topic)
- { //订阅主题
- mqttClient.Subscribe(new string[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE });
- }
- public void unSubsribeTopics(string topic)
- { //取消订阅主题
- if (mqttClient != null)
- {
- mqttClient.Unsubscribe(new string[] { topic });
- }
- }
- public void publish(string topic, string payload)
- {//发布消息
- if (mqttClient != null)
- {
- mqttClient.Publish(topic, Encoding.UTF8.GetBytes(payload));
- }
- }
- public void connect()
- {
- if (mqttClient == null)
- {
- //初始化mqtt客户端
- initNetWork();
- }
- if (!mqttClient.IsConnected)
- {
- //若未连接则进行连接
- try
- {
- mqttClient.Connect(“clientId”);//客户端ID
- if (mqttClient.IsConnected)
- {
- //订阅需要的主题
- subsribeTopics("test")
- }
- }
- catch (System.Exception e)
- {
- Debug.Log("连接出错 : " + e);
- manager.DispachEvent("mqtt-disconnected", null);
- }
- }
- }
- public void disConnect()
- {
- //关闭连接
- if (mqttClient != null && mqttClient.IsConnected)
- {
- mqttClient.Disconnect();
- }
- }
- public bool isNetConnected()
- {
- if (mqttClient == null)
- {
- return false;
- }
- return mqttClient.IsConnected;
- }
- void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs package)
- {
- //过滤并处理接收到的消息
- string topic = package.Topic;
- string msg = System.Text.Encoding.UTF8.GetString(package.Message);
- //To do sth.
- }
- private void client_ConnectionClosed(object sender, EventArgs e)
- {
- Debug.Log("[系统] : 服务器已断开");
- manager.DispachEvent("mqtt-disconnected", null);
- }
- }
复制代码
用到的一个事件转发接口
- public interface EventHandler
- {
- /// <summary>
- ///
- /// </summary>
- /// <param name="type"></param>
- /// <param name="data"></param>
- void OnEvent(string type, object data);
- }
复制代码
事件转发的实现
- using UnityEngine;
- using System.Collections.Generic;
- //事件管理类
- public class EventManager
- {
- private static EventManager instance; // 同样使用单例模式
- private Dictionary<string, List<EventHandler>> dicHandler; //事件句柄集合
- private EventManager()
- {
- //内部的构造函数,不对外开放
- dicHandler = new Dictionary<string, List<EventHandler>>();
- }
- public static EventManager GetInstance()
- {
- if (instance == null)
- {
- instance = new EventManager();
- }
- return instance;
- }
- /// <summary>
- /// 注册事件监听
- /// </summary>
- /// <param name="type">监听类型</param>
- /// <param name="listher">监听对象</param>
- public void AddEventListener(string type, EventHandler listher)
- {
- if (!dicHandler.ContainsKey(type))
- {
- dicHandler.Add(type, new List<EventHandler>());
- }
- dicHandler[type].Add(listher);
- }
- /// <summary>
- /// 移除对type的所有监听
- /// </summary>
- /// <param name="type"></param>
- public void RemoveEventListener(string type)
- {
- if (dicHandler.ContainsKey(type))
- {
- dicHandler.Remove(type);
- }
- }
- /// <summary>
- /// 移除监听者的所有监听
- /// </summary>
- /// <param name="listener">监听者</param>
- public void RemoveEventListener(EventHandler listener)
- {
- foreach (var item in dicHandler)
- {
- if (item.Value.Contains(listener))
- {
- item.Value.Remove(listener);
- }
- }
- }
- /// <summary>
- /// 清空所有监听事件
- /// </summary>
- public void ClearEventListener()
- {
- Debug.Log("清空对所有所有所有事件的监听");
- if (dicHandler != null)
- {
- dicHandler.Clear();
- }
- }
- /// <summary>
- /// 派发事件
- /// </summary>
- /// <param name="type">事件类型</param>
- /// <param name="data">事件传达的数据</param>
- public void DispachEvent(string type, object data)
- {
- if (!dicHandler.ContainsKey(type))
- {
- Debug.Log("未添加任何" + type + " 类型的监听器");
- return;
- }
- //Debug.Log("派发事件:" + type);
- List<EventHandler> list = dicHandler[type];
- for (int i = 0; i < list.Count; i++)
- {
- list.OnEvent(type, data);
- }
- }
- }
复制代码
这样,在自己的接收类实现OnEvent方法就行
三、NodeJS使用MQTT协议接入OneNet平台
NodeJs是近年比较火热的一个服务器js环境,可以在Linux系统上运行,配合mqtt实现一些功能,效率挺不错的。这里提供两个mqtt+nodejs的思路:
其一,用于树莓派等基于Linux系统的物联网设备连接至OneNet平台。
其二,用于自己的服务器配置mqtt转发服务以及一些自动处理服务,这个和OneNet平台就没有关系了,不过可以用在物联网相关项目上,让服务器自动完成一些数据的处理,我在这里也简单提一下。
首先讲其一:
树莓派一直是硬件爱好者比较喜欢的玩具设备(雾),也可以用树莓派来做一些物联网相关的东西,那么运行Linux系统的树莓派使用nodejs来连接上OneNet平台也是很不错的。不久前我尝试在树莓派上通过Python获取传感器数据并保存在本地文件中,然后再通过NodeJs上传到服务器,这样就可以在手机app上同步获取到传感器数据了。
废话不多说,首先下载配置nodejs+npm环境,这一块我不过多叙述,网上有太多太多的教程和步骤详细截图,本部分重点讲mqtt模块。
一般来说使用
apt-get install nodejs
apt-get install npm
这两个命令就可以安装,安装完毕可以输入node -v查看版本。
无法安装的可以先对apt update/upgrade一下,实在不行就或者直接wget下载nodejs解压。
要运行mqtt模块,nodejs版本不能太低,版本低的话可以安装n模块来升级nodejs,下面的内容假定你已经配置好了合适的nodejs环境。
nodejs中提供mqtt服务的有很多模块,我们就用mqtt这个模块,简单粗暴。
使用npm install mqtt命令安装mqtt模块
安装完成后,便可以实现一个mqtt的客户端连接至服务器的脚本了。
附上我自己编写的一个测试脚本:
- /// nodejs上传数据测试脚本
- /// 使用MQTT协议
- /// author CHNhawk (重庆工商大学 15物联网工程 熊廷宇)
- var mqtt = require('mqtt'); // 导入mqttCilent模块
- var ip = '183.230.40.39'; // oneNet服务器ip
- var port = '6002';
- var deviceId = '4734262'; // 设备号
- var userName = '81695'; // 用户名-产品id
- var passWord = '12345678'; // 密码-鉴权
- // 创建连接
- var client = mqtt.connect('mqtt://' + ip + ':' + port, {
- username: userName,
- password: passWord,
- clientId: deviceId
- });
- var isConnected = false; // 是否连上服务器的标识
- console.log('Connecting to ' ,ip + ':' + port + '...');
- client.on('connect', function() {
- // 连接成功回调函数
- console.log('Server is connected to', ip + ':' + port);
- isConnected = true;
- // 订阅主题 - 指令接收topic
- client.subscribe('ctbu_cmd/' + deviceId);
- console.log('Subscribe', 'ctbu_cmd/' + deviceId);
- });
- client.on('message', function(topic, message) {
- // 接收信息回调函数
- console.log(topic.toString() + ' - ' + message.toString());
- var payloadMsg = message.toString();
- switch (topic) {
- case 'ctbu_cmd/' + deviceId:
- console.log('Received command: ', payloadMsg);
- /////////////
- //以下处理命令
- /////////////
- break;
- }
- });
- client.on('offline', function(error){
- // 掉线回调函数
- isConnected = false;
- console.log('Connecting failed cz:', error);
- });
- client.on('error', function(){
- // 连接错误回调函数
- isConnected = false;
- console.log('Lost connecting :', ip + ':' + port);
- });
- setInterval(function() {
- // 设置定时上传数据
- if(isConnected){
- var time = getTime();
- var msg = "value" + time;
- var payload = buf.join("");
- // 向topic发布数据
- client.publish('ctbu_data', payload);
- } else {
- console.log('Offline state!');
- }
- }, 5000);
- //获取当前时间
- function getTime() {
- return new Date().toLocaleString();
- }
复制代码
其二,在Linux服务器上搭建一个mqtt转发器
那么假定你已经配置好了nodejs。
使用npm install mosca 命令安装mosca模块
mosca是一个能够创建mqtt服务器的一个模块,使用起来也很方便。
我在之前提到的小游戏就是用这个模块加上mongodb数据库搭建的服务器,比较简单,效果也还好,700多行代码就搞定了。对于我那种高并发状态的游戏十几个人一起玩玩还是妥妥的,没做过压力测试,我的服务器的买的阿里云的学生专用版,9.9一个月那种,不敢奢求太多。不过用于物联网项目,还是够用。
也附上我自己编写的一个简单的测试脚本
- /// nodejs服务器测试脚本
- /// 使用MQTT协议
- /// author CHNhawk (重庆工商大学 15物联网工程 熊廷宇)
- var mosca = require('mosca'); // 导入mosca模块
- // 创建mqtt服务器对象并设置开放端口
- var MqttServer = new mosca.Server({
- port: 1883
- });
- MqttServer.on('clientConnected', function(client){
- // 设备连接回调函数
- console.log('client connected', client.id);
- });
- MqttServer.on('published', function(packet, client) {
- // 发布topic回调函数
- var topic = packet.topic;
- console.log('message-arrived--->','topic :'+topic+',message :'+ packet.payload.toString());
- });
- MqttServer.on('ready', function(){
- // 成功运行服务
- console.log('mqtt is running...');
- });
复制代码
后记:这篇文章是参加实训需要完成的任务之一,之前很少在网上发布类似的帖子,虽然平时倒是经常写点文档做些记录,不过都是写给自己看的。由于本人技术水平及文字功底有限,如果大家有什么没有看明白或者质疑的地方,非常欢迎各位给我拍砖。希望与大家共同进步,最后祝OneNet平台越办越好!
在网上编辑帖子感觉比写文档还要麻烦一点,还不是很习惯....