您的位置:首页 > 大数据

Druid:一个用于大数据实时处理的开源分布式系统

2018-08-16 15:01 627 查看
Druid是一个用于大数据实时查询和分析的高容错、高性能开源分布式系统,旨在快速处理大规模的数据,并能够实现快速查询和分析。尤其是当发生代码部署、机器故障以及其他产品系统遇到宕机等情况时,Druid仍能够保持100%正常运行。创建Druid的最初意图主要是为了解决查询延迟问题,当时试图使用Hadoop来实现交互式查询分析,但是很难满足实时分析的需要。而Druid提供了以交互方式访问数据的能力,并权衡了查询的灵活性和性能而采取了特殊的存储格式。

Druid功能介于PowerDrill和Dremel之间,它几乎实现了Dremel的所有功能,并且从PowerDrill吸收一些有趣的数据格式。Druid允许以类似Dremel和PowerDrill的方式进行单表查询,同时还增加了一些新特性,如为局部嵌套数据结构提供列式存储格式、为快速过滤做索引、实时摄取和查询、高容错的分布式体系架构等。从官方得知,Druid的具有以下主要特征:

为分析而设计——Druid是为OLAP工作流的探索性分析而构建,它支持各种过滤、聚合和查询等类;

快速的交互式查询——Druid的低延迟数据摄取架构允许事件在它们创建后毫秒内可被查询到;

高可用性——Druid的数据在系统更新时依然可用,规模的扩大和缩小都不会造成数据丢失;

可扩展——Druid已实现每天能够处理数十亿事件和TB级数据。

Druid应用最多的是类似于广告分析创业公司Metamarkets中的应用场景,如广告分析、互联网广告系统监控以及网络监控等。当业务中出现以下情况时,Druid是一个很好的技术方案选择:

需要交互式聚合和快速探究大量数据时;

需要实时查询分析时;

具有大量数据时,如每天数亿事件的新增、每天数10T数据的增加;

对数据尤其是大数据进行实时分析时;

需要一个高可用、高容错、高性能数据库时。

Historical节点:对非实时数据进行处理存储和查询

Realtime节:实时摄取数据、监听输入数据流

Coordinator节点:监控Historical节点

Broker节点:接收来自外部客户端的查询和将查询转发到Realtime和Historical节点

Indexer节点:负责索引服务

一个Druid集群有各种类型的节点(Node)组成,每个节点都可以很好的处理一些的事情,这些节点包括对非实时数据进行处理存储和查询的Historical节点、实时摄取数据、监听输入数据流的Realtime节、监控Historical节点的Coordinator节点、接收来自外部客户端的查询和将查询转发到Realtime和Historical节点的Broker节点、负责索引服务的Indexer节点。

查询操作中数据流和各个节点的关系如下图所示:



如下图是Druid集群的管理层架构,该图展示了相关节点和集群管理所依赖的其他组件(如负责服务发现的ZooKeeper集群)的关系:



一、Druid简介
二、Druid架构组成及相关依赖
三、Druid集群配置
四、Druid集群启动
五、Druid查询
六、后记

一、Druid简介

Druid是一个为大型冷数据集上实时探索查询而设计的开源数据分析和存储系统,提供极具成本效益并且永远在线的实时数据摄取和任意数据处理。

主要特性:

为分析而设计——Druid是为OLAP工作流的探索性分析而构建。它支持各种filter、aggregator和查询类型,并为添加新功能提供了一个框架。用户已经利用Druid的基础设施开发了高级K查询和直方图功能。

交互式查询——Druid的低延迟数据摄取架构允许事件在它们创建后毫秒内查询,因为Druid的查询延时通过只读取和扫描有必要的元素被优化。Aggregate和filter没有坐等结果。

高可用性——Druid是用来支持需要一直在线的SaaS的实现。你的数据在系统更新时依然可用、可查询。规模的扩大和缩小不会造成数据丢失。

可伸缩——现有的Druid部署每天处理数十亿事件和TB级数据。Druid被设计成PB级别。

