Kafka 学习 -- Topic的创建
2015-12-30 16:27
337 查看
1. 使用命令行方式:[lch@linux129 bin]$ ./kafka-topics.sh --create --zookeeper hadoop221:2181,hadoop222:2181 --replication-factor 2 --partitions2 --topic TEST_TTT参数说明:--replication-factor : 表明 副本的个数, 这里写成2,表明会有两个副本--partitions : 表明会有两个partition, 表现在log日志中,就是是有两个文件夹即这TEST_TTT-0与TEST_TTT-1 注: 在leader节点上面才是两个文件,如果是从节点上面,可能只有一个--topic : topic的名称 2. 使用Java代码的方式:
对于scala不是太熟,不知道怎么调用scala中的Object,因此依照scala中的ZKStringSerializer,实现了一个
3. Topic创建的过程不管是命令行还是代码创建,最终都是调用createTopic:: createTopic(),查看其代码
再查看AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK的代码:
从上面可以看到,它会在zookeeper上面创建两个path,第一次:
从网上下载下来,也没有弄清楚它的含义,直接将key设置为123,即key总是一样的,而kafka在计算的时候,也就总是算出同一个值。 3) topic无法删掉[root@hadoop221 bin]# ./kafka-topics.sh --delete --zookeeper hadoop222:2181 --topic TESTTopic TEST is marked for deletion.Note: This will have no impact if delete.topic.enable is not set to true.[root@hadoop221 bin]# ./kafka-topics.sh --list --zookeeper hadoop222:2181TEST - marked for deletionTEST_TTT - marked for deletion[root@hadoop221 bin]# 需要增加配置参数:delete.topic.enable=true
1234567891011121314151617 | public class KafkaAdmin{ public static void main(String[] args) { String[] arrys = new String[ 6 ]; arrys[ 0 ] = "--replication-factor" ; arrys[ 1 ] = "1" ; arrys[ 2 ] = "--partitions" ; arrys[ 3 ] = "1" ; arrys[ 4 ] = "--topic" ; arrys[ 5 ] = "EFGH" ; ZkClient client = new ZkClient( "192.168.8.222:2181" , 30000 , 30000 ); client.setZkSerializer( new ZKStringSerialize()); //一定要加上ZkSerializer TopicCommandOptions opts = new TopicCommandOptions(arrys); TopicCommand.createTopic(client, opts); } } |
123456789101112131415161718192021222324252627 | public class ZKStringSerialize implements ZkSerializer { @Override public byte [] serialize(Object data) throws ZkMarshallingError { if (data instanceof String){ try { return ((String)data).getBytes( "UTF-8" ); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } } return null ; } @Override public Object deserialize( byte [] bytes) throws ZkMarshallingError { if (bytes == null ) return null ; else try { return new String(bytes, "UTF-8" ); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return null ; } } |
12345678 | def createTopic(...) { //从zookeeper的/brokers/ids目录下面获取broker的列表信息 val brokerList = ZkUtils.getSortedBrokerList(zkClient) //此处会根据副本个数以及partition的个数,来分配broker节点, 即partition 0在哪几个节点上 val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor) //分配完成后,就开始创建节点信息 AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig) } |
12345678 | def createOrUpdateTopicPartitionAssignmentPathInZK(...) { .... .... // write out the config if there is any, this isn't transactional with the partition assignments writeTopicConfig(zkClient, topic, config) // create the partition assignment writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update) } |
writeTopicConfig(zkClient, topic, config)
它创建的目录在zookeeper中的:在config目录创建完成后,会在brokers中创建,即代码:
writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update)
创建完成后,就会在zookeeper中看到下面的这些东西。当创建代码完成这一步之后,就会返回成功,但是对于kafka来说还有一步没有完成,就是它还需要在log目录下面,针对每个topic创建相应的目录,以后的数据就会保存在这个目录下面。创建目录的过程是通过zookeeper的文件监听来完成的。从controller.log中可以看得很清楚:[2015-12-29 18:27:26,801] DEBUG [TopicChangeListener on Controller 0]: Topic change listener fired for path /brokers/topics with children MNPQ (kafka.controller.PartitionStateMachine$TopicChangeListener)[2015-12-29 18:27:26,814] INFO [TopicChangeListener on Controller 0]: New topics: [Set(MNPQ)], deleted topics: [Set()], new partition replica assignment [Map([MNPQ,0] -> List(3))] (kafka.controller.PartitionStateMachine$TopicChangeListener)4. 问题记录1) 使用java代码创建topic的时候,程序返回成功了,但是log目录中没有创建相应的文件夹。 查看zookeeper中的信息:原因:
client.setZkSerializer(
new
ZKStringSerialize()); 没有设置字符串序列化函数,导致topic创建目录失败,而kafka的listen线程发现有文件夹变化后,在创建时,无法获取相应的broker信息,因此创建失败了
2) producer产生的数据,只放在partiton 0中,而partition 1中总是没有数据。
原因: producer产生的数据会根据key来选择partition,而我的代码:
1 | KeyedMessage km = new KeyedMessage<String, String>( this .topic, "123" ,data); |
相关文章推荐
- nginx proxy_next_upstream导致的一个重复提交错误
- solr发布到tomcat下
- Linux命令之stty - 显示和修改终端行设置
- 在Nginx服务器中配置针对TCP的负载均衡的方法
- linux命令行编辑快捷键
- linux中grep命令
- 将Opencv中的图像通过动态链接库传递给Labview
- Linux下PHP扩展编译的通用方法
- CentOS下的SVN服务器搭建过程以及分析
- OpenGl中的Nurbs B样条曲面绘制
- centos6.7-keepalived-DNS-ntp一主一从服务器端配置
- [原创]测试架构师是做什么的?
- Openssl自签证书
- Embedded Linux 启动时间优化
- Opencv CamShift+Kalman目标跟踪
- yum方式安装MySQL5.7-centos6.4 64位
- Docker Machine快速安装Docker环境(二)
- Linux下git生成 ssh key 的方法
- CentOS6.5安装Mysql5.5
- 设置某个shell文件,以bash方式执行