您的位置:首页 > 大数据

Kafka:下一代分布式消息系统

2017-06-12 15:49 363 查看


Apache Kafka:下一代分布式消息系统


简介

Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

Apache Kafka与传统消息系统相比,有以下不同:

它被设计为一个分布式系统,易于向外扩展;

它同时为发布和订阅提供高吞吐量;

它支持多订阅者,当失败时能自动平衡消费者;

它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。

本文我将重点介绍Apache Kafka的架构、特性和特点,帮助我们理解Kafka为何比传统消息服务更好。

架构我将比较Kafak和传统消息服务RabbitMQ、Apache ActiveMQ的特点,讨论一些Kafka优于传统消息服务的场景。在最后一节,我们将探讨一个进行中的示例应用,展示Kafka作为消息服务器的用途。这个示例应用的完整源代码在GitHub。关于它的详细讨论在本文的最后一节。

首先,我介绍一下Kafka的基本概念。它的架构包括以下组件:

话题(Topic)是特定类型的消息流。消息是字节的有效负载(Payload),话题是消息的分类名或种子(Feed)名。

生产者(Producer)是能够发布消息到话题的任何对象。

已发布的消息保存在一组服务器中,它们被称为代理(Broker)或Kafka集群。

消费者可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息。



图1:Kafka生产者、消费者和代理环境

生产者可以选择自己喜欢的序列化方法对消息内容编码。为了提高效率,生产者可以在一个发布请求中发送一组消息。下面的代码演示了如何创建生产者并发送消息。

生产者示例代码:
producer = new Producer(…);
message = new Message(“test message str”.getBytes());
set = new MessageSet(message);
producer.send(“topic1”, set);


为了订阅话题,消费者首先为话题创建一个或多个消息流。发布到该话题的消息将被均衡地分发到这些流。每个消息流为不断产生的消息提供了迭代接口。然后消费者迭代流中的每一条消息,处理消息的有效负载。与传统迭代器不同,消息流迭代器永不停止。如果当前没有消息,迭代器将阻塞,直到有新的消息发布到该话题。Kafka同时支持点到点分发模型(Point-to-point delivery model),即多个消费者共同消费队列中某个消息的单个副本,以及发布-订阅模型(Publish-subscribe model),即多个消费者接收自己的消息副本。下面的代码演示了消费者如何使用消息。

消费者示例代码:
streams[] = Consumer.createMessageStreams(“topic1”, 1)
for (message : streams[0]) {
bytes = message.payload();
// do something with the bytes
}


Kafka的整体架构如图2所示。因为Kafka内在就是分布式的,一个Kafka集群通常包括多个代理。为了均衡负载,将话题分成多个分区,每个代理存储一或多个分区。多个生产者和消费者能够同时生产和获取消息。



图2:Kafka架构


Kafka存储

Kafka的存储布局非常简单。话题的每个分区对应一个逻辑日志。物理上,一个日志为相同大小的一组分段文件。每次生产者发布消息到一个分区,代理就将消息追加到最后一个段文件中。当发布的消息数量达到设定值或者经过一定的时间后,段文件真正写入磁盘中。写入完成后,消息公开给消费者。

与传统的消息系统不同,Kafka系统中存储的消息没有明确的消息Id。

消息通过日志中的逻辑偏移量来公开。这样就避免了维护配套密集寻址,用于映射消息ID到实际消息地址的随机存取索引结构的开销。消息ID是增量的,但不连续。要计算下一消息的ID,可以在其逻辑偏移的基础上加上当前消息的长度。

消费者始终从特定分区顺序地获取消息,如果消费者知道特定消息的偏移量,也就说明消费者已经消费了之前的所有消息。消费者向代理发出异步拉请求,准备字节缓冲区用于消费。每个异步拉请求都包含要消费的消息偏移量。Kafka利用sendfile
API高效地从代理的日志段文件中分发字节给消费者。



图3:Kafka存储架构


Kafka代理

与其它消息系统不同,Kafka代理是无状态的。这意味着消费者必须维护已消费的状态信息。这些信息由消费者自己维护,代理完全不管。这种设计非常微妙,它本身包含了创新。

