您的位置:首页 > 其它

influxdb入门,批量插入数据提高性能

2018-01-02 23:41 381 查看

简介

       InfluxDB 是一个开源分布式时序、事件和指标数据库。使用Go语言编写,无需外部依赖。其设计目标是实现分布式和水平伸缩扩展。

       它有三大特性:

       1. Time Series (时间序列):你可以使用与时间有关的相关函数(如最大,最小,求和等);

       2. Metrics(度量):你可以实时对大量数据进行计算;

       3. Eevents(事件):它支持任意的事件数据。

       详细请参考官网:https://influxdata.com/

安装

       采用的是influxdb-0.13.0-static_linux_amd64.tar.gz,直接解压缩,进入解压目录运行

[java]
view plain
copy

print?

./influxd -pidfile influxd.pid -config influxdb.conf  

./influxd -pidfile influxd.pid -config influxdb.conf
其它安装方式请参考:https://docs.influxdata.com/influxdb/v0.13/introduction/installation/
下载地址:https://dl.influxdata.com,找到对应版本的Key,拼上前面的url即可下载。

       注意influxdb.conf需要按照修改某些dir和port。influxdb服务默认使用端口:

[java]
view plain
copy

print?

TCP port 8083 is used for InfluxDB’s Admin panel  
TCP port 8086 is used for client-server communication over InfluxDB’s HTTP API  

TCP port 8083 is used for InfluxDB’s Admin panel
TCP port 8086 is used for client-server communication over InfluxDB’s HTTP API
       8088端口其实也被占用了,而且不可配置,参见https://github.com/influxdata/influxdb/blob/master/cmd/influxd/run/config.go第36行DefaultBindAddress。之前的influxdb版本TCP ports 8088是可配置的。

使用

       可采用influx命令或者在浏览器中输入localhost:8083 即可进入web管理页面来使用。

       1.  创建数据库:  CREATE DATABASE testDB
       2. 
显示所有数据库: show databases
       3.  删除数据库: DROP DATABASE "db_name"
       4.  使用数据库: use testDB
       5.  显示该数据库中的表 : SHOW MEASUREMENTS
       6.  删除表:  DROP MEASUREMENT "measurementName"
       7. 
增:

            命令行:

[java]
view plain
copy

print?

use testDB  
insert weather,altitude=1000,area=北 temperature=11,humidity=-4  

use testDB
insert weather,altitude=1000,area=北 temperature=11,humidity=-4
            Http接口:

[java]
view plain
copy

print?

curl -i -XPOST 'http://localhost:8086/write?db=testDB'   
                --data-binary 'weather,altitude=1000,area=北 temperature=11,humidity=-4'  

curl -i -XPOST 'http://localhost:8086/write?db=testDB'
--data-binary 'weather,altitude=1000,area=北 temperature=11,humidity=-4'
            插入数据的格式似乎比较奇怪,这是因为influxDB储存数据所采用的是Line Protocol格式。在上面两个插入数据的方法中,都有一样的部分。

[java]
view plain
copy

print?

weather,altitude=1000,area=北 temperature=11,humidity=-4  

weather,altitude=1000,area=北 temperature=11,humidity=-4
            其中:

            weather : 表名

            altitude=1000,area=北 : tag

            temperature=11,humidity=-4 :field
       8. 删与改:在InfluxDB中并没有提供数据的删除与修改方法。可以通过数据保存策略(Retention Policies)来实现删除。
       9. 查:

            命令行:

[java]
view plain
copy

print?

use testDB  
# 查询最新的三条数据  
SELECT * FROM weather ORDE
d9fd
R BY time DESC LIMIT 3  

use testDB
# 查询最新的三条数据
SELECT * FROM weather ORDER BY time DESC LIMIT 3

            Http接口

[java]
view plain
copy

print?

curl -G 'http://localhost:8086/query?pretty=true'   
        --data-urlencode "db=testDB"   
        --data-urlencode "q=SELECT * FROM weather ORDER BY time DESC LIMIT 3"  

curl -G 'http://localhost:8086/query?pretty=true'
--data-urlencode "db=testDB"
--data-urlencode "q=SELECT * FROM weather ORDER BY time DESC LIMIT 3"
            InfluxDB是支持类SQL语句的,具体的查询语法都差不多。

       10. 用户管理:(以下语句都可以直接在InfluxDB的Web管理界面中调用)

              显示用户   SHOW USERS

              创建用户   CREATE USER "username" WITH PASSWORD 'password'

              创建管理员权限的用户   CREATE USER "username" WITH PASSWORD 'password' WITH ALL PRIVILEGES

              删除用户   DROP USER "username"

