|
本帖最后由 Mr.fan 于 2019-3-21 00:25 编辑
项目介绍: 这个项目将会带着你从零开始搭建一个开放的物联网平台,该平台将会提供MQTT协议设备的接入、历史数据的保存、邮件报警通知、远程发送指令等基本功能,你仅需要一台云服务器或者树莓派即可实现。
项目涉及的部分比较繁杂,你需要掌握的基础知识有: - Python基础语法
- DjangoWeb框架
- linux常用命令
- Celery分布式任务队列的基本使用
- vue.js、微信小程序基础知识
- MySQL、Redis的基础知识
当然如果仅仅只是想自己使用不做更改那么直接下载源码运行即可
下面是整体的架构图(后期也许会做调整)
整体架构设计
下将分为四个部分分别介绍基本的流程
一:MQTT服务器的搭建和配置
这里使用了EMQ 开源MQTT消息服务器来搭建,按照自己的系统下载相应的版本安装,EMQ下载,官方有很详细的安装步骤介绍,篇幅原因在此就不展开说明了。
安装完成之后进入到管理控制台 http://localhost:18083 ,缺省用户名/密码: admin/public。登陆成功之后就能可查询 EMQ 消息服务器基本信息、统计数据、度量数据,查询系统客户端(Client)、会话(Session)、主题(Topic)、订阅(Subscription),以及对插件的管理。
插件里面可以根据自己需求看看需要开启哪些插件,在这里选择开启MySQL认证访问插件,对连接的设备进行限制(包括连接,订阅和发布),在使用之前需要使用以下命令创建表
[mw_shl_code=c,true]
CREATE TABLE `mqtt_acl` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`allow` int(1) DEFAULT NULL COMMENT '0: deny, 1: allow',
`ipaddr` varchar(60) DEFAULT NULL COMMENT 'IpAddress',
`username` varchar(100) DEFAULT NULL COMMENT 'Username',
`clientid` varchar(100) DEFAULT NULL COMMENT 'ClientId',
`access` int(2) NOT NULL COMMENT '1: subscribe, 2: publish, 3: pubsub',
`topic` varchar(100) NOT NULL DEFAULT '' COMMENT 'Topic Filter',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;[/mw_shl_code]创建完成之后进入插件管理页面,打开MySQL配置,按照下图的介绍填写自己的信息
MySQL设置
配置完成打开插件之后,使用EMQ自带的websocket连接测试一下,查看是否能够连接成功。
服务端的配置差不多就这些了,关于ACL的设置问题,只允许连接设备发布注册过的topic消息,订阅设备ID的topic和自组网ID的topic,这样不同用户的发布的消息就不会被其他用户窃取。如果想要设备之间转发消息,可在为自己的设备建立一个自组网络,设备可订阅和发布指定的topic消息实现设备之间的消息传递。
二:后台服务的搭建 后台主要用来提供数据接口,处理业务逻辑,保留用户信息,后台服务使用django搭建,如果不了解这个框架可参考这两篇文章Django基础、Django进阶
数据库的设计
数据库使用django的model创建,代码比较多不放代码了,看一下ER图
简单说明一下:
一个用户可以创建多个设备、一个自组网、多个快捷指令,每个设备可以有多个数据流信息,每个数据流可以设置一个触发器,后面要讲的服务层会对注册的数据流进行监听,每当有消息发送过来的时候会将上传的数据存入数据库中,如果满足报警条件会执行报警通知给用户。
全局权限认证的设计:
因为会有微信小程序的接入,小程序不支持cookie保存,所以采用jwt的方式来保存用户登录状态。关于jwt简单来说就是第一次登录验证成功后对想要保存的信息进行加密,然后将加密过后的内容返回给前端,前端每次请求的时候都携带上这个token,后端拿到token后对其解密就能获取之前保存的信息了。
权限认证采用RestFramework框架中的认证模块来实现,后端从请求头中取出jwt,使用jwt模块对其信息进行解码获取用户的登录信息,认证通过后向request的添加用户ID,接下来所有的业务都需要根据这个ID从数据库中获取数据。
[mw_shl_code=python,true]class Authentication(BaseAuthentication):
def authenticate(self, request): # 如果执行到最后一个还是没有给user赋值,则会返回一个匿名用户
token = (request.META.get("HTTP_AUTHTOKEN"))
if token:
try:
auth = jwt.decode(token, settings.SECRET_KEY , algorithms=['HS256'])
if not auth['user_id']:
raise exceptions.AuthenticationFailed({'code': 1, 'message': '无权访问!', 'data': {}})
except Exception:
raise exceptions.AuthenticationFailed({'code': 1, 'message': '无权访问!', 'data': {}})
else:
raise exceptions.AuthenticationFailed({'code': 1, 'message': '无权访问!', 'data': {}})
return auth['user_id'], None[/mw_shl_code]
远程控制指令的发送:
这里可以创建一个MQClient来发布消息,为了让后台尽量不直接操作Client还是选取HTTP请求的方式去实现,EMQ为我们提供了REST API的方式进行管理和监控,在官方文档中是这样介绍的。所以后台拿取指令内容,就可以通过HTTP请求的方式实现消息的发布了
指令
使用这些REST API接口需要通过BasicAuth认证,用户名和密码就是登录管理页面时填写的,BasicAuth认证可使用requests库来实现,主要代码如下:
[mw_shl_code=python,true]from requests.auth import HTTPBasicAuth
import requests, json
data = {
'topic': str(request.data.get('topic')),
'payload': str(request.data.get('payload')),
'qos': int(request.data.get('qos')) # qos必须为整数
}
data = json.dumps(data) # Dict转Json
auth = HTTPBasicAuth(settings.MQTT_USER['username'], settings.MQTT_USER['password']) # basicAuth认证
recv = requests.post(url=settings.BASE_EMQ_URL + "/api/v2/mqtt/publish", auth=auth, data=data)[/mw_shl_code]
后台的其他的一些数据库操作、表单验证就不说了都是很基本的操作,如果你会Django的话稍微看看代码应该就能明白
三:服务层
设备上下线消息的获取
这里利用了EMQ的$SYS系统主题,对于系统主题EMQ官方是这样介绍的:EMQ 消息服务器周期性发布自身运行状态、MQTT 协议统计、客户端上下线状态到 $SYS/ 开头系统主题。$SYS 主题路径以 "$SYS/brokers/{node}/" 开头,"${node}"是 Erlang 节点名称。
客户端上下线状态消息的$SYS 主题前缀为: $SYS/brokers/${node}/clients/,可选择具体客户端${clientid}/connected的上下线消息,也可以选择所有设备的上下线消息:#,这里选择接收所有设备的上下线消息,那么需要订阅的主题为$SYS/brokers/emq@127.0.0.1/clients/#,还记得我们之前配置过ACL吗?这时候需要做调整了,我们需要配置一个拥有最高权限的client(可以发布和订阅任何消息),想acl表中插入这么几条数据(意思就是IP地址为127.0.0.1连接的客户端可以订阅和发布任何消息)
[mw_shl_code=c,true]INSERT INTO `mqtt_acl` (`id`, `allow`, `ipaddr`, `username`, `clientid`, `access`, `topic`)
VALUES
(1,1,'127.0.0.1',NULL,NULL,NULL,3,'#'),
(2,1,'127.0.0.1',NULL,NULL,3,'$SYS/#'),
(3,1,'127.0.0.1',NULL,NULL,3,'eq #');[/mw_shl_code]
然后创建一个client来测试一下:[mw_shl_code=python,true]
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client.subscribe("$SYS/brokers/emq@127.0.0.1/clients/#")
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
client = mqtt.Client(client_id='python_test')
client.username_pw_set('*****', '*****') # 设置连接用户名
client.on_connect = on_connect
client.on_message = on_message
client.connect("your ip address", 1883, 60)
client.loop_forever()[/mw_shl_code]
成功后如果有设备连接成功或者断开那么我们会收到如下信息,这时候我们就可以在数据库中更新设备的状态了
邮箱通知服务
发送邮件可以借助第三方平台如163、QQ等等,但是使用个人账户去做都是有最大发送数量限制的,如果是自己使用是可以满足需求的,为了稳妥起见提供另外一种方式:搭建自己的STMP服务。以Ubuntu系统为例,树莓派可以另外搜一下怎么安装。
- 安装apt-get install mailutils
- 在设置页面选择Internet site
- 更改配置[mw_shl_code=bash,true]
vim /etc/postfix/main.cf
# 修改以下内容
myhostname = mail.youraddress
mydomain = youraddress
myorigin = admin@youraddress
mydestination = $myhostname, localhost.$mydomain, $mydomain[/mw_shl_code] - 重启服务service postfix restart
- 测试能否发送成功python 自带两个模块可以实现发送邮件的功能,email和 smtplib,email负责构造邮件内容,smtplib用来发送邮件,通过下面这段代码简单测试一下
[mw_shl_code=python,true]import smtplib
from email.mime.text import MIMEText
from email.header import Header
def send(receiver,payload):
sender = 'admin@iotforfml.cn'
# 三个参数:第一个为文本内容,第二个 plain 设置文本格式,第三个 utf-8 设置编码
message = MIMEText(payload, 'plain', 'utf-8')
# 邮件标题
subject = '触发器报警通知'
message['Subject'] = Header(subject, 'utf-8')
try:
smtp_obj = smtplib.SMTP('localhost')
smtp_obj.sendmail(sender, receiver, message.as_string())
except smtplib.SMTPException:
pass
send('your email address','aaaa')[/mw_shl_code] - 结果(如果成功会收到如下的邮件)
消息的持久化
查看了EMQ文档发现并不提供消息的持久化功能,MQTT协议是按照设备一直在线设计的,数据都是保存在内存里的,但是考虑到用户上传传感器数据不可能接收了就扔掉,那样就没法查看历史数据了,所以用户上传的消息必须要能够保存下来,以便查看历史数据,这样一来持久化功能就需要我们自己来实现。
具体的实现方式就是,创建一个client订阅所有注册的数据流ID的topic消息,代码和之前的设备上下线类似,在这里就不展开说了
服务层目前主要就这几个了,待上线的还有天气广播服务,具体操作就是 通过天气信息网的API接口获取天气信息,在通过之前说的RESTAPI将这些信息广播出去,设备如果想要获取天气信息,订阅对应的主题即可获取
四:前端页面
前端负责和用户的交互,数据的可视化等工作,主要分为web端和小程序端,下面将分别介绍一下。
Web端
web端使用vue+elementUI实现(第一个版本是elementUI后面会改用iviewUI),因为我也是刚学这个的只能简单的介绍一下。
使用vue的脚手架工具初始化项目,然后接下来是安装一些依赖,在这个项目里面我用了以下几个库, 通过命令npm install xxx --S 安装。
- "axios": "^0.18.0", 发送请求的类似ajax
- "echarts": "^4.2.0-rc.2", 可视化工具,用于绘制数据曲线图
- "element-ui": "^2.4.9", 网页UI
- "vue-router": "^3.0.1", 路由,这个一般在初始化项目的时候就安装了,如果没有自己手动安装
首先确定一下页面的整体布局,分为三大块,其中顶栏和侧栏显示内容不变,只变主题部分,布局样式使用elementUI中的<el-header>:顶栏容器, <el-aside>:侧边栏容器,<el-main>:主要区域容器实现。
说一下侧边导航栏,导航栏使用el-menu组件实现,将el-menu的 route属性设置为true,或者使用router-link去做跳转,不过那样显得要麻烦一些,所以还是推荐第一种
[mw_shl_code=html,true] <el-menu class="el-menu-vertical-demo"
background-color="#304156"
text-color="#bfcbd9"
active-text-color="#409eff"
:default-active="$route.path"
router >
<el-menu-item index="/developer/dashboard">
<i class="el-icon-ump-18"></i>
<span slot="title">主页</span>
</el-menu-item>
<el-menu-item index="/developer/charts">
<i class="el-icon-ump-shuju2"></i>
<span slot="title">历史数据</span>
</el-menu-item>
<el-menu-item index="/developer/devices">
<i class="el-icon-ump-shebei2"></i>
<span slot="title">设备管理</span>
</el-menu-item>
<el-menu-item index="/developer/streams">
<i class="el-icon-ump-shuju1"></i>
<span slot="title">数据流管理</span>
</el-menu-item>
<el-menu-item index="/developer/triggers">
<i class="el-icon-ump-chufaqi"></i>
<span slot="title">触发器管理</span>
</el-menu-item>
<el-menu-item index="/developer/console">
<i class="el-icon-ump-kongzhitai1"></i>
<span slot="title">控制台</span>
</el-menu-item>
</el-menu>[/mw_shl_code]
侧栏效果图如下
主体部分效果如下
样式部分就不说了看个人喜好,主要说一下如何获取后台的数据,数据的获取需要用到之前安装的axios插件,通过axios的get、post等方法访问后端的接口获取json数据例如get方法,then和catch是es6的语法,具体的我也解释明白。如果数据获取正常,后端返回的数据放在res.data里面,打印到控制台看看返回结果,post同理,只是参数不同
[mw_shl_code=javascript,true]axios.get(url, {
params: { 'key': 'value' }
}).then((res)=> {
console.log(res.data);
}).catch((error)=> {
console.log(error);
});[/mw_shl_code]
一般来说获取数据可以放在created()或者mounted()里面,看自己需求吧,created要先于mounted,created那时候还没有生成dom,如果需要操作dom那么还是放到mounted里面吧。web端就讲这么多了,具体的看代码吧。
微信小程序端:
之前说了小程序不支持cookie操作,需要使用jwt完成状态的保存,先来看一下如何实现微信登录和用户绑定。这里先看一下小程序官方给的登录流程,这只是最基础的流程,根据设计需要我们还需要添加自己的逻辑进去
下面是具体的流程图,接下来会根据这个流程图具体看看小程序端应该怎么实现。
以index页面为起始页面,当页面进行加载的时候会调用onload函数,所以可以在onload函数中检测storage中是否有jwt
[mw_shl_code=javascript,true]onLoad: function() {
// 获取本地的jwt,如果有则跳转到home页面,否则重新获取jwt
wx.getStorage({
key: 'jwt',
success: function(res) {
wx.switchTab({
url: '../home/home'
})
},
})
},[/mw_shl_code]
如果没有找到jwt,这时会调用登录函数从后端获取jwt
[mw_shl_code=javascript,true]//用户登录
let login = function(code) {
wx.request({
url: login_url,
method: 'post',
data: {
'code': code
},
success(res) {
// 登录成功
if (res.data.code === 0) {
// 将后台返回的jwt写入storage中保存
// 这里要使用同步的方法,避免异步请求的时候没有token
wx.setStorageSync('jwt', res.data.data)
// 跳回主页
wx.switchTab({
url: '../home/home'
})
}
// 登录失败没有找到绑定的信息,跳转到绑定页面
else {
wx.navigateTo({
url: '../bind/bind',
})
}
},
fail(res) {
wx.showToast({
title: '服务器错误,登录失败!',
duration: 1000,
icon: 'none',
})
}
})
}[/mw_shl_code]
在bind页面中,需要填写我们的账户信息,这里使用了小程序的form、input组件,需要对password进行MD5加密处理后再提交输入的内容,后端会验证用户名和密码,如果成功后会将获取到的openID和这个用户绑定,并生成jwt。
[mw_shl_code=javascript,true]//用户绑定
let bind = function(username, password) {
var password = md5(password)
//重新获取code避免长时间code失效
wx.login({
success: res => {
wx.request({
url: bind_url,
method: 'POST',
data: {
'username': username,
'password': password,
'code': res.code
},
success(res) {
// 绑定成功
if (res.data.code === 0) {
// 将后台返回的jwt写入storage中保存
// 这里要使用同步的方法,避免异步请求的时候没有token
wx.setStorageSync('jwt', res.data.data)
// 跳回主页
wx.switchTab({
url: '../home/home'
})
} else {
wx.showToast({
title: res.data.msg,
duration: 1000,
icon: 'none',
})
}
},
fail(res) {
wx.showToast({
title: '服务器错误,绑定失败!',
duration: 1000,
icon: 'none',
})
}
})
}
})
}[/mw_shl_code]
下面是几个主要页面效果图
小程序就讲这些了,如果你使用树莓派来实现,那么小程序就无法使用了,因为小程序的服务器需要一个域名并且要备案。
项目总结
这个项目时间跨度有点长了,都是自己一点一点构思设计,算是付出了很多心血了,整个项目涉及到了后端、前端、服务器的搭建等等,单单一篇也很难将方方面面的细节给说明白,关于有些图片上的水印那都是从我自己的博客上复制下来的不存在抄袭,这个系列我写了也有挺长一段时间了,而且也会一直更新下去,如果有哪些不明白的可以看看我的主页https://blog.csdn.net/FanMLei
这是已经上线的微信小程序,想要体验的可以扫码试试(小程序没有更新管理功能,只能查看自己的设备信息,无法添加设备管理设备,以后会慢慢更新上去的),由于当初设计的失误导致小程序和web端API接口不能共用,所以web需要重写了,暂时还未上线,上线后会在帖子中更新出来。
这是第一版的源码:https://github.com/FanMLei/IOTPlatform ,里面包含web端代码、硬件连接测试代码、和后端代码,第二版暂时还未整理完,你可以关注我的github主页,我也会在帖子中同步更新
|
|