从代理删除消息变得很棘手,因为代理并不知道消费者是否已经使用了该消息。Kafka创新性地解决了这个问题,它将一个简单的基于时间的SLA应用于保留策略。当消息在代理中超过一定时间后,将会被自动删除。

这种创新设计有很大的好处,消费者可以故意倒回到老的偏移量再次消费数据。这违反了队列的常见约定,但被证明是许多消费者的基本特征。


ZooKeeper与Kafka

考虑一下有多个服务器的分布式系统,每台服务器都负责保存数据,在数据上执行操作。这样的潜在例子包括分布式搜索引擎、分布式构建系统或者已知的系统如Apache Hadoop。所有这些分布式系统的一个常见问题是,你如何在任一时间点确定哪些服务器活着并且在工作中。最重要的是,当面对这些分布式计算的难题,例如网络失败、带宽限制、可变延迟连接、安全问题以及任何网络环境,甚至跨多个数据中心时可能发生的错误时,你如何可靠地做这些事。这些正是Apache
ZooKeeper所关注的问题,它是一个快速、高可用、容错、分布式的协调服务。你可以使用ZooKeeper构建可靠的、分布式的数据结构,用于群组成员、领导人选举、协同工作流和配置服务,以及广义的分布式数据结构如锁、队列、屏障(Barrier)和锁存器(Latch)。许多知名且成功的项目依赖于ZooKeeper,其中包括HBase、Hadoop 2.0、Solr Cloud、Neo4J、Apache
Blur(Incubating)和Accumulo。

ZooKeeper是一个分布式的、分层级的文件系统,能促进客户端间的松耦合,并提供最终一致的,类似于传统文件系统中文件和目录的Znode视图。它提供了基本的操作,例如创建、删除和检查Znode是否存在。它提供了事件驱动模型,客户端能观察特定Znode的变化,例如现有Znode增加了一个新的子节点。ZooKeeper运行多个ZooKeeper服务器,称为Ensemble,以获得高可用性。每个服务器都持有分布式文件系统的内存复本,为客户端的读取请求提供服务。



图4:ZooKeeper Ensemble架构

上图4展示了典型的ZooKeeper ensemble,一台服务器作为Leader,其它作为Follower。当Ensemble启动时,先选出Leader,然后所有Follower复制Leader的状态。所有写请求都通过Leader路由,变更会广播给所有Follower。变更广播被称为原子广播。

Kafka中ZooKeeper的用途:正如ZooKeeper用于分布式系统的协调和促进,Kafka使用ZooKeeper也是基于相同的原因。ZooKeeper用于管理、协调Kafka代理。每个Kafka代理都通过ZooKeeper协调其它Kafka代理。当Kafka系统中新增了代理或者某个代理故障失效时,ZooKeeper服务将通知生产者和消费者。生产者和消费者据此开始与其它代理协调工作。Kafka整体系统架构如图5所示。



图5:Kafka分布式系统的总体架构


Apache Kafka对比其它消息服务

让我们了解一下使用Apache Kafka的两个项目,以对比其它消息服务。这两个项目分别是LinkedIn和我的项目:


LinkedIn的研究

LinkedIn团队做了个实验研究,对比Kafka与Apache
ActiveMQ V5.4和RabbitMQ V2.4的性能。他们使用ActiveMQ默认的消息持久化库Kahadb。LinkedIn在两台Linux机器上运行他们的实验,每台机器的配置为8核2GHz、16GB内存,6个磁盘使用RAID10。两台机器通过1GB网络连接。一台机器作为代理,另一台作为生产者或者消费者。


生产者测试

LinkedIn团队在所有系统中配置代理,异步将消息刷入其持久化库。对每个系统,运行一个生产者,总共发布1000万条消息,每条消息200字节。Kafka生产者以1和50批量方式发送消息。ActiveMQ和RabbitMQ似乎没有简单的办法来批量发送消息,LinkedIn假定它的批量值为1。结果如下面的图6所示:



图6:LinkedIn的生产者性能实验结果

Kafka性能要好很多的主要原因包括:

