EMQ X 规则引擎系列(三)存储消息到 InfluxDB 时序数据库
前言
InfluxDB 是一个用于存储和分析时间序列数据的开源数据库,内置 HTTP API,类 SQL 语句的支持和无结构的特性对使用者而言都非常友好。它强大的数据吞吐能力以及稳定的性能表现使其非常适合 IoT 领域。
通过 EMQ X 消息引擎,我们可以自定义 Template 文件,然后将 Json 格式的 MQTT 消息转换为 Measurement 写入 InfluxDB:
场景介绍
该场景需要将 EMQ X 指定主题下且满足条件的消息存储到 InfluxDB 时序数据库。为了便于后续分析检索,消息内容需要进行拆分存储。
该场景下客户端上报数据如下:
-
Topic:data/sensor
-
Payload:
{ "location": "bedroom", "data": { "temperature": 25, "humidity": 46.4, "pm2_5": 0.5 } }
准备工作
数据库安装及初始化
创建
db数据库并开放 8089 UDP 端口。
$ docker pull influxdb $ git clone -b v1.0.0 https://github.com/palkan/influx_udp.git $ cd influx_udp $ docker run --name=influxdb --rm -d -p 8086:8086 -p 8089:8089/udp -v ${PWD}/files/influxdb.conf:/etc/influxdb/influxdb.conf:ro -e INFLUXDB_DB=db influxdb:latest
配置说明
创建资源
打开 EMQ X Dashboard,进入左侧菜单的 资源 页面,点击 新建 按钮,选择 InfluxDB 资源类型并完成相关配置进行资源创建。
创建规则
进入左侧菜单的 规则 页面,点击 新建 按钮,进行规则创建。触发事件 选择 message.publish,即在 EMQ X 收到 PUBLISH 消息时触发该规则进行数据处理。
选定触发事件后,我们可在界面上看到可选字段及示例 SQL:
筛选所需字段
规则引擎使用 SQL 语句过滤和处理数据。例如前文提到的场景中我们需要将
payload中的字段提取出来使用,则可以通过
payload.<fieldName>实现。同时我们仅仅期望处理
data/sensor主题,那么可以在 WHERE 子句中使用主题通配符
=~对
topic进行筛选:
topic =~ 'data/sensor', 最终我们得到 SQL 如下:
SELECT payload.location as location, payload.data.temperature as temperature, payload.data.humidity as humidity, payload.data.pm2_5 as pm2_5 FROM "message.publish" WHERE topic =~ 'data/sensor'
SQL 测试
借助 SQL 测试功能,我们可以快速确认刚刚填写的 SQL 语句是否能达到我们的目的。首先填写用于测试的 payl 7ff7 oad 等数据如下:
然后点击 测试 按钮,得到以下输出结果,与预期相符。
{ "humidity": 46.4, "location": "bedroom", "pm2_5": 0.5, "temperature": 25 }
添加响应动作,存储消息到 InfluxDB
SQL 条件输入输出无误后,我们继续添加响应动作,配置写入 SQL 语句,将筛选结果存储到 InfluxDB。
点击响应动作中的 添加 按钮,选择动作 保存数据到 InfluxDB,选取刚刚创建的
InfluxDB资源,再按照实际需求将
${fieldName}填写到
Field Keys,
Tag Keys和
Timestamp Key中,
Measurement表示将数据写入
InfluxDB时使用的
Measurement,最后点击 新建 按钮完成规则创建。
测试
预期结果
我们成功创建了一条规则,包含一个处理动作,动作期望效果如下:
- 客户端向
data/sensor
主题上报消息时,该消息将命中规则,规则列表中 已命中 数字将会增加 1; - InfluxDB 的
db
数据库中将会增加一条数据,数据内容与处理后的消息内容一致。
使用 Dashboard 中的 Websocket 工具测试
切换到 工具 --> Websocket 页面,使用任意 Client ID 连接到 EMQ X,连接成功后在 消息 卡片中发送如下消息:
-
Topic:data/sensor
-
Payload:
{ "location": "bedroom", "data": { "temperature": 25, "humidity": 46.4, "pm2_5": 0.5 } }
点击 发送 按钮,发送成功后可以看到当前规则已命中次数已经变为 1。
然后检查 InfluxDB,新的 data point 是否添加成功:
$ docker exec -it influxdb influx > use db Using database db > select * from "sensor_data" name: sensor_data time humidity location pm2_5 temperature ---- -------- -------- ----- ----------- 1561535778444457348 46.4 bedroom 0.5 25
至此,我们通过规则引擎实现了存储消息到 InfluxDB 数据库的业务开发。
在阅读该教程之前,假定你已经了解 MQTT、EMQ X 的简单知识。
更多信息请访问我们的官网 emqx.io,或关注我们的开源项目 github.com/emqx/emqx ,详细文档请访问 官方文档。
- Html5 学习系列(六)Html5本地存储和本地数据库
- 世纪佳缘信息爬取存储到mysql,下载图片到本地,从数据库选取账号对其发送消息更新发信状态
- ActiveMQ系列—ActiveMQ性能优化(下3)(消息存储方案 RDB)
- ActiveMQ系列—ActiveMQ性能优化(下1)(消息存储方案 KahaDB)
- 《解剖PetShop》系列之 一:系统架构设计 二:数据访问层之数据库访问设计 三:数据访问层之消息处理
- Java基础系列12:使用CallableStatement接口调用数据库中的存储过程
- 数据库的存储系列———将图片存储到数据库
- JMS&MQ系列之消息存储方式
- 图数据库实践系列 (二)--Neo4J空间数据存储
- InfluxDB 开源分布式时序、事件和指标数据库
- BizTalk开发系列(十八) 使用信封拆分数据库消息
- Html5 学习系列(六)Html5本地存储和本地数据库
- 服务器数据库系列 - Web站点数据库分布存储浅谈
- Html5 学习系列(六)Html5本地存储和本地数据库
- Html5 学习系列(六)Html5本地存储和本地数据库
- InfluxDB 开源分布式时序、事件和指标数据库
- MySQL优化系列(五)--数据库存储引擎(主要分析对比InnoDB和MyISAM以及讲述Mrg_Myisam分表)
- 世纪佳缘信息爬取存储到mysql,下载图片到本地,从数据库选取账号对其发送消息更新发信状态
- 请单击下面的按钮,以重定向到可以选择新数据存储区的页。下面的消息可能会有助于诊断问题: 无法连接到 SQL Server 数据库。
- 世纪佳缘信息爬取存储到mysql,下载图片到本地,从数据库选取账号对其发送消息更新发信状态