就系统而言,Druid功能位于PowerDrill和Dremel之间。它实现几乎所有Dremel提供的工具(Dremel处理任意嵌套数据结构,而Druid只允许一个基于数组的嵌套级别)并且从PowerDrill吸收一些有趣的数据格式和压缩方法。

Druid对于需要实时单一、海量数据流摄取产品非常适合。特别是如果你面向无停机操作时,如果你对查询查询的灵活性和原始数据访问要求,高于对速度和无停机操作,Druid可能不是正确的解决方案。在谈到查询速度时候,很有必要澄清“快速”的意思是:Druid是完全有可能在6TB的数据集上实现秒级查询。

二、Druid架构组成及其他依赖



2.1OverlordNode(IndexingService)

Overlord会形成一个加载批处理和实时数据到系统中的集群,同时会对存储在系统中的数据变更(也称为索引服务)做出响应。另外,还包含了MiddleManager和Peons,一个Peon负责执行单个task,而MiddleManager负责管理这些Peons。

2.2CoordinatorNode

监控Historical节点组,以确保数据可用、可复制,并且在一般的“最佳”配置。它们通过从MySQL读取数据段的元数据信息,来决定哪些数据段应该在集群中被加载,使用Zookeeper来确定哪个Historical节点存在,并且创建Zookeeper条目告诉Historical节点加载和删除新数据段。

2.3HistoricalNode

是对“historical”数据(非实时)进行处理存储和查询的地方。Historical节点响应从Broker节点发来的查询,并将结果返回给broker节点。它们在Zookeeper的管理下提供服务,并使用Zookeeper监视信号加载或删除新数据段。

2.4BrokerNode

接收来自外部客户端的查询,并将这些查询转发到Realtime和Historical节点。当Broker节点收到结果,它们将合并这些结果并将它们返回给调用者。由于了解拓扑,Broker节点使用Zookeeper来确定哪些Realtime和Historical节点的存在。

2.5Real-timeNode

实时摄取数据,它们负责监听输入数据流并让其在内部的Druid系统立即获取,Realtime节点同样只响应broker节点的查询请求,返回查询结果到broker节点。旧数据会被从Realtime节点转存至Historical节点。

2.6ZooKeeper

为集群服务发现和维持当前的数据拓扑而服务;

2.7MySQL

用来维持系统服务所需的数据段的元数据;

2.8DeepStorage

保存“冷数据”,可以使用HDFS。



三、Druid集群配置

3.1环境信息

我这里有两台机器,node1有32G内存,上面部署了HistoticalNode和CoordinatorNode;node2有72G内存,上面部署了其他四个服务。



3.2通用配置(CommonConfiguration)

##创建MySQL数据库

CREATEDATABASE`druid`DEFAULTCHARACTERSETutf8COLLATEutf8_general_ci;
grantallondruid.*todruid@’%’identifiedby‘druid1234′WITHGRANTOPTION;
flushprivileges;

##配置文件

cd$DRUID_HOME/config/_common
vicommon.runtime.properties(所有节点)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
##使用Mysql存储元数据
druid.extensions.coordinates=["io.druid.extensions:druid-examples","io.druid.extensions:druid-kafka-eight","io.druid.extensions:mysql-metadata-storage"]

##zookeeper
druid.zk.service.host=zkNode1:2181,zkNode2:2181,zkNode3:2181

##Mysql配置
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://node1:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd1234

##配置deepstorage到HDFS
druid.storage.type=hdfs
druid.storage.storageDirectory=hdfs://cdh5/tmp/druid/storage

##配置查询缓存,暂用本地,可配置memcached
druid.cache.type=local
druid.cache.sizeInBytes=10737418240

##配置监控
druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"]

##配置Indexingservice的名字
druid.selectors.indexing.serviceName=druid/overlord

##
druid.emitter=logging

3.3OverlordNode(IndexingService)

在运行OverlordNode节点上:

cd$DRUID_HOME/config/overlord
viruntime.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
druid.host=node2
druid.port=8090
druid.service=druid/overlord

#Onlyrequiredifyouareautoscalingmiddlemanagers
druid.indexer.autoscale.doAutoscale=true
druid.indexer.autoscale.strategy=ec2
druid.indexer.autoscale.workerIdleTimeout=PT90m
druid.indexer.autoscale.terminatePeriod=PT5M
druid.indexer.autoscale.workerVersion=0

