
Kafka 2.9.2 (kafka_2.9.2-0.8.1.1): pseudo-distributed cluster installation and demo (Java API) test

2016-07-06 09:21


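1. What is Kafka?

  Kafka is a distributed message queue (MQ) system developed and open-sourced by LinkedIn. It entered the Apache Incubator and has since become a top-level Apache project. Its home page describes it as a high-throughput distributed MQ that can spread messages across different nodes. Early Kafka was reportedly written in only about 7,000 lines of Scala, and published figures put it at roughly 250,000 messages (50 MB) produced per second and 550,000 messages (110 MB) processed per second.

  Kafka currently has clients for many languages, including Java, Python, C++, and PHP.

  In brief, a Kafka cluster works like this: producers write messages and consumers read them.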

  


  


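1.1 Kafka design goals

High throughput is one of its core design goals.

Disk persistence: messages are not cached in the broker's own memory but are written straight to the file system, taking full advantage of sequential disk reads and writes.

Zero-copy: reduces the number of I/O steps.

Batched sending and fetching of data.

Data compression.

Topics are split into multiple partitions to increase parallelism.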
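The last goal, splitting a topic into partitions, can be pictured as several independent append-only logs that each number their own messages. The following is only a toy in-memory sketch of that idea, not Kafka code; the class name PartitionedTopicSketch and its methods are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of a partitioned topic; it does not use Kafka itself. */
class PartitionedTopicSketch {
    // One append-only list per partition.
    private final List<List<String>> partitions = new ArrayList<List<String>>();

    PartitionedTopicSketch(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<String>());
        }
    }

    /** Appends a message to one partition and returns its position in that partition. */
    int append(int partition, String message) {
        List<String> log = partitions.get(partition);
        log.add(message);
        return log.size() - 1;
    }

    /** Returns the messages of one partition in the order they were appended. */
    List<String> read(int partition) {
        return new ArrayList<String>(partitions.get(partition));
    }

    public static void main(String[] args) {
        PartitionedTopicSketch topic = new PartitionedTopicSketch(2);
        topic.append(0, "a");
        topic.append(1, "b");
        topic.append(0, "c");
        System.out.println(topic.read(0)); // messages of partition 0, in append order
        System.out.println(topic.read(1)); // messages of partition 1, in append order
    }
}
```

Because each partition keeps its own numbering, two partitions can be written and read in parallel, but nothing in this model relates the order of "b" to the order of "a" and "c".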


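1.2 Kafka terminology and how it works

Producer: a message producer, i.e. a client that sends messages to a Kafka broker.

Consumer: a message consumer, i.e. a client that fetches messages from a Kafka broker.

Topic: can be thought of as a queue.

Consumer Group (CG): the mechanism Kafka uses to deliver a topic's messages either as a broadcast (to every consumer) or as a unicast (to any one consumer). A topic can have multiple CGs. The topic's messages are copied (conceptually, not physically) to every CG, but each CG delivers a given message to only one consumer in that CG. To broadcast, give each consumer its own CG; to unicast, put all consumers in the same CG. CGs also let you group consumers freely without sending the same message to several topics.

Broker: one Kafka server is one broker. A cluster consists of multiple brokers, and one broker can host multiple topics.

Partition: for scalability, a very large topic can be spread over multiple brokers (servers). A topic is divided into multiple partitions, and each partition is an ordered queue. Every message in a partition is assigned a sequential id (the offset). Kafka only guarantees that messages are delivered to consumers in order within a single partition; it does not guarantee ordering across the partitions of a topic as a whole.

Offset: Kafka names its storage (segment) files after the offset of the first message they contain, which makes lookups easy. Older releases used the form offset.kafka; the 0.8 series installed here uses a zero-padded 20-digit offset with a .log extension. For example, to find position 2049 when a segment starts at 2048, you only need to open the 2048 segment file. The first segment is therefore 00000000000.kafka in the old scheme (00000000000000000000.log in 0.8).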
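The lookup described for offsets amounts to "find the largest segment start that is not greater than the target". Here is a minimal sketch of that search, assuming segment files are named after their first offset as a zero-padded 20-digit number with a .log extension; the class and method names are hypothetical and this is not Kafka's own code.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.TreeSet;

/** Sketch of locating the segment file that holds a given offset. */
class SegmentLookupSketch {

    /** Formats a segment's first offset as a zero-padded 20-digit file name. */
    static String segmentFileName(long baseOffset) {
        return String.format(Locale.ROOT, "%020d.log", baseOffset);
    }

    /** Returns the first offset of the segment that contains the target offset. */
    static long findSegment(TreeSet<Long> baseOffsets, long target) {
        // floor() gives the greatest element less than or equal to target.
        Long base = baseOffsets.floor(target);
        if (base == null) {
            throw new IllegalArgumentException("offset " + target + " is before the first segment");
        }
        return base;
    }

    public static void main(String[] args) {
        // Assumed segment boundaries, chosen to match the 2048/2049 example.
        TreeSet<Long> baseOffsets = new TreeSet<Long>(Arrays.asList(0L, 2048L, 4096L));
        long base = findSegment(baseOffsets, 2049L);
        System.out.println(segmentFileName(base));
    }
}
```

A sorted set makes the search logarithmic in the number of segments, which is why naming files by offset is convenient.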


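1.3 Kafka scalability

Kafka uses ZooKeeper to scale the cluster dynamically without changing the client (producer and consumer) configuration. Brokers register themselves in ZooKeeper and keep the related metadata (topics, partition information, and so on) up to date.

Clients register watchers in ZooKeeper. As soon as something changes in ZooKeeper, the clients notice and adjust accordingly, so load stays balanced automatically across brokers when brokers are added or removed.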


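1.4 The relationship between Kafka and ZooKeeper

Producers use ZooKeeper to "discover" the broker list, then open socket connections to the leader of each partition of a topic and send messages. (This describes the older ZooKeeper-based producer; the 0.8 producer used later in this article finds brokers through metadata.broker.list instead.)

Brokers use ZooKeeper to register broker information and to monitor whether partition leaders are alive.

Consumers use ZooKeeper to register consumer information, including the list of partitions each consumer reads, and also to discover the broker list, connect to partition leaders over sockets, and fetch messages.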


2. Where is the official Kafka website?

  http://kafka.apache.org/

  


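3. Where can it be downloaded, and which components does it need?

  The build used here, kafka_2.9.2-0.8.1.1 (Kafka 0.8.1.1 built for Scala 2.9.2), can be downloaded from:

  https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz

  It requires ZooKeeper. For downloading and installing ZooKeeper, see the article "ubuntu12.04+hadoop2.2.0+zookeeper3.4.5+hbase0.96.2+hive0.13.1 distributed environment deployment".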


4. How is it installed?


4.1 Extract kafka_2.9.2-0.8.1.1.tgz

  In this article it is extracted under /home/hadoop.
root@m1:/home/hadoop/kafka_2.9.2-0.8.1.1#pwd

/home/hadoop/kafka_2.9.2-0.8.1.1

[/code]


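4.2 Edit the server.properties configuration file

  For the ZooKeeper part, refer to the configuration in the article "ubuntu12.04+hadoop2.2.0+zookeeper3.4.5+hbase0.96.2+hive0.13.1 distributed environment deployment"; the relevant setting is zookeeper.connect, near the end of the file below: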
root@m1:/home/hadoop/kafka_2.9.2-0.8.1.1#cat config/server.properties

#Licensed to the Apache Software Foundation (ASF) under one or more

#contributor license agreements.  See the NOTICE file distributed with

#this work for additional information regarding copyright ownership.

#The ASF licenses this file to You under the Apache License, Version 2.0

#(the "License"); you may not use this file except in compliance with

#the License.  You may obtain a copy of the License at

#

#  http://www.apache.org/licenses/LICENSE-2.0
#

#Unless required by applicable law or agreed to in writing, software

#distributed under the License is distributed on an "AS IS" BASIS,

#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

#See the License for the specific language governing permissions and

#limitations under the License.

#see kafka.server.KafkaConfig for additional details and defaults


#############################Server Basics #############################


#The id of the broker. This must be set to a unique integer for each broker.

#An integer; deriving it from the IP address is recommended. Here I reuse the id configured in ZooKeeper

broker.id=1


#############################Socket Server Settings #############################


#The port the socket server listens on

#Port on which the broker receives messages from producers

port=9092

#port=44444


#Hostname the broker will bind to. If not set, the server will bind to all interfaces

#The broker's hostname

host.name=m1


#Hostname the broker will advertise to producers and consumers. If not set, it uses the

#value for "host.name" if configured.  Otherwise, it will use the value returned from

#java.net.InetAddress.getCanonicalHostName().

#The address that producers and consumers use when they connect

advertised.host.name=m1


#The port to publish to ZooKeeper for clients to use. If this is not set,

#it will publish the same port that the broker binds to.

#advertised.port=<port accessible by clients>


#The number of threads handling network requests

num.network.threads=2


#The number of threads doing disk I/O

num.io.threads=8


#The send buffer (SO_SNDBUF) used by the socket server

socket.send.buffer.bytes=1048576


#The receive buffer (SO_RCVBUF) used by the socket server

socket.receive.buffer.bytes=1048576


#The maximum size of a request that the socket server will accept (protection against OOM)

socket.request.max.bytes=104857600



#############################Log Basics #############################


#A comma seperated list of directories under which to store log files

#Path where Kafka stores its message files

log.dirs=/home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs


#The default number of log partitions per topic. More partitions allow greater

#parallelism for consumption, but this will also result in more files across

#the brokers.

#Default number of partitions per topic

num.partitions=2


#############################Log Flush Policy #############################


#Messages are immediately written to the filesystem but by default we only fsync() to sync

#the OS cache lazily. The following configurations control the flush of data to disk. 

#There are a few important trade-offs here:

#  1. Durability: Unflushed data may be lost if you are not using replication.

#  2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.

#  3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. 

#The settings below allow one to configure the flush policy to flush data after a period of time or

#every N messages (or both). This can be done globally and overridden on a per-topic basis.


#The number of messages to accept before forcing a flush of data to disk

#log.flush.interval.messages=10000


#The maximum amount of time a message can sit in a log before we force a flush

#log.flush.interval.ms=1000


#############################Log Retention Policy #############################


#The following configurations control the disposal of log segments. The policy can

#be set to delete segments after a period of time, or after a given size has accumulated.

#A segment will be deleted whenever *either* of these criteria are met. Deletion always happens

#from the end of the log.


#The minimum age of a log file to be eligible for deletion

#How long Kafka keeps the log data it receives (we currently keep 7 days: log.retention.hours=168)

log.retention.hours=168


#A size-based retention policy for logs. Segments are pruned from the log as long as the remaining

#segments don't drop below log.retention.bytes.

#log.retention.bytes=1073741824


#The maximum size of a log segment file. When this size is reached a new log segment will be created.

log.segment.bytes=536870912


#The interval at which log segments are checked to see if they can be deleted according 

#to the retention policies

log.retention.check.interval.ms=60000


#By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.

#If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.

log.cleaner.enable=false


#############################Zookeeper #############################


#Zookeeper connection string (see zookeeper docs for details).

#This is a comma separated host:port pairs, each corresponding to a zk

#server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".

#You can also append an optional chroot string to the urls to specify the

#root directory for all kafka znodes.

zookeeper.connect=m1:2181,m2:2181,s1:2181,s2:2181


#Timeout in ms for connecting to zookeeper

zookeeper.connection.timeout.ms=1000000

[/code]
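To sanity-check a few of the values in this file, the settings can be loaded with java.util.Properties and converted to more readable units. This is only a sketch: the SAMPLE string copies five lines from the file above, and the class and method names are made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/** Sketch: read a few server.properties values and derive readable units. */
class ServerPropertiesSketch {

    // A fragment copied from the configuration shown above.
    static final String SAMPLE =
          "broker.id=1\n"
        + "port=9092\n"
        + "log.retention.hours=168\n"
        + "log.segment.bytes=536870912\n"
        + "zookeeper.connect=m1:2181,m2:2181,s1:2181,s2:2181\n";

    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Retention period converted from hours to milliseconds. */
    static long retentionMs(Properties props) {
        return TimeUnit.HOURS.toMillis(Long.parseLong(props.getProperty("log.retention.hours")));
    }

    /** Segment size converted from bytes to MiB. */
    static long segmentMiB(Properties props) {
        return Long.parseLong(props.getProperty("log.segment.bytes")) / (1024L * 1024L);
    }

    /** Splits the ZooKeeper connection string into host:port entries. */
    static String[] zookeeperHosts(Properties props) {
        return props.getProperty("zookeeper.connect").split(",");
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        System.out.println("retention ms: " + retentionMs(props));
        System.out.println("segment MiB:  " + segmentMiB(props));
        System.out.println("zk hosts:     " + zookeeperHosts(props).length);
    }
}
```

168 hours works out to 604800000 ms, the same figure that appears as retention.ms in the broker log when a topic is created later in this article.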


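4.3 Start ZooKeeper and Kafka

    1) To start ZooKeeper, see the article "ubuntu12.04+hadoop2.2.0+zookeeper3.4.5+hbase0.96.2+hive0.13.1 distributed environment deployment".

    After startup, the following command shows the status on each machine: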
root@m1:/home/hadoop#/home/hadoop/zookeeper-3.4.5/bin/zkServer.sh status

JMX enabled by default

Using config: /home/hadoop/zookeeper-3.4.5/bin/../conf/zoo.cfg

Mode: leader

[/code]

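    2) Start Kafka on m1, m2, s1, and s2. Before that, copy the Kafka directory from m1 to the other three machines. After copying, remember to change the host names in server.properties (host.name and advertised.host.name) to the current machine and to give each broker a unique broker.id. The following is the output after running the command on m1: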
root@m1:/home/hadoop#/home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /home/hadoop/kafka_2.9.2-0.8.1.1/config/server.properties &

[1] 31823

root@m1:/home/hadoop#[2014-08-05 10:03:11,210] INFO Verifying properties (kafka.utils.VerifiableProperties)

[2014-08-05 10:03:11,261] INFO Property advertised.host.name is overridden to m1 (kafka.utils.VerifiableProperties)

[2014-08-05 10:03:11,261] INFO Property broker.id is overridden to 1 (kafka.utils.VerifiableProperties)

[2014-08-05 10:03:11,264] INFO Property host.name is overridden to m1 (kafka.utils.VerifiableProperties)

[2014-08-05 10:03:11,264] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)

[2014-08-05 10:03:11,264] INFO Property log.dirs is overridden to /home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs (kafka.utils.VerifiableProperties)

[2014-08-05 10:03:11,265] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)

[2014-08-05 10:03:11,265] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)

[2014-08-05 10:03:11,265] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)

[2014-08-05 10:03:11,265] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)

[2014-08-05 10:03:11,266] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)

[2014-08-05 10:03:11,266] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)

[2014-08-05 10:03:11,267] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)

[2014-08-05 10:03:11,267] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)

[2014-08-05 10:03:11,268] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)

[2014-08-05 10:03:11,268] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)

[2014-08-05 10:03:11,268] INFO Property zookeeper.connect is overridden to m1:2181,m2:2181,s1:2181,s2:2181 (kafka.utils.VerifiableProperties)

[2014-08-05 10:03:11,269] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)

[2014-08-05 10:03:11,302] INFO [Kafka Server 1], starting (kafka.server.KafkaServer)

[2014-08-05 10:03:11,303] INFO [Kafka Server 1], Connecting to zookeeper on m1:2181,m2:2181,s1:2181,s2:2181 (kafka.server.KafkaServer)

[2014-08-05 10:03:11,335] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)

[2014-08-05 10:03:11,348] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)

[2014-08-05 10:03:11,348] INFO Client environment:host.name=m1 (org.apache.zookeeper.ZooKeeper)

[2014-08-05 10:03:11,349] INFO Client environment:java.version=1.7.0_65 (org.apache.zookeeper.ZooKeeper)

[2014-08-05 10:03:11,349] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)

[2014-08-05 10:03:11,349] INFO Client environment:java.home=/usr/lib/jvm/java-7-oracle/jre (org.apache.zookeeper.ZooKeeper)

[2014-08-05 10:03:11,349] INFO Client environment:java.class.path=.:/usr/lib/jvm/java-7-oracle/lib/tools.jar:/usr/lib/jvm/java-7-oracle/lib/dt.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../core/build/dependant-libs-2.8.0/*.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../clients/build/libs//kafka-clients*.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../examples/build/libs//kafka-examples*.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/jopt-simple-3.2.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-javadoc.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-scaladoc.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-sources.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/log4j-1.2.15.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/metrics-core-2.2.0.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/scala-library-2.9.2.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/slf4j-api-1.7.2.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/snappy-java-1.0.5.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/zkclient-0.3.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/zookeeper-3.3.4.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../core/build/libs/kafka_2.8.0*.jar (org.apache.zookeeper.ZooKeeper)

[2014-08-05 10:03:11,350] INFO Client environment:java.library.path=:/usr/local/lib:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)

[2014-08-05 10:03:11,350] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)

[2014-08-05 10:03:11,350] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)

[2014-08-05 10:03:11,350] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)

[2014-08-05 10:03:11,350] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)

[2014-08-05 10:03:11,351] INFO Client environment:os.version=3.11.0-15-generic (org.apache.zookeeper.ZooKeeper)

[2014-08-05 10:03:11,351] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)

[2014-08-05 10:03:11,351] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)

[2014-08-05 10:03:11,351] INFO Client environment:user.dir=/home/hadoop (org.apache.zookeeper.ZooKeeper)

[2014-08-05 10:03:11,352] INFO Initiating client connection, connectString=m1:2181,m2:2181,s1:2181,s2:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@51f782b8 (org.apache.zookeeper.ZooKeeper)

[2014-08-05 10:03:11,380] INFO Opening socket connection to server m2/192.168.1.51:2181 (org.apache.zookeeper.ClientCnxn)

[2014-08-05 10:03:11,386] INFO Socket connection established to m2/192.168.1.51:2181, initiating session (org.apache.zookeeper.ClientCnxn)

[2014-08-05 10:03:11,398] INFO Session establishment complete on server m2/192.168.1.51:2181, sessionid = 0x247a3e09b460000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)

[2014-08-05 10:03:11,400] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)

[2014-08-05 10:03:11,652] INFO Loading log 'test-1' (kafka.log.LogManager)

[2014-08-05 10:03:11,681] INFO Recovering unflushed segment 0 in log test-1. (kafka.log.Log)

[2014-08-05 10:03:11,711] INFO Completed load of log test-1 with log end offset 137 (kafka.log.Log)

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

[2014-08-05 10:03:11,747] INFO Loading log 'idoall.org-0' (kafka.log.LogManager)

[2014-08-05 10:03:11,748] INFO Recovering unflushed segment 0 in log idoall.org-0. (kafka.log.Log)

[2014-08-05 10:03:11,754] INFO Completed load of log idoall.org-0 with log end offset 5 (kafka.log.Log)

[2014-08-05 10:03:11,760] INFO Loading log 'test-0' (kafka.log.LogManager)

[2014-08-05 10:03:11,765] INFO Recovering unflushed segment 0 in log test-0. (kafka.log.Log)

[2014-08-05 10:03:11,777] INFO Completed load of log test-0 with log end offset 151 (kafka.log.Log)

[2014-08-05 10:03:11,779] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)

[2014-08-05 10:03:11,782] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)

[2014-08-05 10:03:11,800] INFO Awaiting socket connections on m1:9092. (kafka.network.Acceptor)

[2014-08-05 10:03:11,802] INFO [Socket Server on Broker 1], Started (kafka.network.SocketServer)

[2014-08-05 10:03:11,890] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)

[2014-08-05 10:03:11,919] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)

[2014-08-05 10:03:12,359] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

[2014-08-05 10:03:12,387] INFO Registered broker 1 at path /brokers/ids/1 with address m1:9092. (kafka.utils.ZkUtils$)

[2014-08-05 10:03:12,392] INFO [Kafka Server 1], started (kafka.server.KafkaServer)

[2014-08-05 10:03:12,671] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall.org,0],[test,0],[test,1] (kafka.server.ReplicaFetcherManager)

[2014-08-05 10:03:12,741] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall.org,0],[test,0],[test,1] (kafka.server.ReplicaFetcherManager)

[2014-08-05 10:03:25,327] INFO Partition [test,0] on broker 1: Expanding ISR for partition [test,0] from 1 to 1,2 (kafka.cluster.Partition)

[2014-08-05 10:03:25,334] INFO Partition [test,1] on broker 1: Expanding ISR for partition [test,1] from 1 to 1,2 (kafka.cluster.Partition)

[2014-08-05 10:03:26,905] INFO Partition [test,1] on broker 1: Expanding ISR for partition [test,1] from 1,2 to 1,2,3 (kafka.cluster.Partition)

[/code]
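The per-machine differences in server.properties come down to three lines. As a rough sketch, they could be generated like this; the broker ids for m1, m2, and s1 match the ids visible in the logs in this article, while the id 4 for s2 and all class and method names are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: produce the server.properties lines that differ per broker. */
class BrokerOverridesSketch {

    /** Builds the three per-machine lines for one broker. */
    static String overridesFor(int brokerId, String host) {
        return "broker.id=" + brokerId + "\n"
             + "host.name=" + host + "\n"
             + "advertised.host.name=" + host + "\n";
    }

    /** Assigns ids 1..n in the order the hosts are given (an assumed scheme). */
    static Map<String, String> forCluster(String... hosts) {
        Map<String, String> result = new LinkedHashMap<String, String>();
        for (int i = 0; i < hosts.length; i++) {
            result.put(hosts[i], overridesFor(i + 1, hosts[i]));
        }
        return result;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, String> e : forCluster("m1", "m2", "s1", "s2").entrySet()) {
            System.out.println("# " + e.getKey());
            System.out.print(e.getValue());
        }
    }
}
```

Everything else in the file, including zookeeper.connect, stays identical on all four machines.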


4.4 Test the Kafka cluster

    1) On m1, create a topic named idoall_testTopic. The replication factor is set to the number of Kafka brokers (it cannot exceed that number):
root@m1:/home/hadoop#/home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-topics.sh --create --topic idoall_testTopic --replication-factor 4 --partitions 2 --zookeeper m1:2181

Created topic "idoall_testTopic".

[2014-08-05 10:08:29,315] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall_testTopic,0] (kafka.server.ReplicaFetcherManager)

[2014-08-05 10:08:29,334] INFO Completed load of log idoall_testTopic-0 with log end offset 0 (kafka.log.Log)

[2014-08-05 10:08:29,373] INFO Created log for partition [idoall_testTopic,0] in /home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)

[2014-08-05 10:08:29,384] WARN Partition [idoall_testTopic,0] on broker 1: No checkpointed highwatermark is found for partition [idoall_testTopic,0] (kafka.cluster.Partition)

[2014-08-05 10:08:29,415] INFO Completed load of log idoall_testTopic-1 with log end offset 0 (kafka.log.Log)

[2014-08-05 10:08:29,416] INFO Created log for partition [idoall_testTopic,1] in /home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)

[2014-08-05 10:08:29,422] WARN Partition [idoall_testTopic,1] on broker 1: No checkpointed highwatermark is found for partition [idoall_testTopic,1] (kafka.cluster.Partition)

[2014-08-05 10:08:29,430] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall_testTopic,1] (kafka.server.ReplicaFetcherManager)

[2014-08-05 10:08:29,438] INFO Truncating log idoall_testTopic-1 to offset 0. (kafka.log.Log)

[2014-08-05 10:08:29,473] INFO [ReplicaFetcherManager on broker 1] Added fetcher for partitions ArrayBuffer([[idoall_testTopic,1], initOffset 0 to broker id:2,host:m2,port:9092] ) (kafka.server.ReplicaFetcherManager)

[2014-08-05 10:08:29,475] INFO [ReplicaFetcherThread-0-2], Starting  (kafka.server.ReplicaFetcherThread)

[/code]

    2) On m1, list the topics to see the idoall_testTopic topic just created:
root@m1:/home/hadoop#/home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-topics.sh --list --zookeeper m1:2181   

idoall_testTopic

[/code]

    3) On m2, send a message to Kafka (m2 acts as the producer). The message is "hello idoall.org":
root@m2:/home/hadoop#/home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-producer.sh --broker-list m1:9092 --sync --topic idoall_testTopic

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

hello idoall.org

[/code]

    

    4) On s1, start a consumer (s1 acts as the consumer). The message just sent appears:
root@s1:/home/hadoop#/home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper m1:2181 --topic idoall_testTopic --from-beginning

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

hello idoall.org

[/code]

    

    5) Delete a topic. As a test, we create a topic named idoall and then delete it:
root@m1:/home/hadoop#/home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-topics.sh --create --topic idoall --replication-factor 4 --partitions 2 --zookeeper m1:2181          

Created topic "idoall".

[2014-08-05 10:38:30,862] INFO Completed load of log idoall-1 with log end offset 0 (kafka.log.Log)

[2014-08-05 10:38:30,864] INFO Created log for partition [idoall,1] in /home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)

[2014-08-05 10:38:30,870] WARN Partition [idoall,1] on broker 1: No checkpointed highwatermark is found for partition [idoall,1] (kafka.cluster.Partition)

[2014-08-05 10:38:30,878] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall,1] (kafka.server.ReplicaFetcherManager)

[2014-08-05 10:38:30,880] INFO Truncating log idoall-1 to offset 0. (kafka.log.Log)

[2014-08-05 10:38:30,885] INFO [ReplicaFetcherManager on broker 1] Added fetcher for partitions ArrayBuffer([[idoall,1], initOffset 0 to broker id:3,host:s1,port:9092] ) (kafka.server.ReplicaFetcherManager)

[2014-08-05 10:38:30,887] INFO [ReplicaFetcherThread-0-3], Starting  (kafka.server.ReplicaFetcherThread)

root@m1:/home/hadoop#/home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-topics.sh --list --zookeeper m1:2181

idoall

idoall_testTopic

root@m1:/home/hadoop#/home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic idoall --zookeeper m2:2181    

deletion succeeded!

root@m1:/home/hadoop#/home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-topics.sh --list --zookeeper m1:2181

idoall_testTopic

root@m1:/home/hadoop#

[/code]

    You can also go into ZooKeeper to check that the topic has been deleted:
root@m1:/home/hadoop#/home/hadoop/zookeeper-3.4.5/bin/zkCli.sh

Connecting to localhost:2181

2014-08-05 10:15:21,863 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT

2014-08-05 10:15:21,871 [myid:] - INFO  [main:Environment@100] - Client environment:host.name=m1

2014-08-05 10:15:21,871 [myid:] - INFO  [main:Environment@100] - Client environment:java.version=1.7.0_65

2014-08-05 10:15:21,872 [myid:] - INFO  [main:Environment@100] - Client environment:java.vendor=Oracle Corporation

2014-08-05 10:15:21,872 [myid:] - INFO  [main:Environment@100] - Client environment:java.home=/usr/lib/jvm/java-7-oracle/jre

2014-08-05 10:15:21,873 [myid:] - INFO  [main:Environment@100] - Client environment:java.class.path=/home/hadoop/zookeeper-3.4.5/bin/../build/classes:/home/hadoop/zookeeper-3.4.5/bin/../build/lib/*.jar:/home/hadoop/zookeeper-3.4.5/bin/../lib/slf4j-log4j12-1.6.1.jar:/home/hadoop/zookeeper-3.4.5/bin/../lib/slf4j-api-1.6.1.jar:/home/hadoop/zookeeper-3.4.5/bin/../lib/netty-3.2.2.Final.jar:/home/hadoop/zookeeper-3.4.5/bin/../lib/log4j-1.2.15.jar:/home/hadoop/zookeeper-3.4.5/bin/../lib/jline-0.9.94.jar:/home/hadoop/zookeeper-3.4.5/bin/../zookeeper-3.4.5.jar:/home/hadoop/zookeeper-3.4.5/bin/../src/java/lib/*.jar:/home/hadoop/zookeeper-3.4.5/bin/../conf:.:/usr/lib/jvm/java-7-oracle/lib/tools.jar:/usr/lib/jvm/java-7-oracle/lib/dt.jar

2014-08-05 10:15:21,874 [myid:] - INFO  [main:Environment@100] - Client environment:java.library.path=:/usr/local/lib:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib

2014-08-05 10:15:21,874 [myid:] - INFO  [main:Environment@100] - Client environment:java.io.tmpdir=/tmp

2014-08-05 10:15:21,874 [myid:] - INFO  [main:Environment@100] - Client environment:java.compiler=<NA>

2014-08-05 10:15:21,875 [myid:] - INFO  [main:Environment@100] - Client environment:os.name=Linux

2014-08-05 10:15:21,875 [myid:] - INFO  [main:Environment@100] - Client environment:os.arch=amd64

2014-08-05 10:15:21,876 [myid:] - INFO  [main:Environment@100] - Client environment:os.version=3.11.0-15-generic

2014-08-05 10:15:21,876 [myid:] - INFO  [main:Environment@100] - Client environment:user.name=root

2014-08-05 10:15:21,877 [myid:] - INFO  [main:Environment@100] - Client environment:user.home=/root

2014-08-05 10:15:21,878 [myid:] - INFO  [main:Environment@100] - Client environment:user.dir=/home/hadoop

2014-08-05 10:15:21,879 [myid:] - INFO  [main:ZooKeeper@438] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@666c211a

Welcome to ZooKeeper!

2014-08-05 10:15:21,920 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@966] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)

2014-08-05 10:15:21,934 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@849] - Socket connection established to localhost/127.0.0.1:2181, initiating session

JLine support is enabled

2014-08-05 10:15:21,966 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1207] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x147a3e1246b0007, negotiated timeout = 30000


WATCHER::


WatchedEvent state:SyncConnected type:None path:null

[zk: localhost:2181(CONNECTED) 0] ls /

[hbase, hadoop-ha, admin, zookeeper, consumers, config, controller, storm, brokers, controller_epoch]

[zk: localhost:2181(CONNECTED) 1] ls /brokers

[topics, ids]

[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics

[idoall_testTopic]

[/code]


4.5 Use Eclipse to call the Kafka Java API and test the cluster

    1) Message producer: Producertest.java
package idoall.testkafka;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Message producer
 * @author 迦壹
 * @Time 2014-08-05
 */
public class Producertest {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Left over from older releases: the 0.8 producer does not read zk.connect
        // and finds brokers through metadata.broker.list instead
        props.put("zk.connect", "m1:2181,m2:2181,s1:2181,s2:2181");
        // serializer.class is the class used to serialize messages
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // metadata.broker.list: for high availability, list at least two brokers
        props.put("metadata.broker.list", "m1:9092,m2:9092,s1:9092,s2:9092");
        // Partitioner class, used to spread messages sensibly across partitions
        //props.put("partitioner.class", "idoall.testkafka.Partitionertest");
        // ACK mechanism: the Kafka server must acknowledge each send
        props.put("request.required.acks", "1");
        // num.partitions is a broker-side setting; it has no effect on the producer
        props.put("num.partitions", "4");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        // Create the formatter once instead of on every iteration
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss SSS");
        try {
            for (int i = 0; i < 10; i++) {
                // KeyedMessage<K, V>
                //   K is the type of the partition key
                //   V is the type of the message itself
                //   constructor arguments: topic, key, message
                String str = formatter.format(new Date()); // current time
                String msg = "idoall.org" + i + "=" + str;
                String key = i + "";
                producer.send(new KeyedMessage<String, String>("idoall_testTopic", key, msg));
            }
        } finally {
            // Release the producer's network resources
            producer.close();
        }
    }
}

[/code]

    2) Message consumer: Consumertest.java
package idoall.testkafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Message consumer
 * @author 迦壹
 * @Time 2014-08-05
 */
public class Consumertest extends Thread {

    private final ConsumerConnector consumer;
    private final String topic;

    public static void main(String[] args) {
        Consumertest consumerThread = new Consumertest("idoall_testTopic");
        consumerThread.start();
    }

    public Consumertest(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        // ZooKeeper connection string
        props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181");
        // Consumer group id
        props.put("group.id", "1");
        // A group's consumed offsets are stored in ZooKeeper, but not in real time;
        // they are committed at this interval
        props.put("auto.commit.interval.ms", "1000");
        props.put("zookeeper.session.timeout.ms", "10000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        // Map each topic to the number of streams (threads) to create for it
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        System.out.println("*********Results********");
        // hasNext() blocks until the next message arrives
        while (it.hasNext()) {
            System.err.println("get data:" + new String(it.next().message()));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop consuming
                Thread.currentThread().interrupt();
                break;
            }
        }
        consumer.shutdown();
    }
}

[/code]
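The consumer above joins the group "1" through group.id. Whether a second consumer sees the same messages depends on whether it shares that group. The toy model below illustrates the rule from section 1.2 without using the Kafka client; the consumer names "eclipse" and "console", the group name "console-group", and the per-message round-robin choice are all assumptions made for illustration (real Kafka assigns whole partitions to group members rather than picking a member per message).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of consumer-group delivery; it does not use the Kafka client. */
class ConsumerGroupSketch {

    /**
     * Every group gets the message, and within each group exactly one member
     * receives it (chosen round-robin by message index in this toy model).
     */
    static List<String> receivers(Map<String, List<String>> groups, int messageIndex) {
        List<String> result = new ArrayList<String>();
        for (List<String> members : groups.values()) {
            result.add(members.get(messageIndex % members.size()));
        }
        return result;
    }

    /** Unicast setup: both consumers share group "1". */
    static Map<String, List<String>> oneSharedGroup() {
        Map<String, List<String>> groups = new LinkedHashMap<String, List<String>>();
        groups.put("1", Arrays.asList("eclipse", "console"));
        return groups;
    }

    /** Broadcast setup: each consumer has its own group. */
    static Map<String, List<String>> twoSeparateGroups() {
        Map<String, List<String>> groups = new LinkedHashMap<String, List<String>>();
        groups.put("1", Arrays.asList("eclipse"));
        groups.put("console-group", Arrays.asList("console"));
        return groups;
    }

    public static void main(String[] args) {
        System.out.println("shared group, message 0:    " + receivers(oneSharedGroup(), 0));
        System.out.println("shared group, message 1:    " + receivers(oneSharedGroup(), 1));
        System.out.println("separate groups, message 0: " + receivers(twoSeparateGroups(), 0));
    }
}
```

With separate groups every consumer receives each message; with one shared group each message goes to only one of them.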

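    3) Check the result of the Java code in Eclipse. Before running it, start a consumer on one of the machines (I used s1), then watch whether both Eclipse and the consumer on s1 receive the messages. The final result is shown in the figure below: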

    


    

    


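  As you can see, exactly 10 messages arrived and none were lost. Because the messages are balanced across partitions, however, they do not arrive in order: Kafka only guarantees ordering within a partition, not across partitions. Per-partition ordering, combined with the ability to partition by key, is sufficient for most applications. (For how to partition by key, the Eclipse code provided at the end of the article includes a demo, Partitionertest.java.)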
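Partitioning by key usually means hashing the key and taking the result modulo the partition count, so that equal keys always land in the same partition. The author's Partitionertest.java is not shown here, so the following is only a generic sketch of that idea with hypothetical names; it is not claimed to be the author's demo or Kafka's built-in partitioner.

```java
/** Sketch of hash-based key partitioning; names are illustrative only. */
class KeyHashPartitionSketch {

    /** Maps a key to a partition index in the range [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        // Clear the sign bit so that negative hash codes still give a valid index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Same keys as the producer demo ("0" to "9"), spread over 2 partitions.
        for (int i = 0; i < 10; i++) {
            String key = String.valueOf(i);
            System.out.println("key " + key + " -> partition " + partitionFor(key, 2));
        }
    }
}
```

Since consecutive keys fall into different partitions, a consumer reading both partitions can see the ten messages interleaved, while messages that share a key keep their relative order.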


4.6 Package the Java files on the command line and test Kafka

    1) Edit pom.xml in the project directory
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>idoall.testkafka</groupId>

<artifactId>idoall.testkafka</artifactId>

<version>0.0.1-SNAPSHOT</version>

<packaging>jar</packaging>

<name>idoall.testkafka</name>

<url>http://maven.apache.org</url>

<properties>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

</properties>

<dependencies>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>3.8.1</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>log4j</groupId>

<artifactId>log4j</artifactId>

<version>1.2.14</version>

</dependency>

<dependency>

<groupId>com.sksamuel.kafka</groupId>

<artifactId>kafka_2.10</artifactId>

<version>0.8.0-beta1</version>

</dependency>

</dependencies>

<build>

<finalName>idoall.testkafka</finalName>

<plugins>

<plugin>

<groupId>org.apache.maven.plugins</groupId>

<artifactId>maven-compiler-plugin</artifactId>

<version>2.0.2</version>

<configuration>

<source>1.5</source>

<target>1.5</target>

<encoding>UTF-8</encoding>

</configuration>

</plugin>

<plugin>

<artifactId>maven-assembly-plugin</artifactId>

<version>2.4</version>

<configuration>

<descriptors>

<descriptor>src/main/src.xml</descriptor>

</descriptors>

<descriptorRefs>

<descriptorRef>jar-with-dependencies</descriptorRef>

</descriptorRefs>

</configuration>

<executions>

<execution>

<id>make-assembly</id> <!-- this is used for inheritance merges -->

<phase>package</phase> <!-- bind to the packaging phase -->

<goals>

<goal>single</goal>

</goals>

</execution>

</executions>

</plugin>

</plugins>

</build>

</project>

[/code]

    

    2) Edit the src/main/src.xml file in the project directory
<?xml version="1.0" encoding="UTF-8"?>

<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd ">

<id>jar-with-dependencies</id>

<formats>

<format>jar</format>

</formats>

<includeBaseDirectory>false</includeBaseDirectory>

<dependencySets>

<dependencySet>

<unpack>false</unpack>

<scope>runtime</scope>

</dependencySet>

</dependencySets>

<fileSets>

<fileSet>

<directory>/lib</directory>

</fileSet>

</fileSets>

</assembly>

[/code]

    3) Build the dependency bundle: run mvn package in the project directory to produce idoall.testkafka-jar-with-dependencies.jar. Part of the output is shown below:
Running idoall.testkafka.idoall.testkafka.AppTest

Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.004 sec


Results :


Tests run: 1, Failures: 0, Errors: 0, Skipped: 0


[INFO]

[INFO]--- maven-jar-plugin:2.4:jar (default-jar) @ idoall.testkafka ---

[INFO]Building jar: /Users/lion/Documents/_my_project/java/idoall.testkafka/target/idoall.testkafka.jar

[INFO]

[INFO]--- maven-assembly-plugin:2.4:single (make-assembly) @ idoall.testkafka ---

[INFO]Reading assembly descriptor: src/main/src.xml

[WARNING] The assembly id jar-with-dependencies is used more than once.

[INFO]Building jar: /Users/lion/Documents/_my_project/java/idoall.testkafka/target/idoall.testkafka-jar-with-dependencies.jar

[INFO]Building jar: /Users/lion/Documents/_my_project/java/idoall.testkafka/target/idoall.testkafka-jar-with-dependencies.jar

[INFO]------------------------------------------------------------------------

[INFO]BUILD SUCCESS

[INFO]------------------------------------------------------------------------

[INFO]Total time: 9.074 s

[INFO]Finished at: 2014-08-05T12:22:47+08:00

[INFO]Final Memory: 63M/836M

[INFO]------------------------------------------------------------------------

[/code]

    4) Compile the sources: change to the project directory and run the following command
liondeMacBook-Pro:idoall.testkafka lion$ pwd

/Users/lion/Documents/_my_project/java/idoall.testkafka

liondeMacBook-Pro:idoall.testkafka lion$ javac -classpath target/idoall.testkafka-jar-with-dependencies.jar -d . src/main/java/idoall/testkafka/*.java

[/code]

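    5) Run the compiled classes. Open two terminal windows, one for the producer and one for the consumer. The consumer window should display the messages as they arrive.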
java -classpath .:target/idoall.testkafka-jar-with-dependencies.jar idoall.testkafka.Producertest

java -classpath .:target/idoall.testkafka-jar-with-dependencies.jar idoall.testkafka.Consumertest

[/code]

    


    

    



5、FAQ

  5.1、If the following error appears when creating a topic, fewer brokers are running than the value you passed to --replication-factor:
Error while executing topic command replication factor: 3 larger than available brokers: 1

kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 1

at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)

at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:155)

at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:86)

at kafka.admin.TopicCommand$.main(TopicCommand.scala:50)

at kafka.admin.TopicCommand.main(TopicCommand.scala)

[/code]
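
  The condition behind this error can be sketched in a few lines. This is only an illustration of the rule, not Kafka's actual code: the class `ReplicationCheckSketch` and its method are hypothetical, and only the wording of the message is taken from the log above. Each replica of a partition has to live on a different broker, so the replication factor cannot exceed the number of brokers that are currently running.

```java
// Illustration only (hypothetical names); Kafka's real check lives in kafka.admin.AdminUtils.
class ReplicationCheckSketch {

    // Reject a topic whose replication factor exceeds the number of live brokers.
    static void checkReplicationFactor(int replicationFactor, int availableBrokers) {
        if (replicationFactor > availableBrokers) {
            throw new IllegalArgumentException("replication factor: " + replicationFactor
                    + " larger than available brokers: " + availableBrokers);
        }
    }

    public static void main(String[] args) {
        checkReplicationFactor(3, 3); // accepted: one replica per broker
        try {
            checkReplicationFactor(3, 1); // rejected: only one broker is running
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

  In practice the fix is either to start the missing brokers before creating the topic or to pass a smaller --replication-factor.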

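  5.2、If the following error appears, the JVM could not obtain enough memory from the operating system. One workaround is to start Kafka first and only then start Hadoop's zkfc (DFSZKFailoverController):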
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0130000, 986513408, 0) failed; error='Cannot allocate memory' (errno=12)

#

#There is insufficient memory for the Java Runtime Environment to continue.

#Native memory allocation (malloc) failed to allocate 986513408 bytes for committing reserved memory.

#An error report file with more information is saved as:


#/home/hadoop/hs_err_pid13558.log


[/code]