Kafka不等待代理的确认,以代理能处理的最快速度发送消息。

Kafka有更高效的存储格式。平均而言,Kafka每条消息有9字节的开销,而ActiveMQ有144字节。其原因是JMS所需的沉重消息头,以及维护各种索引结构的开销。LinkedIn注意到ActiveMQ一个最忙的线程大部分时间都在存取B-Tree以维护消息元数据和状态。


消费者测试

为了做消费者测试,LinkedIn使用一个消费者获取总共1000万条消息。LinkedIn让所有系统每次拉请求都预获取大约相同数量的数据,最多1000条消息或者200KB。对ActiveMQ和RabbitMQ,LinkedIn设置消费者确认模型为自动。结果如图7所示。



图7:LinkedIn的消费者性能实验结果

Kafka性能要好很多的主要原因包括:

Kafka有更高效的存储格式;在Kafka中,从代理传输到消费者的字节更少。

ActiveMQ和RabbitMQ两个容器中的代理必须维护每个消息的传输状态。LinkedIn团队注意到其中一个ActiveMQ线程在测试过程中,一直在将KahaDB页写入磁盘。与此相反,Kafka代理没有磁盘写入动作。最后,Kafka通过使用sendfile API降低了传输开销。

目前,我正在工作的一个项目提供实时服务,从消息中快速并准确地提取场外交易市场(OTC)定价内容。这是一个非常重要的项目,处理近25种资产类别的财务信息,包括债券、贷款和ABS(资产担保证券)。项目的原始信息来源涵盖了欧洲、北美、加拿大和拉丁美洲的主要金融市场领域。下面是这个项目的一些统计,说明了解决方案中包括高效的分布式消息服务是多么重要:

每天处理的消息数量超过1,300,000;

每天解析的OTC价格数量超过12,000,000;

支持超过25种资产类别;

每天解析的独立票据超过70,000。

消息包含PDF、Word文档、Excel及其它格式。OTC定价也可能要从附件中提取。

由于传统消息服务器的性能限制,当处理大附件时,消息队列变得非常大,我们的项目面临严重的问题,JMSqueue一天需要启动2-3次。重启JMS队列可能丢失队列中的全部消息。项目需要一个框架,不论解析器(消费者)的行为如何,都能够保住消息。Kafka的特性非常适用于我们项目的需求。

当前项目具备的特性:

使用Fetchmail获取远程邮件消息,然后由Procmail过滤并处理,例如单独分发基于附件的消息。

每条消息从单独的文件获取,该文件被处理(读取和删除)为一条消息插入到消息服务器中。

消息内容从消息服务队列中获取,用于解析和提取信息。


示例应用

这个示例应用是基于我在项目中使用的原始应用修改后的版本。我已经删除日志的使用和多线程特性,使示例应用的工件尽量简单。示例应用的目的是展示如何使用Kafka生产者和消费者的API。应用包括一个生产者示例(简单的生产者代码,演示Kafka生产者API用法并发布特定话题的消息),消费者示例(简单的消费者代码,用于演示Kafka消费者API的用法)以及消息内容生成API(在特定路径下生成消息内容到文件的API)。下图展示了各组件以及它们与系统中其它组件间的关系。



图8:示例应用组件架构

示例应用的结构与Kafka源代码中的例子程序相似。应用的源代码包含Java源程序文件夹‘src’和'config'文件夹,后者包括几个配置文件和一些Shell脚本,用于执行示例应用。要运行示例应用,请参照ReadMe.md文件或GitHub网站Wiki页面的说明。

程序构建可以使用Apache Maven,定制也很容易。如果有人想修改或定制示例应用的代码,有几个Kafka构建脚本已经过修改,可用于重新构建示例应用代码。关于如何定制示例应用的详细描述已经放在项目GitHub的Wiki页面

现在,让我们看看示例应用的核心工件。

Kafka生产者代码示例
/**
* Instantiates a new Kafka producer.
*
* @param topic the topic
* @param directoryPath the directory path
*/
public KafkaMailProducer(String topic, String directoryPath) {
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "localhost:9092");
producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
this.topic = topic;
this.directoryPath = directoryPath;
}