#Uploadalltasklogstodeepstorage
druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=hdfs://cdh5/tmp/druid/indexlog

#Runinremotemode
druid.indexer.runner.type=remote
druid.indexer.runner.minWorkerVersion=0

#Storealltaskstateinthemetadatastorage
druid.indexer.storage.type=metadata

3.4MiddleManagerNode

在运行MiddleManagerNode节点上:
cd$DRUID_HOME/config/middleManager
viruntime.properties

1
2
3
4
5
6
7
8
9
10
druid.host=node2
druid.port=8091
druid.service=druid/middlemanager

druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=hdfs://cdh5/tmp/druid/indexlog

#Resourcesforpeons
druid.indexer.runner.javaOpts=-server-Xmx2g-XX:+UseG1GC-XX:MaxGCPauseMillis=100-XX:+PrintGCDetails-XX:+PrintGCTimeStamps
druid.indexer.task.baseTaskDir=/tmp/persistent/task/

3.5CoordinatorNode

在运行CoordinatorNode节点上:
cd$DRUID_HOME/config/coordinator
viruntime.properties

1
2
3
4
5
druid.host=node1
druid.port=8081
druid.service=coordinator

druid.coordinator.startDelay=PT5M

3.6HistoricalNode

在运行HistoricalNode节点上:
cd$DRUID_HOME/config/historical
viruntime.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
druid.host=node1
druid.port=8082
druid.service=druid/historical

druid.historical.cache.useCache=true
druid.historical.cache.populateCache=true

druid.processing.buffer.sizeBytes=1073741824
druid.processing.numThreads=9

druid.server.http.numThreads=9
druid.server.maxSize=300000000000

druid.segmentCache.locations=[{"path":"/tmp/druid/indexCache","maxSize":300000000000}]

druid.monitoring.monitors=["io.druid.server.metrics.HistoricalMetricsMonitor","com.metamx.metrics.JvmMonitor"]

3.7BrokerNode

在运行BrokerNode节点上:
cd$DRUID_HOME/config/broker
viruntime.properties

1
2
3
4
5
6
7
8
9
10
11
druid.host=node2
druid.port=8092
druid.service=druid/broker

druid.broker.http.numConnections=20
druid.broker.http.readTimeout=PT5M

druid.processing.buffer.sizeBytes=2147483647
druid.processing.numThreads=11

druid.server.http.numThreads=20

3.8Real-timeNode

在运行Real-timeNode节点上:
cd$DRUID_HOME/config/realtime
viruntime.properties

1
2
3
4
5
6
7
8
9
10
11
druid.host=node2
druid.port=8093
druid.service=druid/realtime

druid.processing.buffer.sizeBytes=1073741824
druid.processing.numThreads=5

#Overrideemittertoprintlogsabouteventsingested,rejected,etc
druid.emitter=logging

druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor","com.metamx.metrics.JvmMonitor"]

四、Druid集群启动

首次启动时候,可以遵循下面的启动顺序。

4.1BrokerNode

cd$DRUID_HOME/
cprun_druid_server.shrun_broker.sh
virun_broker.sh

替换以下内容:

1
2
3
4
5
6
7
SERVER_TYPE=broker

#startprocess
JAVA_ARGS="${JAVA_ARGS}-Xmx10g-Xms5g-XX:NewSize=2g-XX:MaxNewSize=2g-XX:MaxDirectMemorySize=24g-XX:+UseConcMarkSweepGC-XX:+PrintGCDetails-XX:+PrintGCTimeStamps"
JAVA_ARGS="${JAVA_ARGS}-Duser.timezone=GMT+8-Dfile.encoding=UTF-8"
JAVA_ARGS="${JAVA_ARGS}-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager-Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS="${JAVA_ARGS}-Dcom.sun.management.jmxremote.port=17071-Dcom.sun.management.jmxremote.authenticate=false-Dcom.sun.management.jmxremote.ssl=false-Ddruid.extensions.localRepository=${MAVEN_DIR}"
执行./run_broker.sh启动BrokerNode:



4.2HistoricalNode