几个关键概念

       measurement,就相当于关系数据库中的table,就是tag,field,time的容器;

       对于influxDb的measurement来说,field是必须的,并且不能根据field来排序;

       Tag是可选的,tag可以用来做索引,tag是以字符串的形式存放的;tag字段一般用于where中限制条件。

       retention policy,保留策略,用于决定要保留多久的数据,保存几个备份,以及集群的策略等;

       series,a series is the collection of data that share a retention policy, measurement, and tag set。

Java操作

创建client

[java]
view plain
copy

print?

private static InfluxDB getClient() {  
    InfluxDB client = InfluxDBFactory.connect("http://10.210.228.89:9501", "root", "123456");  
    Pong pong = client.ping();  
    if (pong != null) {  
        System.out.println("Pong : " + pong);  
    } else  
        return null;  
    client.createDatabase(dbName);  
    return client;  
}  

private static InfluxDB getClient() {
InfluxDB client = InfluxDBFactory.connect("http://10.210.228.89:9501", "root", "123456");
Pong pong = client.ping();
if (pong != null) {
System.out.println("Pong : " + pong);
} else
return null;
client.createDatabase(dbName);
return client;
}
写数据:

[java]
view plain
copy

print?

private static void writeData001() {  
    Point point001 = Point.measurement("cpu")  
            .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)  
            .tag("ip", "127.0.0.1").tag("hostname", "localhost")  
            .addField("idle", 80L)  
            .addField("user", 9L)  
            .addField("system", 11L)  
            .build();  
    Point point002 = Point.measurement("disk")  
            .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)  
            .tag("ip", "127.0.0.1")  
            .addField("used", 80L)  
            .addField("free", 1L)  
            .build();  
  
    BatchPoints batchPoints = BatchPoints  
            .database(dbName)  
            //.tag("async", "true") //Add a tag to this set of points.  
            .retentionPolicy("default")  
            .consistency(InfluxDB.ConsistencyLevel.ALL)  
            .build();  
    batchPoints.point(point001).point(point002);  
    client.write(batchPoints);  
}  

private static void writeData001() {
Point point001 = Point.measurement("cpu")
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.tag("ip", "127.0.0.1").tag("hostname", "localhost")
.addField("idle", 80L)
.addField("user", 9L)
.addField("system", 11L)
.build();
Point point002 = Point.measurement("disk")
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.tag("ip", "127.0.0.1")
.addField("used", 80L)
.addField("free", 1L)
.build();

BatchPoints batchPoints = BatchPoints
.database(dbName)
//.tag("async", "true") //Add a tag to this set of points.
.retentionPolicy("default")
.consistency(InfluxDB.ConsistencyLevel.ALL)
.build();
batchPoints.point(point001).point(point002);
client.write(batchPoints);
}


[java]
view plain
copy

print?

private static void writeData002() {  
    // Flush every 2000 Points, at least every 100ms  
    client.enableBatch(2000, 100, TimeUnit.MILLISECONDS);  
  
    Point point1 = Point.measurement("cpu")  
            .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)  
            .addField("idle", 90L)  
            .addField("user", 9L)  
            .addField("system", 1L)  
            .build();  
    Point point2 = Point.measurement("disk")  
            .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)  
            .addField("used", 80L)  
            .addField("free", 1L)  
            .build();  
  
    client.write(dbName, "default", point1);  
    client.write(dbName, "default", point2);  
}  

private static void writeData002() {
// Flush every 2000 Points, at least every 100ms
client.enableBatch(2000, 100, TimeUnit.MILLISECONDS);

Point point1 = Point.measurement("cpu")
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.addField("idle", 90L)
.addField("user", 9L)
.addField("system", 1L)
.build();
Point point2 = Point.measurement("disk")
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.addField("used", 80L)
.addField("free", 1L)
.build();

client.write(dbName, "default", point1);
client.write(dbName, "default", point2);
}
查询:

[java]
view plain
copy

print?

private static void query() {  
    Query query = new Query("SELECT idle FROM cpu", dbName);  
    QueryResult queryResult = client.query(query);  
    System.out.println(queryResult);  
}  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: