您的位置:首页 > 运维架构

Kafka 学习 -- Topic的创建

2015-12-30 16:27 337 查看
1. 使用命令行方式:[lch@linux129 bin]$ ./kafka-topics.sh --create --zookeeper hadoop221:2181,hadoop222:2181 --replication-factor 2 --partitions2 --topic TEST_TTT参数说明:--replication-factor : 表明 副本的个数, 这里写成2,表明会有两个副本--partitions              :  表明会有两个partition, 表现在log日志中,就是是有两个文件夹即这TEST_TTT-0与TEST_TTT-1  注: 在leader节点上面才是两个文件,如果是从节点上面,可能只有一个--topic                      :   topic的名称 2. 使用Java代码的方式:
1234567891011121314151617
public
 
class
 
KafkaAdmin{
    
public
 
static
 
void
 
main(String[] args) {
        
String[] arrys = 
new
 
String[
6
];
        
arrys[
0
] = 
"--replication-factor"
;
        
arrys[
1
] = 
"1"
;
        
arrys[
2
] = 
"--partitions"
;
        
arrys[
3
] = 
"1"
;
        
arrys[
4
] = 
"--topic"
;
        
arrys[
5
] = 
"EFGH"
;
        
ZkClient client =  
new
 
ZkClient(
"192.168.8.222:2181"
30000
30000
);
        
client.setZkSerializer(
new
 
ZKStringSerialize());   
//一定要加上ZkSerializer
     
 
        
TopicCommandOptions opts = 
new
 
TopicCommandOptions(arrys);     
        
TopicCommand.createTopic(client, opts); 
    
}
}
   
 
对于scala不是太熟,不知道怎么调用scala中的Object,因此依照scala中的ZKStringSerializer,实现了一个
123456789101112131415161718192021222324252627
public
 
class
 
ZKStringSerialize 
implements
 
ZkSerializer {
    
@Override
    
public
 
byte
[] serialize(Object data) 
throws
 
ZkMarshallingError {
        
if
(data 
instanceof
 
String){
            
try
 
{
                
return
 
((String)data).getBytes(
"UTF-8"
);
            
catch
 
(UnsupportedEncodingException e) {
                
// TODO Auto-generated catch block
                
e.printStackTrace();
            
}
        
}
        
return
 
null
;
    
}
    
@Override
    
public
 
Object deserialize(
byte
[] bytes) 
throws
 
ZkMarshallingError {
        
if
 
(bytes == 
null
)
          
return
 
null
;
        
else
            
try
 
{
                
return
 
new
 
String(bytes, 
"UTF-8"
);
            
catch
 
(UnsupportedEncodingException e) {
                
e.printStackTrace();
            
}
        
return
 
null
;
    
}
}
   
3. Topic创建的过程不管是命令行还是代码创建,最终都是调用createTopic:: createTopic(),查看其代码
12345678
  
def
 
createTopic(...) {
    
//从zookeeper的/brokers/ids目录下面获取broker的列表信息
    
val
 
brokerList 
=
 
ZkUtils.getSortedBrokerList(zkClient)
    
//此处会根据副本个数以及partition的个数,来分配broker节点, 即partition 0在哪几个节点上
    
val
 
replicaAssignment 
=
 
AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
    
//分配完成后,就开始创建节点信息
    
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig)
  
}
再查看AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK的代码:
12345678
 
def
 
createOrUpdateTopicPartitionAssignmentPathInZK(...) {
        
.... ....
    
// write out the config if there is any, this isn't transactional with the partition assignments
    
writeTopicConfig(zkClient, topic, config)
    
 
    
// create the partition assignment
    
writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update)
  
}
从上面可以看到,它会在zookeeper上面创建两个path,第一次:
  
writeTopicConfig(zkClient, topic, config)
它创建的目录在zookeeper中的:
在config目录创建完成后,会在brokers中创建,即代码:
 
writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update)
创建完成后,就会在zookeeper中看到下面的这些东西。
当创建代码完成这一步之后,就会返回成功,但是对于kafka来说还有一步没有完成,就是它还需要在log目录下面,针对每个topic创建相应的目录,以后的数据就会保存在这个目录下面。创建目录的过程是通过zookeeper的文件监听来完成的。从controller.log中可以看得很清楚:[2015-12-29 18:27:26,801] DEBUG [TopicChangeListener on Controller 0]: Topic change listener fired for path /brokers/topics with children MNPQ (kafka.controller.PartitionStateMachine$TopicChangeListener)[2015-12-29 18:27:26,814] INFO [TopicChangeListener on Controller 0]: New topics: [Set(MNPQ)], deleted topics: [Set()], new partition replica assignment [Map([MNPQ,0] -> List(3))] (kafka.controller.PartitionStateMachine$TopicChangeListener)4. 问题记录1) 使用java代码创建topic的时候,程序返回成功了,但是log目录中没有创建相应的文件夹。 查看zookeeper中的信息:原因: 
 
client.setZkSerializer(
new
 
ZKStringSerialize());
   没有设置字符串序列化函数,导致topic创建目录失败,而kafka的listen线程发现有文件夹变化后,在创建时,无法获取相应的broker信息,因此创建失败了
2) producer产生的数据,只放在partiton 0中,而partition 1中总是没有数据。
原因: producer产生的数据会根据key来选择partition,而我的代码:
1
     
KeyedMessage km = 
new
 
KeyedMessage<String, String>(
this
.topic, 
"123"
 
,data);
从网上下载下来,也没有弄清楚它的含义,直接将key设置为123,即key总是一样的,而kafka在计算的时候,也就总是算出同一个值。 3) topic无法删掉[root@hadoop221 bin]# ./kafka-topics.sh --delete  --zookeeper hadoop222:2181 --topic TESTTopic TEST is marked for deletion.Note: This will have no impact if delete.topic.enable is not set to true.[root@hadoop221 bin]# ./kafka-topics.sh --list --zookeeper hadoop222:2181TEST - marked for deletionTEST_TTT - marked for deletion[root@hadoop221 bin]# 需要增加配置参数:delete.topic.enable=true
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: