基于MQTT的wifi物联网血压计设计-Arduino中文社区 - Powered by Discuz! Archiver

chengts95 发表于 2014-10-10 23:21

基于MQTT的wifi物联网血压计设计

本帖最后由 chengts95 于 2015-2-27 16:09 编辑

1.MQTTMQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。早在1999年,IBM的Andy Stanford-Clark博士以及Arcom公司ArlenNipper博士发明了MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)技术。过去需要不断轮询才能得到即时数据。如今却只需要订阅一个topic,这正是MQTT能够实现的。
2.Mosquitto
      Mosquitto,一款实现了消息推送协议 MQTT v3.1 的开源消息代理软件(server),提供轻量级的,支持可发布/可订阅的的消息推送模式,使设备对设备之间的短消息通信变得简单,比如现在应用广泛的低功耗传感器,手机、嵌入式计算机、微型控制器等移动设备。一个典型的应用案例就是 Andy Stanford-ClarkMosquitto(MQTT协议创始人之一)在家中实现的远程监控和自动化。并在 OggCamp 的演讲上,对MQTT协议进行详细阐述。官网地址:http://mosquitto.org/。
      MQTT完全文档 Building Smarter Planet Solutions with MQTT and IBM WebSphere MQ Telemetry下载地址:
http://www.redbooks.ibm.com/abstracts/sg248054.html


       内有arduino作为mqtt client的具体案例。
       test.mosquitto.org:是eclipse为了支持mqtt发展而设立的公共mqtt服务器,访问十分方便,对设计物联网应用十分有帮助。
3.paho
      Paho 项目旨在提供可伸缩的开放和标准的 Machine-to-Machine (M2M) 以及物联网消息协议的开源实现。Paho 提供 MQTT 发布/订阅实现,相当于是mqtt客户端,官方网站给出了C++、java、android、python以及通用的C语言客户端和源代码,下面只介绍python客户端的使用。
    官方网站:http://www.eclipse.org/paho/


安装paho-mqtt:
    使用pip工具安装paho-mqtt,输入以下指令即可: pip install paho-mqtt


4.血压计设计
要使病人一旦启动血压测量,数据由主机的mqtt-paho客户端发布至test.mosquitto.org服务器。只要是订阅了相应病人血压主题的医生,都可以及时掌握病人的血压变化情况。例如,在自己的android手机上像qq、微信消息那样及时显示出病人当前的血压情况,甚至可以实现远程诊断。

我用华科电子的HKB-08B做示例,本血压计模块支持usb串口(cp2102),可以直接连接在树莓派上。我将HKB-08B的通讯协议写成了python的模块,使其方便调用。然后写了测试用的python demo放在树莓派上运行。代码如下:
bpr.py:

# -*- coding: utf-8 -*-
import serial
import time
import threading
SLEEP_Cmd =
WAKEUP_Cmd =
START_Cmd =
STOP_Cmd =

START_STATE=0
STOP_STATE=1
SLEEP_STATE=2
WAKEUP_STATE=3

STATUS_NORMAL_INFO="設備已經連接!"
STATUS_ERROR_INFO_0="報告: 測量不到有效的脈搏!"
STATUS_ERROR_INFO_1="報告: 氣袋沒有綁好!"
STATUS_ERROR_INFO_2="報告: 測量結果數值有誤!"
STATUS_ERROR_INFO_3="報告: 進入超壓保護!"
STATUS_ERROR_INFO_4="報告: 幹預過多!"