cd$DRUID_HOME/
cprun_druid_server.shrun_historical.sh

virun_historical.sh

替换以下内容:

1
2
3
4
5
6
7
SERVER_TYPE=historical

#startprocess
JAVA_ARGS="${JAVA_ARGS}-Xmx10g-Xms10g-XX:NewSize=2g-XX:MaxNewSize=2g-XX:MaxDirectMemorySize=16g-XX:+UseConcMarkSweepGC-XX:+PrintGCDetails-XX:+PrintGCTimeStamps"
JAVA_ARGS="${JAVA_ARGS}-Duser.timezone=GMT+8-Dfile.encoding=UTF-8"
JAVA_ARGS="${JAVA_ARGS}-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager-Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS="${JAVA_ARGS}-Ddruid.extensions.localRepository=${MAVEN_DIR}"
执行命令./run_historical.sh启动HistoricalNode:



4.3CoordinatorNode

cd$DRUID_HOME/
cprun_druid_server.shrun_coordinator.sh
virun_coordinator.sh

替换以下内容:

1
2
3
4
5
6
7
SERVER_TYPE=coordinator

#startprocess
JAVA_ARGS="${JAVA_ARGS}-Xmx10g-Xms10g-XX:NewSize=512m-XX:MaxNewSize=512m-XX:+UseG1GC-XX:+PrintGCDetails-XX:+PrintGCTimeStamps"
JAVA_ARGS="${JAVA_ARGS}-Duser.timezone=GMT+8-Dfile.encoding=UTF-8"
JAVA_ARGS="${JAVA_ARGS}-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager-Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS="${JAVA_ARGS}-Ddruid.extensions.localRepository=${MAVEN_DIR}"
执行命令./run_coordinator.sh启动CoordinatorNode.

4.4MiddleManager

cd$DRUID_HOME/
cprun_druid_server.shrun_middleManager.sh
virun_middleManager.sh

替换以下内容:

1
2
3
4
5
SERVER_TYPE=middleManager
#startprocess
JAVA_ARGS="${JAVA_ARGS}-Xmx64m-Xms64m-XX:+UseConcMarkSweepGC-XX:+PrintGCDetails-XX:+PrintGCTimeStamps-Duser.timezone=GMT+8-Dfile.encoding=UTF-8"
JAVA_ARGS="${JAVA_ARGS}-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager-Djava.io.tmpdir=/tmp/druid-Ddruid.extensions.localR
epository=${MAVEN_DIR}"
执行命令./run_middleManager.sh启动MiddleManagerNode。

4.5OverlordNode

cd$DRUID_HOME/
cprun_druid_server.shrun_overlord.sh
virun_overlord.sh

替换以下内容:

1
2
3
4
5
6
SERVER_TYPE=overlord
#startprocess
JAVA_ARGS="${JAVA_ARGS}-Xmx4g-Xms4g-XX:NewSize=256m-XX:MaxNewSize=256m-XX:+UseConcMarkSweepGC-XX:+PrintGCDetails-XX:+PrintGCTimeStamps"
JAVA_ARGS="${JAVA_ARGS}-Duser.timezone=GMT+8-Dfile.encoding=UTF-8"
JAVA_ARGS="${JAVA_ARGS}-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager-Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS="${JAVA_ARGS}-Ddruid.extensions.localRepository=${MAVEN_DIR}"
执行命令./run_overlord.sh启动OverlordNode:



4.6Real-timeNode

cd$DRUID_HOME/
cprun_druid_server.shrun_realtime.sh
virun_realtime.sh
替换以下内容:

1
2
3
4
5
6
7
8
9
10
11
SERVER_TYPE=realtime