public void run() {
Path dir = Paths.get(directoryPath);
try {
new WatchDir(dir).start();
new ReadDir(dir).start();
} catch (IOException e) {
e.printStackTrace();
}
}


上面的代码片断展示了Kafka生产者API的基本用法,例如设置生产者的属性,包括发布哪个话题的消息,可以使用哪个序列化类以及代理的相关信息。这个类的基本功能是从邮件目录读取邮件消息文件,然后作为消息发布到Kafka代理。目录通过java.nio.WatchService类监视,一旦新的邮件消息Dump到该目录,就会被立即读取并作为消息发布到Kafka代理。

Kafka消费者代码示例
public KafkaMailConsumer(String topic) {
consumer =
Kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
this.topic = topic;
}

/**
* Creates the consumer config.
*
* @return the consumer config
*/
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
props.put("zookeeper.connect", KafkaMailProperties.zkConnect);
props.put("group.id", KafkaMailProperties.groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}

public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext())
System.out.println(new String(it.next().message()));
}


上面的代码演示了基本的消费者API。正如我们前面提到的,消费者需要设置消费的消息流。在Run方法中,我们进行了设置,并在控制台打印收到的消息。在我的项目中,我们将其输入到解析系统以提取OTC定价。

在当前的质量保证系统中,我们使用Kafka作为消息服务器用于概念验证(Proof of Concept,POC)项目,它的整体性能优于JMS消息服务。其中一个我们感到非常兴奋的特性是消息的再消费(re-consumption),这让我们的解析系统可以按照业务需求重新解析某些消息。基于Kafka这些很好的效果,我们正计划使用它,而不是用Nagios系统,去做日志聚合与分析。


总结

Kafka是一种处理大量数据的新型系统。Kafka基于拉的消费模型让消费者以自己的速度处理消息。如果处理消息时出现了异常,消费者始终可以选择再消费该消息。


kafka:一个分布式消息系统

1.背景
最近因为工作需要,调研了追求高吞吐的轻量级消息系统Kafka,打算替换掉线上运行的ActiveMQ,主要是因为明年的预算日流量有十亿,而ActiveMQ的分布式实现的很奇怪,所以希望找一个适合分布式的消息系统。
以下是内容是调研过程中总结的一些知识和经验,欢迎拍砖。
2.基础知识
2.1.什么是消息队列
首先,我们来看看什么是消息队列,维基百科里的解释翻译过来如下:
 
队列提供了一种异步通信协议,这意味着消息的发送者和接收者不需要同时与消息保持联系,发送者发送的消息会存储在队列中,直到接收者拿到它。
 
