使用python2.7测试通过,实现消息订阅,发布,平台数据下发,存储功能。
一定要修改下面代码
clientid = '8760549' # 创建设备时得到的设备ID,为数字字串
username = '79581' # 注册产品时,平台分配的产品ID,为数字字串
password = 'iDO9cRe85v2wpgFm2hQsMiapP=U=' # 为设备的鉴权信息(即唯一设备编号,SN),或者为apiKey,为字符串
安装 sudo pip install paho-mqtt
关于paho的使用参考:https://pypi.python.org/pypi/paho-mqtt/1.3.0
实现功能:
平台下发的数据,会转发给订阅了test主题的设备。收到主题为upload后,会将数据类型3:Json数据2的固定数据{"a":1,"c":3,"b":2,"d":4}发给平台。
树莓派上也同样适用。
不知道有没有EDP的python库可以用,没有的话,我打算自己弄一个,后面在树莓派上用。||ヽ(* ̄▽ ̄*)ノミ|Ю
原帖链接:https://open.iot.10086.cn/bbs/thread-16116-1-1.html
- #!/usr/bin/python
- # -*- coding:utf-8 -*-
- import paho.mqtt.client as mqtt
- try:
- import paho.mqtt.publish as publish
- except ImportError:
- # This part is only required to run the example from within the examples
- # directory when the module itself is not installed.
- #
- # If you have the module installed, just use "import paho.mqtt.publish"
- import os
- import inspect
- cmd_subfolder = os.path.realpath(
- os.path.abspath(os.path.join(os.path.split(inspect.getfile(inspect.currentframe()))[0], "../src")))
- if cmd_subfolder not in sys.path:
- sys.path.insert(0, cmd_subfolder)
- import paho.mqtt.publish as publish
- hostname = '183.230.40.39' # OneNET服务器地址
- port = 6002 # OneNET服务器端口地址
- clientid = '8760549' # 创建设备时得到的设备ID,为数字字串
- username = '79581' # 注册产品时,平台分配的产品ID,为数字字串
- password = 'iDO9cRe85v2wpgFm2hQsMiapP=U=' # 为设备的鉴权信息(即唯一设备编号,SN),或者为apiKey,为字符串
- topic1 = '$creq/#' #订阅平台下发命令主题
- topic2 = 'upload'
- # 连接成功回调函数
- def on_connect(client, userdata, flags, rc):
- print('Connected with result code ' + str(rc))
- # Subscribing in on_connect() means that if we lose the connection and
- # reconnect then subscriptions will be renewed.
- # 连接完成之后订阅gpio主题
- client.subscribe(topic1, 0) # 参数一为topic字符串, 参数二为QoS级别,默认QoS=0
- client.subscribe(topic2)
- # 消息推送回调函数
- def on_message(client, userdata, msg):
- print('[topic:]'+msg.topic + ' ' + '[payload:]'+str(msg.payload))
- if msg.topic == 'upload': # 如果收到upload主题,就将数据类型3(json数据2)内容上传给OneNET平台
- dp_load = '{"a":1,"c":3,"b":2,"d":4}'
- dp_type = 3
- dp_len = len(dp_load)
- dp=bytearray()
- dp.append(dp_type)
- dp.append((dp_len >> 8) & 0xFF)
- dp.append(dp_len & 0xFF)
- dp = dp + dp_load
- print repr(dp)
- (rc, final_mid) = client.publish('$dp', str(dp), 2, True)
- if '$creq' in msg.topic: # 如果收到OneNET下发的主题,然后将信息转发给test主题,其他设备将会收到
- (rc, final_mid) = client.publish('test', str(msg.payload), 2, True)
- def on_publish(mqttc, obj, mid):
- print("Publish mid: " + str(mid))
- pass
- def on_subscribe(mqttc, obj, mid, granted_qos):
- print("Subscribed: " + str(mid) + " " + str(granted_qos))
- def on_log(mqttc, obj, level, string):
- print("Log:" + string)
- if __name__ == '__main__':
- client = mqtt.Client(client_id=clientid, clean_session=True, userdata=None, protocol=mqtt.MQTTv311, transport="tcp")
- client.on_connect = on_connect
- client.on_message = on_message
- client.on_publish = on_publish
- client.on_subscribe = on_subscribe
- client.on_log = on_log
- try:
- # 配置用户名和密码
- client.username_pw_set(username, password)
- # 请根据实际情况改变MQTT代理服务器的IP地址
- client.connect(hostname, port, 60)
- # Blocking call that processes network traffic, dispatches callbacks and
- # handles reconnecting.
- # Other loop*() functions are available that give a threaded interface and a
- # manual interface.
- client.loop_forever()
- except KeyboardInterrupt:
- client.disconnect()
|