class bpsensor:
    def __init__(self,port):
      self.ser=serial.Serial(
            port=None,            # number of device, numbering starts at
# zero. if everything fails, the user
# can specify a device string, note
# that this isn't portable anymore
# if no port is specified an unconfigured
# an closed serial port object is created
            baudrate=115200,          # baud rate
            bytesize=8,   # number of databits
            parity=serial.PARITY_NONE,   # enable parity checking
            stopbits=serial.STOPBITS_ONE,# number of stopbits
            timeout=3,         # set a timeout value, None for waiting forever
            xonxoff=0,            # enable software flow control
            rtscts=0,               # enable RTS/CTS flow control
            interCharTimeout=None   # Inter-character timeout, None to disable
)
      self.ser.port=port
      self.RecBuff=
      self.RecIndex = 0
      self.data_len = 0
      self.strInfo=""
      self.status=1
      self.done=False


    def __del__(self):
      self.close()

    def getdata(self,ch):
      self.strInfo=""
      bRefreshDisp = False
      self.done=False
      val = 0
      self.RecBuff=ch


      if self.RecIndex==0:
                if self.RecBuff != 0xff:
                        self.RecIndex = 0
                        return -1

      elif self.RecIndex==1:
                if self.RecBuff != 0xcd:

                        self.RecIndex = 0
                        return -1

      elif self.RecIndex==2:
                self.data_len = self.RecBuff - 1
                if self.data_len > 7or self.data_len < 2:

                        self.RecIndex = 0
                        return -2

      else:
                self.data_len-=1
                if self.data_len == 0:

                        #数据接收完成
                        #校验
                        verify = self.RecBuff
                        for i in range(self.RecBuff-2):

                           verify +=self.RecBuff

                        if ((verify&0xff) != self.RecBuff):
                              self.RecIndex = 0
                              return -3


                        #提取有用数据,判断命令字

                        if self.RecBuff==0x5b:      #传感器休眠
                              self.strInfo="已休眠"
                              self.status=SLEEP_STATE
                              self.RecIndex = 0
                              return 0
                        elif self.RecBuff==0x5a:   #唤醒
                              self.strInfo="已唤醒"
                              self.status=WAKEUP_STATE
                              self.RecIndex = 0
                              return 0
                        elif self.RecBuff==0x54:   #测量阶段
                              self.bDeviceAck = True

                              if ((self.RecBuff and 0x10) == 1):


                                        self.strInfo = "有心跳"
                              else:


                                        self.strInfo = "无心跳"


                              #气压值
                              bRefreshDisp=True
                              self.val = self.RecBuff & 0x0f
                              self.val = (val << 8) | self.RecBuff
                        elif self.RecBuff == 0x55:         #血压值
                              if ((self.RecBuff and 0x80) == 1):

                                        #心律不齐
                                        self.strInfo = "心律不齐!"
                                        self.m_wMP = 1
                              else:

                                        #心率正常
                                        self.strInfo = "心率正常!"
                                        self.m_wMP = 0


                              self.RecBuff &= 0x7f #把最高位 心率位去掉

                              #//收缩压
                              val = (self.RecBuff << 8) | self.RecBuff
                              #strInfo.Format("%d",val)
                              self.m_wSP = val

                              #//舒张压压
                              val = (self.RecBuff << 8) | self.RecBuff
                              #strInfo.Format("%d",val);
                              self.m_wDP = val

                              #心率
                              #strInfo.Format("%d",RecBuff);
                              self.m_wPR = self.RecBuff
                              self.done=True
                              return self.m_wSP,self.m_wDP,self.m_wPR   

                        elif self.RecBuff==0x56:


                            if(self.RecBuff==0):
                              self.strInfo=STATUS_ERROR_INFO_0

                            elif self.RecBuff==1:
                              self.strInfo=STATUS_ERROR_INFO_1

                            elif self.RecBuff==2:
                              self.strInfo=STATUS_ERROR_INFO_2

                            elif self.RecBuff==3:
                              self.strInfo=STATUS_ERROR_INFO_3

                            elif self.RecBuff==4:
                              self.strInfo=STATUS_ERROR_INFO_4

                            else:
                              self.strInfo="報告: 未知錯誤!"

                              return -4
                        else:
                              return -5
                        bRefreshDisp=True


      self.RecIndex+=1

      if (bRefreshDisp):

                bRefreshDisp = False

                self.RecIndex = 0   


      return 0
    def sleep(self):
      if self.ser.isOpen():
            self.ser.write(SLEEP_Cmd)



    def wakeup(self):
      if(self.ser.isOpen() and self.status!=WAKEUP_STATE):
            self.ser.write(WAKEUP_Cmd)
            self.status=WAKEUP_STATE

    def start(self):
      if(self.ser.isOpen() and self.status!=START_STATE):
            self.ser.write(START_Cmd)
            time.sleep(0.5)
            self.ser.write(START_Cmd)
            i=0
            time.sleep(2)
            while self.done==False:
                temp=self.ser.read()
                if temp!=0 and temp!=b'':
                  temp=ord(temp)
                  self.getdata(temp)
                else:
                  i+=1
                if i>3:
                  break

            if(self.done):
                print(self.m_wSP,self.m_wDP,self.m_wPR)
                #self.done=False
            else:
                print(self.strInfo)



    def open(self):
      if not self.ser.isOpen():
            self.ser.open()

    def close(self):
      if self.ser.isOpen():
            self.ser.close()