一般我们把消息的发送者称为生产者,消息的接收者称为消费者;注意定义中的那两个字“异步”,通常生产者的生产速度和消费者的消费速度是不相等的;如果两个程序始终保持同步沟通,那势必会有一方存在空等时间;如果两个程序一持续运行的话,消费者的平均速度一定要大于生产者,不然队列囤积会越来越多;当然,如果消费者没有时效性需求的话,也可以把消息囤积在队列中,集中消费。
说到这里,我们再来谈谈队列的分类,一般我们根据生产者和消费者的不同,可以把队列分为三类:
第一类是在一个应用程序内部(进程之间或者线程之间),相信大家学多线程时都写过“生产者消费者”程序,生产者负责生产,将生产的结果放到缓冲区(如共享数组),消费者从缓冲区取出消费,在这里,这个缓冲区就可以称为“消息队列”。
第二类其实也算在第一类的特例,就像我们喜欢把操作系统和应用程序区别对待来看,操作系统要处理无数繁杂的事物,各进程、线程之间的数据交换少不了消息队列的支持。
第三类是更为通用意义上的“消息队列”,这类队列主要作用于不同应用,特别是跨机器、平台,这令数据的交换更加广泛,一般一款独立的队列产品除了实现消息的传递外,还提供了相应的可靠性、事务、分布式等特性,将生产者、消费者从中解耦。常见的消费队列产品根据开源与否又可分为两类:
专有软件:IBM WebSphere MQ,MSMQ…
开源软件:ActiveMQ、RabbitMQ、Kafka…
2.2.JMS与AMQP
好了,对于上述第三类“消息队列”,要在不同的机器中提供消息队列的功能,那势必要有统一的规范,这时候SUN就跳出来了,作为跨平台的JAVA势必也要支持跨平台的消息传递,基于此,SUN提供了一套消息标准:Java Message Service,缩写JMS,但是这套规范定义的是API层面的标准,在JAVA体系中可以很方便的交换,但对于其他平台就需要,可能需要消息队列产品本身支持多协议(如OpenWire、STMOP)。
而AMQP定义的比JMS更加底层,从名字就能看出来(Advanced Message Queuing Protocol),它定义的是Wire-level的协议,天然具有跨平台、跨语言的特性,基于此实现的消息队列可以与任何支持该协议的平台交互。
一种是JAVA层面的API,一种是Wire-level协议,这是JMS和AMQP最本质的区别;同时两种标准还有两个比较明显的差异:
一是消息传递模型;JMS比较简单,支持两种最通用的Peer-2-Peer、publisher/subscriber;通俗点就是点对点和广播模式;而AMQP定义的更为复杂,其定义了一种exchange&binding机制,由此支持五种模型:direct exchange、fanout exchange、topic exchange、headers exchange、system exchange,本质上与P2P、PUB/SUB一样,但是更加细致些。
二是支持的消息类型,JMS支持多种消息模型:TextMessage、MapMessage、BytesMessage、StreamMessage、ObjectMessage、Message等;而AMQP只有byte数组。
2.3.ActiveMQ
ActiveMQ是基于JMS实现的Provider(可以理解为队列),它支持多种协议,如OpenWire,Stomp,AMQP等,基于此,支持多平台;支持事务,支持分发策略、还有上面的多种消息模型。这里我们不细谈ActiveMQ的各特性,我们着重来看ActiveMQ的分布式模型。
ActiveMQ支持分布式,它支持Master-Slave提供高可用,也支持Broker-Cluster提供负载均衡,但是它的负载基于一种Forwarding Bridge机制。
 
在这种机制下,任意时刻一条消只会被一个broker持有,producer发送的消息,可能会经过多个broker转发最终才会到达consumer,可以想象,当broker越来越多时,几乎每次消费都要经过转发,效率会明显下降;并且在这种复杂逻辑下,任一broker的加入和移除都显得十分复杂;这两点是我不建议使用ActiveMQ分布式集群的根本原因。



3.Kafka
好,我们最后来谈今天的主角Kafka,这个奇特的名字我始终没有找到典故,也许是开发者暗恋女孩(基友)的名字吧^_^,Kafka由linkin开发,最初的目的是为了应对linkin庞大的活动流数据(登录、浏览、点击、分享、喜欢等),这部分数据容量庞大,但是可靠性要求不高,故而通过牺牲一部分可靠性(这并不是说我们的数据会按百分比丢,我们后面再谈)来提升吞吐量;它砍掉了很多复杂的特性,如事务、分发策略、多种消息模型等;通过自身独特的设计将消息持久化到磁盘上,以此同时支持在线和离线消费;并且其天生为分布式而设计,压根就没有单机模式(或者说单机模式是分布式的特例),能够很好的扩展。实际应用中,Kafka可以用来做消息队列、流式处理(一般结合storm)、日志聚合等。
3.1.架构



我们先宏观的看看Kafka的架构,Producer集群通过zookeeper(实际中写的是broker list)获取所写topic对应的partition列表,然后顺序发送消息(支持自己实现分发策略),broker集群负责消息的存储和传递,支持Master Slaver模型,可分布式扩展;Consumer集群从zookeeper上获取topic所在的partition列表,然后消费,一个partition只能被一个consumer消费。Name
Server集群(一般是zookeeper)提供名称服务等协调信息。至于什么是topic,什么是partition,我们接下来看。
3.2.Topic
Topic是生产者生产、消费者消费的队列标识。一个Topic由一个或多个partition组成,每个partition可以单独存在一个broker上,消费者可以往任一partition发送消息,以此实现生产的分布式,任一partition都可以被且只被一个消费者消息,以此实现消费的分布式;因此partition的设计提供了分布式的基础。