#startprocess
JAVA_ARGS="${JAVA_ARGS}-Xmx13g-Xms13g-XX:NewSize=2g-XX:MaxNewSize=2g-XX:MaxDirectMemorySize=9g-XX:+UseConcMarkSweepGC-XX:+PrintGCDetails-
XX:+PrintGCTimeStamps-XX:+HeapDumpOnOutOfMemoryError"
JAVA_ARGS="${JAVA_ARGS}-Duser.timezone=GMT+8-Dfile.encoding=UTF-8"
JAVA_ARGS="${JAVA_ARGS}-Ddruid.realtime.specFile=/home/liuxiaowen/druid-0.8.1/examples/wikipedia/wikipedia_realtime.spec"
JAVA_ARGS="${JAVA_ARGS}-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager-Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS="${JAVA_ARGS}-Dcom.sun.management.jmxremote.port=17072-Dcom.sun.management.jmxremote.authenticate=false-Dcom.sun.management.jmxremot
e.ssl=false"
JAVA_ARGS="${JAVA_ARGS}-Ddruid.extensions.localRepository=${MAVEN_DIR}"
##特别需要注意参数:

-Ddruid.realtime.specFile=/home/liuxiaowen/druid-0.8.1/examples/wikipedia/wikipedia_realtime.spec

启动RealTimeNode需要指定一个realtime数据源的配置文件,本文中使用example提供的wikipedia_realtime.spec,启动后,该数据源从irc.wikimedia.org获取实时数据。

关于RealTimeNode的配置,后续文章将会详细介绍。

执行命令./run_realtime.sh启动RealTimeNode。

五、Druid查询

第四部分中启动RealTimeNode时候使用了例子中自带的配置文件wikipedia_realtime.spec,启动后,该RealTimeNode会从irc.wikimedia.org获取实时数据,本章将以该数据源为例,学习几种最常见的查询。

5.1select查询

首先编辑查询配置文件select_query.json

1
2
3
4
5
6
7
8
9
10
11
{
"queryType":"select",
"dataSource":"wikipedia",
"dimensions":[],
"metrics":[],
"granularity":"all",
"intervals":[
"2015-11-01/2015-11-20"
],
"pagingSpec":{"pagingIdentifiers":{},"threshold":10}
}
该配置文件的含义是从数据源”wikipedia”进行select查询所有列,时间区间为2015-11-01/2015-11-20,每10条记录一个分页。

执行命令查询:

curl-XPOST‘http://node2:8093/druid/v2/?pretty’-H‘content-type:application/json’-d@select_query.json

瞬间返回结果:



5.2基于时间序列的查询Timeseriesquery

编辑查询配置文件timeseries.json

1
2
3
4
5
6
7
8
9
10
{
"queryType":"timeseries",
"dataSource":"wikipedia",
"intervals":["2010-01-01/2020-01-01"],
"granularity":"minute",
"aggregations":[
{"type":"longSum","fieldName":"count","name":"edit_count"},
{"type":"doubleSum","fieldName":"added","name":"chars_added"}
]
}
该配置文件的含义是:从数据源”wikipedia”中进行时间序列查询,区间为2010-01-01/2020-01-01,按分钟汇总结果,汇总字段为count和added;

执行查询命令:

curl-XPOST‘http://node2:8093/druid/v2/?pretty’-H‘content-type:application/json’-d@timeseries.json

同样瞬间返回结果:



5.3TopN查询

编辑查询文件topn.json

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"queryType":"topN",
"dataSource":"wikipedia",
"granularity":"all",
"dimension":"page",
"metric":"edit_count",
"threshold":10,
"aggregations":[
{"type":"longSum","fieldName":"count","name":"edit_count"}
],
"filter":{"type":"selector","dimension":"country","value":"UnitedStates"},
"intervals":["2012-10-01T00:00/2020-01-01T00"]
}
该文件含义是:从数据源”wikipedia”进行TopN查询,其中N=10,维度为page,指标为edit_count,也就是,在page维度上将edit_count汇总后取Top10.

执行查询命令:

curl-XPOST‘http://node2:8093/druid/v2/?pretty’-H‘content-type:application/json’-d@topn.json

结果为:



六、后记

Druid目前已经有很多公司用于实时计算和实时OLAP,而且效果很好。虽然它的配置和查询都比较复杂和繁琐,但如果是真正基于海量数据的实时OLAP,它的威力还是很强大的。我将持续学习和分享Druid的相关技术,验证它在海量数据实时OLAP上的效果,敬请关注我的博客。

参考文章:
http://druid.iohttp://www.csdn.net/article/2014-10-30/2822381/2
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: