InfluxDB 聚合函数实用案例
InfluxDB 聚合函数实用案例
文章大纲
InfluxDB 简介
InfluxDB是GO语言编写的分布式时间序列化数据库,非常适合对数据(跟随时间变化而变化的数据)的跟踪、监控和分析。在我们的项目中,主要是用来收集设备实时上传的值。从而分析该设备值的趋势图和各个设备的能耗占比等一系列功能。InfluxDB的功能很强大,文档也很详细。可美中不足的是,它的单机性能并不是很理想。因为InfluxDB存储的数据量本身是非常巨大的,在执行一些时间范围比较大的sql语句,耗时会很长,甚至直接崩溃。而开源的InfluxDB目前已经不再支持集群。若要通过搭建集群提升性能问题,可以考虑企业版。当然,我们写的程序也有很大的性能优化空间。
能耗趋势图分析
需求:统计指定设备、指定区域、指定分项或者指定能耗类型的能耗趋势图。如下图所示,纵坐标是能耗值,横坐标是时刻(每小时、每天、每周、每月)。
分析:获取某个区间时刻的值,可以用GROUP BY time 进行时间分组。再用聚合函数LAST或者SUM统计。但这个看似很简单的需求却暗藏杀机。SQL语句如下
SELECT LAST("currentValue"), * FROM "$TABLE_NAME" WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id' GROUP BY time($timeSpan) ORDER BY time DESC
第一:先要清楚,数据是通过什么规则保存到InfluxDB数据库
为了记录设备能耗的实时数据,我们会通过订阅MQTT通道,当值发生变化后存储到InfluxDB数据库中,或者在指定时间范围内没有变化也会上传。这样做的好处可以避免一些冗余数据,同时也埋下了一个坑。
例如:一台设备在InfluxDB数据库中最后一次记录的时间是15分钟前。但是sql语句是从5分钟前开始统计。这会导致该设备的其点值就是null。简单来说:设备的存储的值正好在分组统计的时间范围外。解决方法有很多:比如用FILL(previous)函数填充;比如使用time(time_interval,offset_interval)进行时间推移等。但是我比较推荐下面的方法:
先获取指定开始时间之前的最后值(lastValue),然后再根据返回值是否为null,来决定是否替换或者更新lastValue。伪代码如下。
## 获取该设备的最后记录值 val lastValue = "SELECT LAST("currentValue") FROM "$TABLE_NAME" WHERE time <= '$startTime'" ## 遍历查询结果,将currentValue为 null的值替换 "SELECT LAST("currentValue"), * FROM "$TABLE_NAME" WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id' GROUP BY time($timeSpan) ORDER BY time DESC".forEach { lastValue = currentValue?: lastValue result[time] = currentValue?: lastValue }
你以为这样就结束了吗?还不够,返回的time格式化后,你会发现有8小时的时区问题。
第二:解决InfluxDB时区问题
InfluxDB 默认以UTC时间存储并返回时间戳,查询返回的时间戳对应的也是UTC时间。我们需要通过tz()子句指定时区名称,比如Asia/Shanghai。若InfluxDB安装在Windows环境上,可能还会出现 error parsing query: unable to find time zone 错误,解决方法是安装GO语言环境,文章也详细介绍过。
SELECT LAST("currentValue"), * FROM "$TABLE_NAME" WHERE time > '$startTime' AND time <= '$endTime' AND id = '$id' GROUP BY time($timeSpan) ORDER BY time DESC tz('Asia/Shanghai')
实用tz() 子句后,返回的时间格式:"2019-11-18T13:50:00+08:00"。需要通过 "yyyy-MM-dd'T'HH:mm:ss" 将其格式化。
第三:GROUP BY time 自然月
group by time 支持秒、分钟、小时、天和周,却唯独不支持自然月。如果对数据的精准性要求不高,可以考虑使用30d实现。或者分12次统计。或者有更好的方法,请不吝赐教😲!
Spring 整合 InfluxDB
初始化配置
整合分三步:导包、配置、初始化连接
compile('org.influxdb:influxdb-java:2.8')
influx.server=http://IP influx.port=8086 influx.username=admin influx.password=admin influx.dbname=database
import org.influxdb.InfluxDB import org.influxdb.InfluxDBFactory import org.influxdb.dto.Point import org.influxdb.dto.Query import org.influxdb.impl.InfluxDBResultMapper import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Component import java.util.concurrent.TimeUnit import javax.annotation.PostConstruct import javax.annotation.PreDestroy @Component class InfluxDbConnector { val logger: Logger = LoggerFactory.getLogger(InfluxDbConnector::class.java) @Value("\${influx.server}") lateinit var serverUrl: String @Value("\${influx.port}") lateinit var serverPort: String @Value("\${influx.db-name}") lateinit var dbName: String @Value("\${influx.user-name}") lateinit var userName: String @Value("\${influx.password}") lateinit var password: String lateinit var connection: InfluxDB val resultMapper: InfluxDBResultMapper = InfluxDBResultMapper() @PostConstruct fun initConnection() { val connectionUrl = "$serverUrl:$serverPort" connection = InfluxDBFactory.connect(connectionUrl, userName, password) connection.setDatabase(dbName) connection.enableBatch(1000, 1000, TimeUnit.MILLISECONDS) } @PreDestroy fun closeConnection() { connection.close() } fun <T> query(sql: String, type: Class<T>): List<T> { logger.info("exec influx query: {}", sql) val result = connection.query(Query(sql, dbName)) return resultMapper.toPOJO(result, type) } fun query(sql: String) { logger.info("exec influx query: {}", sql) connection.query(Query(sql, dbName)) } fun save(points: List<Point>) { points.forEach { connection.write(it) } } }
存储和查询数据
定义实体
import java.time.Instant; @Measurement(name = "tableName") public class StringVariableResultJ { @Column(name = "currentValue") public String value; @Column(name = "time") public Instant time; // ...... }
批量保存数据
val points = equipmentEnergies.map { Point.measurement(TABLE_NAME_EQUIPMENT) .tag("equipmentId", it.equipmentId) .tag("locationId", it.locationId) .tag("subItemInstanceId", it.subItemInstanceId) .tag("subItemId", it.subItemId) .tag("projectId", it.projectId) .time(it.lastSavedTime?.toEpochMilli()?:0, TimeUnit.MILLISECONDS) .addField("currentValue", it.value.toString().toBigDecimalOrNull()).build() } influxDbConnector.save(points)
查询数据
influxDbConnector.query(sql, StringVariableResultJ::class.java).sortedBy { it.time }
项目是用kotlin写的,可是在用InfluxDBResultMapper.toPOJO 时会出现数据转换异常的问题。若换成Java的实体类就没有问题。原因目前没有找到。
删除数据
我在官网文档上并没有找到删除数据的内容,只有修改数据库存储策略。但实际上执行delete sql语句是生效的😂。数据保留策略目的是让InfluxDB能够知道哪些数据是可以丢弃的,从而节省空间,更高效的处理数据。默认是不限制。以下是常见的命令。
# 查看库存储规则 > SHOW RETENTION POLICIES ON 数据库名称; [out]: name duration shardGroupDuration replicaN default ---- -------- ------------------ -------- ------- autogen 720h0m0s 168h0m0s 1 true # 修改存储规则 > ALTER RETENTION POLICY autogen ON 数据库名称 DURATION 0; # 设为默认 > ALTER RETENTION POLICY autogen ON 数据库名称 DEFAULT; #创建规则 > CREATE RETENTION POLICY "规则名" ON 数据库名称 DURATION 360h REPLICATION 1; # 删除规则 > DROP RETENTION POLICY 规则名 ON 数据库名称;
duration 表示在这个时间外的数据将不会被保留,0表示不限制。default 表示是否为默认规则。其它含义没有深究。
实际场景中,不同表的数据需要保留的时间也不一样。此时可以考虑用sql语句,用程序定时删除数据。
influxDbConnector.query("DELETE FROM \"tableName" WHERE time < '$时间' ")
文章到这里就结束了,更多的聚合函数可以看官方文档:https://docs.influxdata.com/influxdb/v1.7/query_language/functions/
- 查看远端的端口是否通畅3个简单实用案例!
- Struts2自带的上传方法(简单实用)案例图片上传,文件也可,
- mysql存储过程使用CURSOR操作多列数据实用案例
- android二维码开发的实用案例
- 过滤器实用案例
- 设计模式实用案例之单例模式
- JAVA实用案例之验证码开发
- 新书推荐:百问FreeSwitch:VOIP 软交换 实用案例解答 余洪涌著
- 《MATLAB数字信号处理85个实用案例精讲.入门到进阶+源代码》PDF版电子书下载
- 31个实用find命令的案例
- 查看远端的端口是否通畅3个简单实用案例!
- 查看远端的端口是否通畅3个简单实用案例
- [zz]Ubuntu建立本地源实用案例
- 查看远端的端口是否通畅3个简单实用案例
- Ubuntu建立本地源实用案例
- Influxdb简单实用操作
- git的简单实用案例
- 8 个实用的 Bootstrap 3 案例教程
- ActionBar实用案例(返回功能、子菜单、搜索功能)
- 机器学习实用案例解析--读书笔记