同时,从上图我们也能发现这种设计还有一个优点,因为每个partition内的消息是有序的,而一个partition只能被一个消费者消费,因此Kafka能提供partition层面的消息有序,而传统的队列在多个consumer的情况下是完全无法保证有序的。
3.3.消息传递模型
传统的消息队列最少提供两种消息模型,一种P2P,一种PUB/SUB,而Kafka并没有这么做,巧妙的,它提供了一个消费者组的概念,一个消息可以被多个消费者组消费,但是只能被一个消费者组里的一个消费者消费,这样当只有一个消费者组时就等同与P2P模型,当存在多个消费者组时就是PUB/SUB模型。



3.4.消息持久化
很多系统、组件为了提升效率一般恨不得把所有数据都扔到内存里,然后定期flush到磁盘上;可实际上,现代操作系统也是这样,所有的现代操作系统都乐于将空闲内存转作磁盘缓存(页面缓存),想不用都难;对于这样的系统,他的数据在内存中保存了一份,同时也在OS的页面缓存中保存了一份,这样不但多了一个步骤还让内存的使用率下降了一半;因此,Kafka决定直接使用页面缓存;但是随机写入的效率很慢,为了维护彼此的关系顺序还需要额外的操作和存储,而线性的写入可以避免这些,实际上,线性写入(linear
write)的速度大约是300MB/秒,但随即写入却只有50k/秒,其中的差别接近10000倍。这样,Kafka以页面缓存为中间的设计在保证效率的同时还提供了消息的持久化,每个消费者自己维护当前读取数据的offser(也可委托给zookeeper),以此可同时支持在线和离线的消费。
3.5.Push vs. Pull
对于消息的消费,ActiveMQ使用PUSH模型,而Kafka使用PULL模型,两者各有利弊,对于PUSH,broker很难控制数据发送给不同消费者的速度,而PULL可以由消费者自己控制,但是PULL模型可能造成消费者在没有消息的情况下盲等,这种情况下可以通过long polling机制缓解,而对于几乎每时每刻都有消息传递的流式系统,这种影响可以忽略。
3.6.可靠性
刚刚说Kafka牺牲了一些可靠性来提升吞吐量,很多同学可能担心消息的丢失,那么我们现在来看看各种情况下的可靠性。



对于如上的模型,我们分开来看,
先来看消息投递可靠性,一个消息如何算投递成功,Kafka提供了三种模式,第一种是啥都不管,发送出去就当作成功,这种情况当然不能保证消息成功投递到broker;第二种是对于Master Slave模型,只有当Master和所有Slave都接收到消息时,才算投递成功,这种模型提供了最高的投递可靠性,但是损伤了性能;第三种模型,即只要Master确认收到消息就算投递成功;实际使用时,根据应用特性选择,绝大多数情况下都会中和可靠性和性能选择第三种模型。
我们再来看消息在broker上的可靠性,因为消息会持久化到磁盘上,所以如果正常stop一个broker,其上的数据不会丢失;但是如果不正常stop,可能会使存在页面缓存来不及写入磁盘的消息丢失,这可以通过配置flush页面缓存的周期、阈值缓解,但是同样会频繁的写磁盘会影响性能,又是一个选择题,根据实际情况配置。
接着,我们再看消息消费的可靠性,Kafka提供的是“At least once”模型,因为消息的读取进度由offset提供,offset可以由消费者自己维护也可以维护在zookeeper里,但是当消息消费后consumer挂掉,offset没有即时写回,就有可能发生重复读的情况,这种情况同样可以通过调整commit offset周期、阈值缓解,甚至消费者自己把消费和commit offset做成一个事务解决,但是如果你的应用不在乎重复消费,那就干脆不要解决,以换取最大的性能。
最后,我们再来看zookeeper的可靠性,很明显,他要挂了,一切都完了,地球就毁灭了,人类就灭绝了,星级穿越也挽救不了了……所以增强可靠性的方式就是把zookeeper也部署成集群。
3.7.性能
好了,说了那么多,我们实际来测试下Kafka在各种情况下的性能,为了对比我也测了下单机模式下ActiveMQ的性能,不过由于懒,没有搭建ActiveMQ集群进行测试,但是基于其恶心的Forwarding Bridge模型,我也持悲观态度。
首先,测试环境如下:
Kafka:3 broker;8核/32G;默认配置
ActiveMQ:1 broker;8核/32G;默认配置
Producer: 一台机器通过多线程模拟多producer;8核/32G;默认配置,异步发送
Consumer: 一台机器通过多线程模拟多consumer;8核/32G;默认配置
除了特殊说明,生产和消费同时进行。
 
