|
本帖最后由 chengts95 于 2015-2-27 16:09 编辑
1.MQTTMQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。早在1999年,IBM的Andy Stanford-Clark博士以及Arcom公司ArlenNipper博士发明了MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)技术。过去需要不断轮询才能得到即时数据。如今却只需要订阅一个topic,这正是MQTT能够实现的。
2.Mosquitto
Mosquitto,一款实现了消息推送协议 MQTT v3.1 的开源消息代理软件(server),提供轻量级的,支持可发布/可订阅的的消息推送模式,使设备对设备之间的短消息通信变得简单,比如现在应用广泛的低功耗传感器,手机、嵌入式计算机、微型控制器等移动设备。一个典型的应用案例就是 Andy Stanford-ClarkMosquitto(MQTT协议创始人之一)在家中实现的远程监控和自动化。并在 OggCamp 的演讲上,对MQTT协议进行详细阐述。官网地址:http://mosquitto.org/。
MQTT完全文档 Building Smarter Planet Solutions with MQTT and IBM WebSphere MQ Telemetry下载地址:
http://www.redbooks.ibm.com/abstracts/sg248054.html
内有arduino作为mqtt client的具体案例。
test.mosquitto.org:是eclipse为了支持mqtt发展而设立的公共mqtt服务器,访问十分方便,对设计物联网应用十分有帮助。
3.paho
Paho 项目旨在提供可伸缩的开放和标准的 Machine-to-Machine (M2M) 以及物联网消息协议的开源实现。Paho 提供 MQTT 发布/订阅实现,相当于是mqtt客户端,官方网站给出了C++、java、android、python以及通用的C语言客户端和源代码,下面只介绍python客户端的使用。
官方网站:http://www.eclipse.org/paho/
安装paho-mqtt:
使用pip工具安装paho-mqtt,输入以下指令即可: pip install paho-mqtt
4.血压计设计
要使病人一旦启动血压测量,数据由主机的mqtt-paho客户端发布至test.mosquitto.org服务器。只要是订阅了相应病人血压主题的医生,都可以及时掌握病人的血压变化情况。例如,在自己的android手机上像qq、微信消息那样及时显示出病人当前的血压情况,甚至可以实现远程诊断。
我用华科电子的HKB-08B做示例,本血压计模块支持usb串口(cp2102),可以直接连接在树莓派上。我将HKB-08B的通讯协议写成了python的模块,使其方便调用。然后写了测试用的python demo放在树莓派上运行。代码如下:
bpr.py:
[mw_shl_code=python,true]
# -*- coding: utf-8 -*-
import serial
import time
import threading
SLEEP_Cmd = [0xff,0xcd,0x03,0xae,0xab]
WAKEUP_Cmd = [0xff,0xcd,0x03,0xad,0xaa]
START_Cmd = [0xff,0xcd,0x03,0xa3,0xa0]
STOP_Cmd = [0xff,0xcd,0x03,0xa6,0xa3]
START_STATE=0
STOP_STATE=1
SLEEP_STATE=2
WAKEUP_STATE=3
STATUS_NORMAL_INFO="設備已經連接!"
STATUS_ERROR_INFO_0="報告: 測量不到有效的脈搏!"
STATUS_ERROR_INFO_1="報告: 氣袋沒有綁好!"
STATUS_ERROR_INFO_2="報告: 測量結果數值有誤!"
STATUS_ERROR_INFO_3="報告: 進入超壓保護!"
STATUS_ERROR_INFO_4="報告: 幹預過多!"
class bpsensor:
def __init__(self,port):
self.ser=serial.Serial(
port=None, # number of device, numbering starts at
# zero. if everything fails, the user
# can specify a device string, note
# that this isn't portable anymore
# if no port is specified an unconfigured
# an closed serial port object is created
baudrate=115200, # baud rate
bytesize=8, # number of databits
parity=serial.PARITY_NONE, # enable parity checking
stopbits=serial.STOPBITS_ONE, # number of stopbits
timeout=3, # set a timeout value, None for waiting forever
xonxoff=0, # enable software flow control
rtscts=0, # enable RTS/CTS flow control
interCharTimeout=None # Inter-character timeout, None to disable
)
self.ser.port=port
self.RecBuff=[0 for i in range(10)]
self.RecIndex = 0
self.data_len = 0
self.strInfo=""
self.status=1
self.done=False
def __del__(self):
self.close()
def getdata(self,ch):
self.strInfo=""
bRefreshDisp = False
self.done=False
val = 0
self.RecBuff[self.RecIndex]=ch
if self.RecIndex==0:
if self.RecBuff[0] != 0xff:
self.RecIndex = 0
return -1
elif self.RecIndex==1:
if self.RecBuff[1] != 0xcd:
self.RecIndex = 0
return -1
elif self.RecIndex==2:
self.data_len = self.RecBuff[2] - 1
if self.data_len > 7 or self.data_len < 2:
self.RecIndex = 0
return -2
else:
self.data_len-=1
if self.data_len == 0:
#数据接收完成
#校验
verify = self.RecBuff[2]
for i in range(self.RecBuff[2]-2):
verify += self.RecBuff[4 + i]
if ((verify&0xff) != self.RecBuff[3]):
self.RecIndex = 0
return -3
#提取有用数据,判断命令字
if self.RecBuff[4]==0x5b: #传感器休眠
self.strInfo="已休眠"
self.status=SLEEP_STATE
self.RecIndex = 0
return 0
elif self.RecBuff[4]==0x5a: #唤醒
self.strInfo="已唤醒"
self.status=WAKEUP_STATE
self.RecIndex = 0
return 0
elif self.RecBuff[4]==0x54: #测量阶段
self.bDeviceAck = True
if ((self.RecBuff[5] and 0x10) == 1):
self.strInfo = "有心跳"
else:
self.strInfo = "无心跳"
#气压值
bRefreshDisp=True
self.val = self.RecBuff[5] & 0x0f
self.val = (val << 8) | self.RecBuff[6]
elif self.RecBuff[4] == 0x55: #血压值
if ((self.RecBuff[5] and 0x80) == 1):
#心律不齐
self.strInfo = "心律不齐!"
self.m_wMP = 1
else:
#心率正常
self.strInfo = "心率正常!"
self.m_wMP = 0
self.RecBuff[5] &= 0x7f #把最高位 心率位去掉
#//收缩压
val = (self.RecBuff[5] << 8) | self.RecBuff[6]
#strInfo.Format("%d",val)
self.m_wSP = val
#//舒张压压
val = (self.RecBuff[7] << 8) | self.RecBuff[8]
#strInfo.Format("%d",val);
self.m_wDP = val
#心率
#strInfo.Format("%d",RecBuff[9]);
self.m_wPR = self.RecBuff[9]
self.done=True
return self.m_wSP,self.m_wDP,self.m_wPR
elif self.RecBuff[4]==0x56:
if(self.RecBuff[5]==0):
self.strInfo=STATUS_ERROR_INFO_0
elif self.RecBuff[5]==1:
self.strInfo=STATUS_ERROR_INFO_1
elif self.RecBuff[5]==2:
self.strInfo=STATUS_ERROR_INFO_2
elif self.RecBuff[5]==3:
self.strInfo=STATUS_ERROR_INFO_3
elif self.RecBuff[5]==4:
self.strInfo=STATUS_ERROR_INFO_4
else:
self.strInfo="報告: 未知錯誤!"
return -4
else:
return -5
bRefreshDisp=True
self.RecIndex+=1
if (bRefreshDisp):
bRefreshDisp = False
self.RecIndex = 0
return 0
def sleep(self):
if self.ser.isOpen():
self.ser.write(SLEEP_Cmd)
def wakeup(self):
if(self.ser.isOpen() and self.status!=WAKEUP_STATE):
self.ser.write(WAKEUP_Cmd)
self.status=WAKEUP_STATE
def start(self):
if(self.ser.isOpen() and self.status!=START_STATE):
self.ser.write(START_Cmd)
time.sleep(0.5)
self.ser.write(START_Cmd)
i=0
time.sleep(2)
while self.done==False:
temp=self.ser.read()
if temp!=0 and temp!=b'':
temp=ord(temp)
self.getdata(temp)
else:
i+=1
if i>3:
break
if(self.done):
print(self.m_wSP,self.m_wDP,self.m_wPR)
#self.done=False
else:
print(self.strInfo)
def open(self):
if not self.ser.isOpen():
self.ser.open()
def close(self):
if self.ser.isOpen():
self.ser.close()
if __name__=="__main__":
a=bpsensor('/dev/ttyUSB0')
a.open()
a.start()
血压计测量测试程序:
[mw_shl_code]
[mw_shl_code=python,true]
# -*- coding: utf-8 -*-import json
import time
import threading
from bpr import bpsensor
import paho.mqtt.client as paho
port='/dev/ttyUSB0'
isfinished=threading.Event()
client = paho.Client("blood_pressure")
uid='ID002'
client.connect("test.mosquitto.org")
def getdata(sensor):
try:
sensor.open()
sensor.start()
except Exception:
print('连接线松动,设备丢失!')
isfinished.set()
isfinished.set()
try:
a=bpsensor(port)
while client.loop()==0:
data=input("是否开始测量y/n:") #测试时用命令行,也可改成按钮
if data=="y" or data=='Y':
isfinished.clear()
t1=threading.Thread(target=getdata,args=[a])
t1.start()
print('测量中...')
isfinished.wait()
if a.done:
ts=time.strftime("%Y-%m-%dT%H:%M:%S")
payload=json.dumps({'GY':{'ts':ts,'value':a.m_wSP},'DY':{'ts':ts,'value':a.m_wDP},'MB':{'ts':ts,'value':a.m_wPR}})
client.publish('patients/%s'%uid,payload,1)
print(payload)
a.done=False
else:
print(a.strInfo)
else:
print('退出')
client.disconnect()
except KeyboardInterrupt:
print("bye!")
client.disconnect()
[/mw_shl_code]
推送的数据必须用mqtt的客户端订阅才能接收到推送:
[mw_shl_code=python,true]
# -*- coding: utf-8 -*-
import time
import redis
import os
import json
import paho.mqtt.client as paho
mypid = os.getpid()
client = paho.Client("blood_pressure_rec",clean_session=False)
conn = redis.Redis('localhost')
def SendToRedisHash(uid,tagvaluedict):
pipe =conn.pipeline()
for key in tagvaluedict:
pipe.hmset('%s.%s'%(uid,key),tagvaluedict[key])
pipe.execute()
def on_message(mosq, obj, msg):
#called when we get an MQTT message that we subscribe to
slist=msg.topic.split("/")
dic1=json.loads(msg.payload.decode())
SendToRedisHash(slist[1],dic1)
print(slist[1])
def connectall():
print("DISPATCHER: Connecting")
client.connect(host="test.mosquitto.org",port=1883)
print('Connected')
client.subscribe("patients/",1)
client.on_message = on_message
def disconnectall():
print("DISPATCHER: Disconnecting")
client.disconnect()
if __name__ == '__main__':
connectall()
try:
while client.loop()==0:
pass
# Look for commands in the queue and execute them
except KeyboardInterrupt:
print ("Interrupt received")
disconnectall()
[/mw_shl_code]
当有数据推送至服务器时,本程序将自动把内容存入本机的redis数据库。
总之,MQTT是一种十分适用于物联网应用的通讯协议,并且正蓬勃发展着。它可以实现推送服务以及设备间双向通讯,拥有开放的公网服务器(不需要自己建站),给我们带来巨大便利。
未完待续
|
|