
Vertica --> Kafka --> MongoDB data flow

2016-08-04 18:29
The connector used here is a jar modified from Confluent's version; it is not provided for download (company confidential!).

Kafka Connect has two core concepts: Source and Sink. A Source imports data into Kafka and a Sink exports data from Kafka; both are referred to as Connectors.



Since this leg of the pipeline flows from Kafka to MongoDB, a Sink Connector is used here.

First, install Kafka.
Kafka is a high-throughput distributed publish/subscribe messaging system, capable of handling all the activity-stream data of a consumer-scale website.
Kafka topology (diagram omitted). The core concepts:



Broker

A Kafka cluster consists of one or more servers; each such server is called a broker.

Topic

Every message published to a Kafka cluster has a category, called its Topic. (Physically, messages of different Topics are stored separately; logically, a Topic's messages may sit on one or more brokers, but users only need to name the Topic to produce or consume data, without caring where it is stored.)

Partition

A Partition is a physical concept; each Topic contains one or more Partitions.

Producer

Publishes messages to Kafka brokers.

Consumer

A message consumer: a client that reads messages from Kafka brokers.

Consumer Group

Each Consumer belongs to a specific Consumer Group (a group name can be assigned per Consumer; Consumers without one belong to the default group).
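These concepts map directly onto Kafka's bundled command-line tools. As a quick illustration (a sketch, assuming the install paths used later in this post; 'demo' is an arbitrary topic name), creating and inspecting a topic with three partitions looks like:

[root@dba06 bin]# ./kafka-topics.sh --zookeeper localhost:2181 --create --topic demo --partitions 3 --replication-factor 1
[root@dba06 bin]# ./kafka-topics.sh --zookeeper localhost:2181 --describe --topic demo

With a single broker the replication factor cannot exceed 1, and all three partitions live on that one broker.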

Install ZooKeeper

Configure the environment variables; add the following to /etc/profile:

[root@dba06 kafka]# export ZOOKEEPER_HOME=/opt/app/kafka/zookeeper-3.4.6

[root@dba06 kafka]# export PATH=$PATH:$ZOOKEEPER_HOME/bin

[root@dba06 kafka]# cd zookeeper-3.4.6/conf/

[root@dba06 conf]# ls

configuration.xsl log4j.properties zoo_sample.cfg

[root@dba06 conf]# cat zoo_sample.cfg | grep -v '#' > zoo.cfg

[root@dba06 conf]# cat zoo.cfg

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/tmp/zookeeper

clientPort=2181
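Briefly: tickTime is ZooKeeper's base time unit in milliseconds, initLimit and syncLimit are measured in ticks, dataDir holds snapshots, and clientPort is where clients connect. A commented equivalent for reference (same values; note /tmp is fine for a test but a poor place for durable data):

[root@dba06 conf]# cat > zoo.cfg <<'EOF'
tickTime=2000            # base time unit, in ms
initLimit=10             # ticks a follower may take to connect and sync to the leader
syncLimit=5              # ticks a follower may lag behind before being dropped
dataDir=/tmp/zookeeper   # snapshot directory; move off /tmp for real deployments
clientPort=2181          # port clients connect on
EOF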

[root@dba06 conf]# ../bin/zkServer.sh start

JMX enabled by default

Using config: /opt/app/kafka/zookeeper-3.4.6/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

Check whether it started:

[root@dba06 conf]# ../bin/zkServer.sh status

JMX enabled by default

Using config: /opt/app/kafka/zookeeper-3.4.6/bin/../conf/zoo.cfg

Mode: standalone

Once the server is up, a client can connect to it; run:

[root@dba06 conf]# ../bin/zkCli.sh -server localhost:2181
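For a non-interactive health check, ZooKeeper's four-letter-word commands also work (assuming nc is installed):

[root@dba06 conf]# echo ruok | nc localhost 2181
imok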

Install Kafka

A ZooKeeper server must be installed separately before the Kafka server, and the IP settings in config/server.properties need to be adjusted.

[root@dba06 kafka]# cd kafka_2.11-0.9.0.0

[root@dba06 kafka_2.11-0.9.0.0]# cd config/

[root@dba06 config]# vi server.properties

zookeeper.connect=localhost:2181
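Besides zookeeper.connect, the 0.9 server.properties settings most likely to need attention are the broker id, the listening/advertised addresses, and the log directory. Illustrative values (the broker IP here matches the one used in the Vertica export below):

broker.id=0
port=9092
host.name=172.16.20.207
advertised.host.name=172.16.20.207
log.dirs=/tmp/kafka-logs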

Next, review the default zookeeper.properties configuration:

[root@dba06 config]# vi zookeeper.properties

The default settings are kept.

Start ZooKeeper first; before doing so, kill the ZooKeeper instance started earlier via zkServer.sh.

[root@dba06 bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties &

Start the Kafka service:

[root@dba06 bin]# ./kafka-server-start.sh ../config/server.properties &

Check whether the Kafka process started. jps needs a JDK, so first look for an existing Java installation:

[root@dba06 bin]# find /. -name 'java'

No usable JDK turned up, so reinstall Java:

[root@dba06 bin]# yum install java-1.7.0

[root@dba06 bin]# yum install java-1.7.0-openjdk java-1.7.0-openjdk-devel

Configure the environment variables:

[root@dba06 bin]# vi ~/.bash_profile

export PATH

export JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111.x86_64/jre

export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export PATH=$PATH:$JAVA_HOME/bin

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111.x86_64/jre/lib/amd64/libjsig.so:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111.x86_64/jre/lib/amd64/server/libjvm.so:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111.x86_64/jre/lib/amd64/server:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111.x86_64/jre/lib/amd64

[root@dba06 bin]# source ~/.bash_profile

[root@dba06 bin]# jps

835 Kafka

803 QuorumPeerMain

1679 Jps
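With both QuorumPeerMain (ZooKeeper) and Kafka visible in jps, a quick produce/consume round trip makes a useful smoke test before involving Vertica (a sketch; 'smoketest' is an arbitrary topic name, auto-created on first use under 0.9's defaults):

[root@dba06 bin]# echo 'hello kafka' | ./kafka-console-producer.sh --broker-list localhost:9092 --topic smoketest
[root@dba06 bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic smoketest --max-messages 1
hello kafka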

Now let's look at Vertica pushing data to Kafka.

dbadmin=> select version();

version

------------------------------------

Vertica Analytic Database v7.2.3-0

(1 row)

dbadmin=> \q

Version 7.2.3 is used here.

dbadmin=> SELECT KafkaExport(0, '1', '{"EUTIME":"'||"EUTIME"||'"}' USING PARAMETERS brokers='172.16.20.207:9092',topic='fafatest') OVER (PARTITION BEST) FROM tmp.DP_TRADE_PVCOUNT ;

partition | key | message | failure_reason

-----------+-----+---------+----------------

(0 rows)

Note: the topic does not need to be created on the receiving side beforehand; the remote host only needs running ZooKeeper and Kafka processes (Kafka auto-creates the topic on first use by default).
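Exporting more than one column is just a matter of concatenating a larger JSON string. A sketch, run via vsql on the Vertica host (the PV column is hypothetical, standing in for whatever other fields the table has):

vsql <<'SQL'
SELECT KafkaExport(0, '1',
       '{"EUTIME":"'||"EUTIME"||'","PV":"'||"PV"||'"}'
       USING PARAMETERS brokers='172.16.20.207:9092', topic='fafatest')
OVER (PARTITION BEST)
FROM tmp.DP_TRADE_PVCOUNT;
SQL

The rows KafkaExport returns are the ones that failed to send; an empty result set, as above, means everything was delivered.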

A closer look at the errors:

dbadmin=> SELECT KafkaExport(0, '1', '{"EUTIME":"'||"STATDATE"||'"}' USING PARAMETERS brokers='172.16.57.55:9092',topic='fafa01') OVER (PARTITION BEST) FROM tmp.DP_TRADE_PVCOUNT ;

ERROR 5861: Error calling processPartition() in User Function KafkaExport at [src/KafkaExport.cpp:187], error code: 0, message: Error reading topic metadata for topic fafa01: Broker: Leader not available

dbadmin=> SELECT KafkaExport(0, '1', '{"EUTIME":"'||"STATDATE"||'"}' USING PARAMETERS brokers='172.16.57.55:9092',topic='fafa01') OVER () FROM tmp.DP_TRADE_PVCOUNT ;

ERROR 5861: Error calling processPartition() in User Function KafkaExport at [src/KafkaExport.cpp:323], error code: 0, message: Exception while processing partition row: [0] : [Fatal kafka error: T2:9092/0: Failed to resolve 'T2:9092': Temporary failure in name resolution]

This ERROR 5861 occurs because each side's IP must be configured in the other's /etc/hosts, for example here:

172.16.57.55 T2

172.16.57.208 bd-dev-vertica2-208 localhost
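After editing /etc/hosts on both machines, confirm the name actually resolves before retrying the export:

[root@dba06 ~]# getent hosts T2
172.16.57.55    T2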

Test consuming on the Kafka side:

[root@dba06 bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic fafa01

{"EUTIME":"2016-04-30"}

{"EUTIME":"2016-04-30"}

{"EUTIME":"2016-05-01"}

{"EUTIME":"2016-05-02"}

{"EUTIME":"2016-05-02"}

{"EUTIME":"2016-05-02"}

....................

....................

The JSON-formatted data is clearly being consumed.

Next, try the Kafka-to-MongoDB connection.

Download the Kafka-to-MongoDB connector; note that the connector downloaded from here requires the Kafka that ships with the Confluent platform:
http://www.confluent.io/product/connectors
Here, a rewritten one-way version that works with plain Kafka is used instead.

[root@dba06 libs]# rz

connect-mongodb-1.0-jar-with-dependencies.jar

Place the jar under the Kafka libs directory.
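A quick check that the jar landed where the Connect runtime will pick it up:

[root@dba06 ~]# ls /opt/app/kafka/kafka_2.11-0.9.0.0/libs/ | grep mongodb
connect-mongodb-1.0-jar-with-dependencies.jar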

Place the configuration files under config:

[root@dba06 ~]# cd /opt/app/kafka/kafka_2.11-0.9.0.0/config/

connect-mongo160-sink.properties

connect-standalone.properties

Configure the Kafka-to-MongoDB connector:

[root@T2 config]# vi connect-mongo160-sink.properties

name=mongodb-sink-connector

connector.class=org.apache.kafka.connect.mongodb.MongodbSinkConnector

tasks.max=1

host=172.16.57.160

port=12001

bulk.size=100

mongodb.database=backbone

mongodb.authorized.database=backbone

skip.message.on.error=true

mongodb.collections=test

mongodb.user=backbone

mongodb.password=Password$1

topics=fafa01
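Before starting the connector, it is worth confirming that the credentials in this file actually work against the target MongoDB, for example:

[root@T2 config]# mongo 172.16.57.160:12001/backbone -u backbone -p 'Password$1' --eval 'printjson(db.stats())'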

Start the Connect service.

For now, set schema formatting aside and edit connect-standalone.properties:

[root@T2 bin]# vi /opt/app/software/kafka_2.11-0.10.0.0/config/connect-standalone.properties

key.converter.schemas.enable=false

value.converter.schemas.enable=false

This temporarily disables schema enforcement.
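The reason this matters: with schemas.enable=true, Connect's JsonConverter expects every message to be wrapped in a schema/payload envelope, which the bare JSON emitted by KafkaExport does not have. A plain string value, for instance, would have to arrive as:

{"schema":{"type":"string","optional":false},"payload":"2016-04-30"}

Disabling schemas lets the converter pass the raw JSON documents straight through.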

[root@T2 bin]# ./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-mongo160-sink.properties
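The standalone worker also exposes a REST interface (port 8083 by default) that can confirm the connector loaded; the name below matches the one set in the sink properties:

[root@T2 bin]# curl http://localhost:8083/connectors
["mongodb-sink-connector"]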

Check the MongoDB side:

mongos> db.test.find()

{ "_id" : NumberLong(2), "message" : "great", "EUTIME" : "2015-10-28" }

{ "_id" : NumberLong(3), "haoshibu" : "4", "EUTIME" : "2015-10-29" }

{ "_id" : NumberLong(4), "haoshibu" : "9", "EUTIME" : "2015-10-31" }

{ "_id" : NumberLong(5), "haoshibu" : "6", "EUTIME" : "2015-11-12" }

{ "_id" : NumberLong(6), "haoshibu" : "10", "EUTIME" : "2015-11-14" }

{ "_id" : NumberLong(7), "haoshibu" : "2", "EUTIME" : "2015-11-17" }

{ "_id" : NumberLong(8), "haoshibu" : "3", "EUTIME" : "2015-11-18" }

{ "_id" : NumberLong(9), "haoshibu" : "1", "EUTIME" : "2015-11-19" }

{ "_id" : NumberLong(10), "haoshibu" : "7", "EUTIME" : "2015-11-24" }

{ "_id" : NumberLong(11), "haoshibu" : "8", "EUTIME" : "2015-11-27" }

{ "_id" : NumberLong(12), "haoshibu" : "5", "EUTIME" : "2015-12-03" }

{ "_id" : NumberLong(13), "haoshibu" : "4", "EUTIME" : "2015-12-09" }

{ "_id" : NumberLong(14), "haoshibu" : "9", "EUTIME" : "2015-12-10" }

{ "_id" : NumberLong(15), "haoshibu" : "6", "EUTIME" : "2015-12-11" }

{ "_id" : NumberLong(16), "haoshibu" : "10", "EUTIME" : "2015-12-18" }

{ "_id" : NumberLong(17), "haoshibu" : "2", "EUTIME" : "2015-12-20" }

{ "_id" : NumberLong(18), "haoshibu" : "3", "EUTIME" : "2015-12-26" }

{ "_id" : NumberLong(19), "haoshibu" : "1", "EUTIME" : "2015-12-27" }

{ "_id" : NumberLong(20), "haoshibu" : "7", "EUTIME" : "2016-01-01" }

{ "_id" : NumberLong(21), "haoshibu" : "8", "EUTIME" : "2016-01-03" }

Success!