Vertica --> Kafka --> MongoDB Data Flow
2016-08-04 18:29
The connector used here is a jar modified from the Confluent one; it is not provided for download (it involves company confidential material!).
Kafka Connect has two core concepts: Source and Sink. A Source imports data into Kafka, and a Sink exports data from Kafka; both are called Connectors.
Since the flow here is Kafka to MongoDB, we use a Sink Connector.
First, install Kafka.
Kafka is a high-throughput distributed publish-subscribe messaging system that can handle all the action-stream data of a consumer-scale website.
Kafka topology:
Broker
A Kafka cluster contains one or more servers; these servers are called brokers.
Topic
Every message published to a Kafka cluster has a category, called its Topic. (Physically, messages of different Topics are stored separately; logically, although a Topic's messages live on one or more brokers, a user only needs to specify the Topic to produce or consume data, without caring where the data is stored.)
Partition
A Partition is a physical concept; each Topic contains one or more Partitions.
Producer
Publishes messages to Kafka brokers.
Consumer
A message consumer: a client that reads messages from Kafka brokers.
Consumer Group
Each Consumer belongs to a specific Consumer Group (a group name can be specified for each Consumer; if none is given, it belongs to the default group).
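As a quick illustration of Topics and Partitions (a sketch; the topic name demo is hypothetical, and the broker installed below must already be running), the scripts shipped with Kafka can create and inspect a topic:
[root@dba06 bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic demo
[root@dba06 bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo
The describe output lists, for each Partition, which broker is its Leader and which brokers hold Replicas.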
Install ZooKeeper
Configure the environment variables by adding the following to /etc/profile:
[root@dba06 kafka]# export ZOOKEEPER_HOME=/opt/app/kafka/zookeeper-3.4.6
[root@dba06 kafka]# export PATH=$PATH:$ZOOKEEPER_HOME/bin
[root@dba06 kafka]# cd zookeeper-3.4.6/conf/
[root@dba06 conf]# ls
configuration.xsl log4j.properties zoo_sample.cfg
[root@dba06 conf]# cat zoo_sample.cfg | grep -v '#' > zoo.cfg
[root@dba06 conf]# cat zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181
[root@dba06 conf]# ../bin/zkServer.sh start
JMX enabled by default
Using config: /opt/app/kafka/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
Check whether it has started:
[root@dba06 conf]# ../bin/zkServer.sh status
JMX enabled by default
Using config: /opt/app/kafka/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: standalone
Once the server is up, start a client to connect to it by running:
[root@dba06 conf]# ../bin/zkCli.sh -server localhost:2181
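Inside the zkCli shell, a quick sanity check (a minimal sketch) is to list the root znode; a fresh standalone instance contains only /zookeeper:
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]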
Install Kafka
A ZooKeeper server must be installed separately before the Kafka server, and the IP information in config/server.properties needs to be adjusted:
[root@dba06 kafka]# cd kafka_2.11-0.9.0.0
[root@dba06 kafka_2.11-0.9.0.0]# cd config/
[root@dba06 config]# vi server.properties
zookeeper.connect=localhost:2181
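Besides zookeeper.connect, the settings most commonly checked in server.properties (shown here with the Kafka 0.9 defaults, as a reference sketch) are:
broker.id=0
port=9092
log.dirs=/tmp/kafka-logs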
Also check the default zookeeper.properties configuration:
[root@dba06 config]# vi zookeeper.properties
Keep the default settings.
Start ZooKeeper first; before that, kill the ZooKeeper service previously started with zkServer.sh:
[root@dba06 bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties &
Start the Kafka service:
[root@dba06 bin]# ./kafka-server-start.sh ../config/server.properties &
Check whether the Kafka process has started. jps was not usable on this machine at first, so locate the Java installation:
[root@dba06 bin]# find /. -name 'java'
Reinstall Java:
[root@dba06 bin]# yum install java-1.7.0
[root@dba06 bin]# yum install java-1.7.0-openjdk java-1.7.0-openjdk-devel
Configure the environment variables:
[root@dba06 bin]# vi ~/.bash_profile
export PATH
export JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111.x86_64/jre
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111.x86_64/jre/lib/amd64/libjsig.so:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111.x86_64/jre/lib/amd64/server/libjvm.so:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111.x86_64/jre/lib/amd64/server:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111.x86_64/jre/lib/amd64
[root@dba06 bin]# source ~/.bash_profile
[root@dba06 bin]# jps
835 Kafka
803 QuorumPeerMain
1679 Jps
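With both processes confirmed, a quick smoke test (a sketch; the topic name smoketest is hypothetical, and with the default auto.create.topics.enable=true it is created on first use) is to send one message and read it back:
[root@dba06 bin]# echo "hello kafka" | ./kafka-console-producer.sh --broker-list localhost:9092 --topic smoketest
[root@dba06 bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic smoketest --from-beginning
hello kafka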
Now let's look at Vertica pushing data to Kafka.
dbadmin=> select version();
version
------------------------------------
Vertica Analytic Database v7.2.3-0
(1 row)
dbadmin=> \q
Version 7.2.3 is used here.
dbadmin=> SELECT KafkaExport(0, '1', '{"EUTIME":"'||"EUTIME"||'"}' USING PARAMETERS brokers='172.16.20.207:9092',topic='fafatest') OVER (PARTITION BEST) FROM tmp.DP_TRADE_PVCOUNT ;
partition | key | message | failure_reason
-----------+-----+---------+----------------
(0 rows)
Note: there is no need to create the topic on the Kafka side beforehand; it is enough for the remote host to have ZooKeeper and Kafka processes running.
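For reference, the general shape of the call (a sketch of Vertica 7.2's KafkaExport UDTF; table and column names are placeholders) is:
SELECT KafkaExport(partition_expr, key_expr, message_expr
                   USING PARAMETERS brokers='host:port', topic='topic_name')
       OVER (PARTITION BEST)
FROM source_table;
KafkaExport returns one row per message that failed to send, which is why a fully successful export shows (0 rows).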
Error breakdown:
dbadmin=> SELECT KafkaExport(0, '1', '{"EUTIME":"'||"STATDATE"||'"}' USING PARAMETERS brokers='172.16.57.55:9092',topic='fafa01') OVER (PARTITION BEST) FROM tmp.DP_TRADE_PVCOUNT ;
ERROR 5861: Error calling processPartition() in User Function KafkaExport at [src/KafkaExport.cpp:187], error code: 0, message: Error reading topic metadata for topic fafa01: Broker: Leader not available
dbadmin=> SELECT KafkaExport(0, '1', '{"EUTIME":"'||"STATDATE"||'"}' USING PARAMETERS brokers='172.16.57.55:9092',topic='fafa01') OVER () FROM tmp.DP_TRADE_PVCOUNT ;
ERROR 5861: Error calling processPartition() in User Function KafkaExport at [src/KafkaExport.cpp:323], error code: 0, message: Exception while processing partition row: [0] : [Fatal kafka error: T2:9092/0: Failed to resolve 'T2:9092': Temporary failure in
name resolution]
The ERROR 5861 problem arises because each side's /etc/hosts must contain the other side's IP mapping, for example in this case:
172.16.57.55 T2
172.16.57.208 bd-dev-vertica2-208 localhost
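The underlying cause is that the broker advertises its hostname (T2 here) to clients, and Vertica cannot resolve that name. As an alternative to editing /etc/hosts (a sketch, untested here, using a property that exists in Kafka 0.9's server.properties), the broker can advertise its IP directly:
advertised.host.name=172.16.57.55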
Test consumption on the Kafka side:
[root@dba06 bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic fafa01
{"EUTIME":"2016-04-30"}
{"EUTIME":"2016-04-30"}
{"EUTIME":"2016-05-01"}
{"EUTIME":"2016-05-02"}
{"EUTIME":"2016-05-02"}
{"EUTIME":"2016-05-02"}
....................
....................
You can see the JSON-formatted data has been consumed.
Next, try the Kafka-to-MongoDB connection.
Download the Kafka-to-MongoDB connector. Note that the connectors downloaded here require the Confluent distribution of Kafka:
http://www.confluent.io/product/connectors
Here we use a rewritten one-way version that works with plain Kafka.
[root@dba06 libs]# rz
That is, connect-mongodb-1.0-jar-with-dependencies.jar.
Put the jar under libs/.
Put the configuration files under config/:
[root@dba06 ~]# cd /opt/app/kafka/kafka_2.11-0.9.0.0/config/
connect-mongo160-sink.properties
connect-standalone.properties
Configure the Kafka-to-MongoDB connector:
[root@T2 config]# vi connect-mongo160-sink.properties
name=mongodb-sink-connector
connector.class=org.apache.kafka.connect.mongodb.MongodbSinkConnector
tasks.max=1
host=172.16.57.160
port=12001
bulk.size=100
mongodb.database=backbone
mongodb.authorized.database=backbone
skip.message.on.error=true
mongodb.collections=test
mongodb.user=backbone
mongodb.password=Password$1
topics=fafa01
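The sink authenticates as mongodb.user against mongodb.authorized.database. If that account does not exist yet, it can be created from the mongo shell (a sketch; the readWrite role is an assumption, adjust to your own policy):
mongos> use backbone
mongos> db.createUser({user: "backbone", pwd: "Password$1", roles: [{role: "readWrite", db: "backbone"}]})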
Start the Connect service. For now, ignore message formats and edit connect-standalone.properties:
[root@T2 bin]# vi /opt/app/software/kafka_2.11-0.10.0.0/config/connect-standalone.properties
key.converter.schemas.enable=false
value.converter.schemas.enable=false
This temporarily disables schema enforcement.
[root@T2 bin]# ./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-mongo160-sink.properties
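Once the standalone worker is running, it exposes a REST interface (port 8083 by default); a quick way to confirm the connector loaded (a sketch assuming the default port) is:
[root@T2 bin]# curl http://localhost:8083/connectors
["mongodb-sink-connector"]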
Check the MongoDB side:
mongos> db.test.find()
{ "_id" : NumberLong(2), "message" : "great", "EUTIME" : "2015-10-28" }
{ "_id" : NumberLong(3), "haoshibu" : "4", "EUTIME" : "2015-10-29" }
{ "_id" : NumberLong(4), "haoshibu" : "9", "EUTIME" : "2015-10-31" }
{ "_id" : NumberLong(5), "haoshibu" : "6", "EUTIME" : "2015-11-12" }
{ "_id" : NumberLong(6), "haoshibu" : "10", "EUTIME" : "2015-11-14" }
{ "_id" : NumberLong(7), "haoshibu" : "2", "EUTIME" : "2015-11-17" }
{ "_id" : NumberLong(8), "haoshibu" : "3", "EUTIME" : "2015-11-18" }
{ "_id" : NumberLong(9), "haoshibu" : "1", "EUTIME" : "2015-11-19" }
{ "_id" : NumberLong(10), "haoshibu" : "7", "EUTIME" : "2015-11-24" }
{ "_id" : NumberLong(11), "haoshibu" : "8", "EUTIME" : "2015-11-27" }
{ "_id" : NumberLong(12), "haoshibu" : "5", "EUTIME" : "2015-12-03" }
{ "_id" : NumberLong(13), "haoshibu" : "4", "EUTIME" : "2015-12-09" }
{ "_id" : NumberLong(14), "haoshibu" : "9", "EUTIME" : "2015-12-10" }
{ "_id" : NumberLong(15), "haoshibu" : "6", "EUTIME" : "2015-12-11" }
{ "_id" : NumberLong(16), "haoshibu" : "10", "EUTIME" : "2015-12-18" }
{ "_id" : NumberLong(17), "haoshibu" : "2", "EUTIME" : "2015-12-20" }
{ "_id" : NumberLong(18), "haoshibu" : "3", "EUTIME" : "2015-12-26" }
{ "_id" : NumberLong(19), "haoshibu" : "1", "EUTIME" : "2015-12-27" }
{ "_id" : NumberLong(20), "haoshibu" : "7", "EUTIME" : "2016-01-01" }
{ "_id" : NumberLong(21), "haoshibu" : "8", "EUTIME" : "2016-01-03" }
Success!