然后,我使用如下字符表示各种测试条件:
1T-1P3C-1P1C-1KW-1K:
1T:1个toipc
1P3C:1个partition 3个replication
1P1C:1个producer 1个consumer
1KW:1千万条消息
1K:每个消息1K
 
我先对ActiveMQ在单机多Producer、多consumer的情况下的测试,结果比我想象中的好,官方的给出的一个数据是1-2K的数据,每秒10-20K个,这样算下来大概30-40MB/S,而测试的结果在多线程的情况下会更好些。
ActiveMQ-threadProduceConsume
1T-XXX-1P1C-1KW-1K28.925MB/S28.829MB/S
1T-XXX-3P3C-1KW-1K43.711MB/S41.791MB/S
1T-XXX-8P8C-1KW-1K52.426MB/S52.383MB/S
 
然后我又对Kafka进行了相应的测试,用一个partition模拟单机模式,结果和预想的一样,在单机模型下,两者差异不大;而官方给的数据说生产者能达到50MB/S,消费者能达到100MB/S,生产者符合官方数据,而消费者我始终没有压到那么高的速度。
Kafka- threadProduceConsume
1T-1P1C-1P1C-1KW-1K29.214MB/S29.117MB/S
1T-1P1C-3P3C-1KW-1K46.168MB/S43.018MB/S
1T-1P1C-8P8C-1KW-1K52.140MB/S51.975MB/S
 
接下来的对于Kafka集群,我想同样数量的消息会不会因为topic数目的增多而影响,测试结果如下,表明topic越多,速度会有所下降,也符合预期。
Kafka-topicProduceConsume
1T-3P3C-3P3C-1.2KW-1K49.255MB/S49.204MB/S
3T-3P3C-3P3C-0.4KW*3-1K46.239MB/S45.774MB/S
 
然后为了测试partition对性能的影响,进行了如下测试,可以看到partition数量越多,总的生产和消费速度越快;但是意外的是Only produce情况下生产效率没有明显提升反而略慢,这里怀疑和page cache有关,没有深入研究。
Kafka-partitionProduceConsumeOnly ProduceOnly Consume
1T-1P3C-1P1C-1KW-1K29.213MB/S29.117MB/S28.941MB/S34.360MB/S
1T-3P3C-3P3C-1KW-1K47.103MB/S46.966MB/S46.540MB/S66.219MB/S
1T-8P3C-8P8C-1KW-1K61.522MB/S61.412MB/S60.703MB/S72.701MB/S
 
综上,我们可以看到Kafka的性能和吞吐是可以扩展的。
3.8.风险点
对于我们来说,Kafka主要有两个风险点,第一,要深入使用必须要熟读源码,而kafka源码是用scala写的,我们并没有相应的技术储备,需要学习;第二,kafka技术较新,目前的版本是0.8.1.1,看起来还不太成熟。
4.KG应用
这一块是在公司内部系统的应用,不适合对外,所以这里删去。
5.参考资料
Kafka-DOC:http://kafka.apache.org/documentation.html
ActiveMQ-DOC:http://activemq.apache.org
Understading the differences between AMQP & JMS:http://www.wmrichards.com/amqp.pdf
WIKI-MQ:http://en.wikipedia.org/wiki/Message_queue
WIKI-JMS:http://en.wikipedia.org/wiki/Java_Message_Service
WIKI-AMQP:http://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息