if __name__=="__main__":
    a=bpsensor('/dev/ttyUSB0')
    a.open()
    a.start()







血压计测量测试程序:






# -*- coding: utf-8 -*-import json
import time
import threading
from bpr import bpsensor
import paho.mqtt.client as paho


port='/dev/ttyUSB0'



isfinished=threading.Event()

client = paho.Client("blood_pressure")
uid='ID002'
client.connect("test.mosquitto.org")
def getdata(sensor):
    try:
      sensor.open()
      sensor.start()
    except Exception:
      print('连接线松动,设备丢失!')
      isfinished.set()

    isfinished.set()
try:

    a=bpsensor(port)
    while client.loop()==0:

      data=input("是否开始测量y/n:")#测试时用命令行,也可改成按钮

      if data=="y" or data=='Y':
            isfinished.clear()
            t1=threading.Thread(target=getdata,args=)
            t1.start()
            print('测量中...')
            isfinished.wait()
            if a.done:
                ts=time.strftime("%Y-%m-%dT%H:%M:%S")
                payload=json.dumps({'GY':{'ts':ts,'value':a.m_wSP},'DY':{'ts':ts,'value':a.m_wDP},'MB':{'ts':ts,'value':a.m_wPR}})
                client.publish('patients/%s'%uid,payload,1)
                print(payload)
                a.done=False
            else:
                print(a.strInfo)
      else:
            print('退出')
            client.disconnect()
except KeyboardInterrupt:
      print("bye!")
      client.disconnect()


   

      推送的数据必须用mqtt的客户端订阅才能接收到推送:

# -*- coding: utf-8 -*-

import time
import redis
import os
import json
import paho.mqtt.client as paho

mypid = os.getpid()
client = paho.Client("blood_pressure_rec",clean_session=False)
conn = redis.Redis('localhost')

defSendToRedisHash(uid,tagvaluedict):

      pipe =conn.pipeline()   
      for key in tagvaluedict:
            pipe.hmset('%s.%s'%(uid,key),tagvaluedict)

      pipe.execute()



def on_message(mosq, obj, msg):
    #called when we get an MQTT message that we subscribe to
    slist=msg.topic.split("/")
    dic1=json.loads(msg.payload.decode())
    SendToRedisHash(slist,dic1)
    print(slist)





def connectall():
    print("DISPATCHER: Connecting")
    client.connect(host="test.mosquitto.org",port=1883)
    print('Connected')
    client.subscribe("patients/",1)
    client.on_message = on_message

def disconnectall():
    print("DISPATCHER: Disconnecting")
    client.disconnect()


if __name__ == '__main__':   
    connectall()
    try:
      while client.loop()==0:

            pass
      # Look for commands in the queue and execute them

    except KeyboardInterrupt:
      print ("Interrupt received")
      disconnectall()





当有数据推送至服务器时,本程序将自动把内容存入本机的redis数据库。



总之,MQTT是一种十分适用于物联网应用的通讯协议,并且正蓬勃发展着。它可以实现推送服务以及设备间双向通讯,拥有开放的公网服务器(不需要自己建站),给我们带来巨大便利。
未完待续












Yunba云巴 发表于 2016-12-21 22:23

我们云巴是基于MQTT协议、采用Erlang/OTP架构设计的实时通信系统,具有跨平台实时通信、消息推送、实时在线以及实时统计功能。

之所以选择MQTT协议,就是因为它具有可靠传输、低功耗、轻量的特点,我们基于 MQTT协议实现了物联网智能硬件云、移动端消息推送以及实时聊天等功能。比如,
https://yunba.io/usercases/smarthouse/ 就是我们基于MQTT协议实现的一个智能家居实践案例,我们还制作了实践过程的文档,参照文档也可以自己实现出来。
页: [1]
查看完整版本: 基于MQTT的wifi物联网血压计设计