
Kafka architecture, principles, and installation/deployment

2016-05-29 11:21

1. What is Kafka

  Kafka is a distributed publish-subscribe messaging system. It was originally developed at LinkedIn and later became part of the Apache project. Kafka is a fast, scalable, inherently distributed, partitioned, and replicated commit log service.


2. Kafka architecture

  Kafka is distributed by design: a Kafka cluster typically consists of multiple brokers. To balance load, a topic is divided into multiple partitions, and each broker stores one or more of these partitions. Multiple producers and consumers can publish and retrieve messages at the same time.

  The main components of the architecture are described below:

 


A topic is a stream of messages of a particular type. A message is a payload of bytes, and a topic is the category or feed name to which messages are published.
A producer is any object that can publish messages to a topic. Published messages are stored on a set of servers called brokers, which together form a Kafka cluster.
A consumer subscribes to one or more topics and consumes the published messages by pulling data from the brokers.
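
To make the topic/partition relationship concrete, here is a minimal sketch of how a keyed message can be mapped to one of a topic's partitions by hashing the key, which is the idea behind Kafka's default hash partitioner; the class name, method name, and example keys are made up for illustration and are not Kafka API.

// Illustrative only: choose a partition for a message key the way a hash-based
// default partitioner would (non-negative hash of the key, modulo partition count).
public final class PartitionSketch {

    static int partitionFor(Object key, int numPartitions) {
        // Mask the sign bit so the result is always a valid partition index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3; // e.g. a topic spread over three brokers
        for (String key : new String[] {"order-1", "order-2", "order-3"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}

Because the mapping depends only on the key, messages with the same key always land in the same partition, and Kafka guarantees ordering only within a partition.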

3. Kafka storage design

  In Kafka's storage layout, each partition of a topic corresponds to a logical log. Physically, a log is implemented as a set of segment files of roughly the same size. Every time a producer publishes a message to a partition, the broker appends it to the last segment file. When the number of published messages reaches a configured threshold, or after a certain amount of time, the segment file is flushed to disk; only after the flush are the messages exposed to consumers.
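
As a rough mental model of this segment-based layout (a simplified sketch, not Kafka's actual implementation), the code below appends messages to an "active" segment and rolls to a new one once it is full; in real Kafka each segment is a file on disk named after the offset of its first message. The class name and the tiny segment capacity are invented for illustration.

import java.util.ArrayList;
import java.util.List;

// Simplified model of one partition's log: a list of fixed-capacity segments.
// Appends always go to the newest (active) segment; full segments are rolled.
public final class SegmentedLogSketch {
    static final int SEGMENT_CAPACITY = 4;   // messages per segment, tiny for the demo
    final List<List<String>> segments = new ArrayList<>();
    long nextOffset = 0;                      // per-partition offset, monotonically increasing

    long append(String message) {
        if (segments.isEmpty() || segments.get(segments.size() - 1).size() == SEGMENT_CAPACITY) {
            segments.add(new ArrayList<String>());   // roll a new active segment
        }
        segments.get(segments.size() - 1).add(message);
        return nextOffset++;                  // offset assigned to this message
    }

    public static void main(String[] args) {
        SegmentedLogSketch log = new SegmentedLogSketch();
        for (int i = 0; i < 10; i++) {
            System.out.println("offset " + log.append("msg-" + i) + " -> segment " + (log.segments.size() - 1));
        }
    }
}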

  


  A consumer always retrieves messages from a given partition sequentially; if a consumer knows the offset of a particular message, it means it has already consumed all the messages before it. The consumer issues asynchronous pull requests to the broker, with a byte buffer prepared for the data to be consumed, and each pull request carries the offset of the messages to consume. Kafka uses the sendfile API to deliver bytes from the broker's log segment files to consumers efficiently.
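
In Java, the sendfile optimization mentioned above corresponds to FileChannel.transferTo, which lets the kernel move bytes from a file (usually already in the page cache) straight to a socket without copying them through user space. A minimal sketch of the idea; the segment file name and the consumer address are made up for illustration:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

// Illustrative zero-copy transfer: stream a log segment file to a consumer's
// socket with transferTo(), the Java counterpart of the sendfile system call.
public final class ZeroCopySketch {
    public static void main(String[] args) throws IOException {
        try (FileChannel segment = FileChannel.open(Paths.get("00000000000000000000.log"), StandardOpenOption.READ);
             SocketChannel consumer = SocketChannel.open(new InetSocketAddress("localhost", 9999))) {
            long position = 0;
            long size = segment.size();
            while (position < size) {
                // The kernel copies bytes from the file to the socket; no user-space buffer is involved.
                position += segment.transferTo(position, size - position, consumer);
            }
        }
    }
}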

 

4. Kafka application code examples

Kafka producer code example:

// Producer setup using the Kafka 0.8 "old" producer API (kafka.javaapi.producer).
// The surrounding class is assumed to declare the props, producer, topic and
// directoryPath fields used below.
public KafkaMailProducer(String topic, String directoryPath) {
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("metadata.broker.list", "localhost:9092");
    producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
    this.topic = topic;
    this.directoryPath = directoryPath;
}

public void run() {
    Path dir = Paths.get(directoryPath);
    try {
        // WatchDir publishes newly arriving mail files; ReadDir publishes the
        // files already present in the directory.
        new WatchDir(dir).start();
        new ReadDir(dir).start();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

  The code snippet above shows the basic usage of the Kafka producer API: setting the producer's properties, such as which topic to publish messages to, which serializer class to use, and the broker information. The basic job of this class is to read mail message files from a mail directory and publish each of them as a message to the Kafka broker. The directory is watched with the java.nio.file.WatchService class, so as soon as a new mail message is dumped into the directory, it is read and published to the Kafka broker as a message.
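
The snippet above only shows configuration; in this 0.8-era API the actual publishing call is producer.send with a KeyedMessage. Below is a minimal, self-contained sketch under the same assumptions as the class above (a broker on localhost:9092 and a StringEncoder); the class name SendOneMessage and the topic "mail" are hypothetical:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Standalone sender using the same old (0.8) producer API as the class above.
public final class SendOneMessage {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");   // broker address, as above
        Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props));
        // No key is supplied, so the partitioner chooses a partition for the message.
        producer.send(new KeyedMessage<Integer, String>("mail", "hello from the mail directory"));
        producer.close();
    }
}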

Kafka consumer code example:

private static ConsumerConfig createConsumerConfig() {
    Properties props = new Properties();
    props.put("zookeeper.connect", KafkaMailProperties.zkConnect);
    props.put("group.id", KafkaMailProperties.groupId);
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    return new ConsumerConfig(props);
}

public void run() {
    // Ask the high-level consumer for a single stream (thread) for the topic.
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    // Block on the iterator and print every message as it arrives.
    while (it.hasNext())
        System.out.println(new String(it.next().message()));
}

  The code above demonstrates the basic consumer API. As mentioned earlier, the consumer needs to set up the message streams it will consume. In the run method we do that setup and then print the received messages to the console.
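
What the snippet does not show is where the consumer field comes from. In the 0.8 high-level consumer API it is a ConsumerConnector built from the config method above; a minimal sketch, assuming the field sits in the same class as createConsumerConfig() and run():

// High-level consumer connector that run() reads its message streams from.
private final kafka.javaapi.consumer.ConsumerConnector consumer =
        kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

On exit, calling consumer.shutdown() closes the ZooKeeper session and broker connections cleanly.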

5. Kafka installation and deployment

(Note: before installing Kafka, you need to install and deploy a ZooKeeper cluster first. See: http://blog.csdn.net/u010330043/article/details/51209939)

The Kafka installation steps are as follows:
Step 1: Unpack the Kafka release archive
Step 2: Edit server.properties

# Unique id for this broker; every broker in the cluster must use a different value
broker.id=1
# ZooKeeper ensemble the brokers register with
zookeeper.connect=cs2:2181,cs3:2181,cs4:2181

Step 3: Start the ZooKeeper cluster

Step 4: Start the Kafka broker on every node

bin/kafka-server-start.sh config/server.properties

Step 5: Create a topic in the Kafka cluster

bin/kafka-topics.sh --create --zookeeper weekend05:2181 --replication-factor 3 --partitions 1 --topic order

Step 6: Write messages to a topic with a producer

bin/kafka-console-producer.sh --broker-list weekend:9092 --topic order

Step 7: Read messages from a topic with a consumer

bin/kafka-console-consumer.sh --zookeeper weekend05:2181 --from-beginning --topic order

Step 8: View the partition and replica status of a topic

bin/kafka-topics.sh --describe --zookeeper weekend05:2181 --topic order

Reference: Apache Kafka: Next Generation Distributed Messaging System