kafka常用运维命令
2015-11-03 16:06
453 查看
列出所有topic:
bin/kafka-topics.sh --zookeeper localhost:2181
--list
说明:其实就是去检查zk上节点的/brokers/topics子节点,打印出来
创建topic:
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic order_ledger_check_complete_test --partitions 4 --replication-factor 3
说明:线上环境我们会将自动创建topic禁用掉,改为手动创建(auto.create.topics.enable=false),parttitions和replication-factor是两个必备选项,第一个参数是消费并行度的一个重要参数,第二个极大提高了topic的可用性.备份因子默认是1,相当于没有备份,注意其值不能大于broker个数,否则会报错。同时还可以指定topic级别的配置参数,这种特定的配置会覆盖掉默认配置,并且存储在zookeeper的/config/topics/[topic_name]节点数据里。--alter
--config --deleteConfig
删除topic:
bin/kafka-topics.sh --zookeeper localhost:2181 --topic payment_completed --delete
说明:在0.8.2.1之前的版本一般不建议之行删除操作,因为有各种各样的bug存在,目前的版本稳定些,同时我们需要将配置参数打开(delete.topic.enable=true),删除操作其实是通过更改一个zk节点,由另外的删除线程异步做的topicdeletionmanager。
增加partitions:
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_test --partitions 10
说明:只能增加,不能减少。如果原有分散策略是hash的方式,将会受影响。发送端(默认10分钟会刷新本地存储元信息)和消费端都无需重启即可生效。
增加broker:
需要将安装包拷贝到对应服务器上,修改broker.id,不能与现有系统中broker id冲突,然后创建好对应的日志目录和数据目录等。注意的是,现有partition会继续保持到其他broker上面,新创建topic才可能分配到该新机器上面。如果需要保持整个kafka集群比较均衡,需要手动对现有数据进行迁移,尽量迁移非leader的partition,利用partition reassignment tools
1.--generate
也可以对候选的进行适当调整
[hadoop@i-o1oh kafka_2.10-0.8.2.1]$ cat topics-to-move.json
{"version":1,
"partitions":[{"topic":"production_process_flow_test"},
{"topic":"production_process_flow_dev"}
}
[hadoop@i-o1ohkafka_2.10-0.8.2.1]$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"production_process_flow_test","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_dev","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"production_process_flow_test","partition":2,"replicas":[1,2]},{"topic":"production_process_flow_dev","partition":2,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":1,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":1,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":3,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[1,2]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]}]}
2.--execute
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file a.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"production_process_flow_test","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_dev","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"production_process_flow_test","partition":1,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":3,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":1,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":2,"replicas":[1,2]},{"topic":"production_process_flow_dev","partition":2,"replicas":[2,1]}]}
3.--verify
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file a.json --verify
Status of partition reassignment:
Reassignment of partition [production_process_flow_test,1] completed successfully
Reassignment of partition [production_process_flow_dev,3] completed successfully
Reassignment of partition [production_process_flow_test,3] completed successfully
Reassignment of partition [production_process_flow_dev,0] completed successfully
Reassignment of partition [production_process_flow_dev,1] completed successfully
Reassignment of partition [production_process_flow_test,0] completed successfully
Reassignment of partition [production_process_flow_test,2] completed successfully
Reassignment of partition [production_process_flow_dev,2] completed successfully
下线某台机器分两种情况:
1.数据完整:直接将数据目录拷贝到新机器上,同时保持kafka安装配置不变,启动起来就行
2.数据不完整:这种情况下比较麻烦,需要对所有topic进行describe,如果发现有replica在这台下线机器上,需要用partition reassignment tool进行迁移工作
增减replica:
只需要编辑 --reassignment-json-file,添加或者减少broker id即可。其他操作可以依赖partition reassignment tool 的--execute --verify.
消费消息:
watch --interval=2 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic payment_completed_dev --from-beginning
查询消费信息:
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group group_order_ledger --topic payment_completed_dev
写入消息性能测试:
bin/kafka-producer-perf-test.sh --broker-list kafka1.weibo.com:9092 --batch-size 1 --message-size 1024 --messages 10000 --sync --topics topic_test
bin/kafka-topics.sh --zookeeper localhost:2181
--list
说明:其实就是去检查zk上节点的/brokers/topics子节点,打印出来
创建topic:
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic order_ledger_check_complete_test --partitions 4 --replication-factor 3
说明:线上环境我们会将自动创建topic禁用掉,改为手动创建(auto.create.topics.enable=false),parttitions和replication-factor是两个必备选项,第一个参数是消费并行度的一个重要参数,第二个极大提高了topic的可用性.备份因子默认是1,相当于没有备份,注意其值不能大于broker个数,否则会报错。同时还可以指定topic级别的配置参数,这种特定的配置会覆盖掉默认配置,并且存储在zookeeper的/config/topics/[topic_name]节点数据里。--alter
--config --deleteConfig
删除topic:
bin/kafka-topics.sh --zookeeper localhost:2181 --topic payment_completed --delete
说明:在0.8.2.1之前的版本一般不建议之行删除操作,因为有各种各样的bug存在,目前的版本稳定些,同时我们需要将配置参数打开(delete.topic.enable=true),删除操作其实是通过更改一个zk节点,由另外的删除线程异步做的topicdeletionmanager。
增加partitions:
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_test --partitions 10
说明:只能增加,不能减少。如果原有分散策略是hash的方式,将会受影响。发送端(默认10分钟会刷新本地存储元信息)和消费端都无需重启即可生效。
增加broker:
需要将安装包拷贝到对应服务器上,修改broker.id,不能与现有系统中broker id冲突,然后创建好对应的日志目录和数据目录等。注意的是,现有partition会继续保持到其他broker上面,新创建topic才可能分配到该新机器上面。如果需要保持整个kafka集群比较均衡,需要手动对现有数据进行迁移,尽量迁移非leader的partition,利用partition reassignment tools
1.--generate
也可以对候选的进行适当调整
[hadoop@i-o1oh kafka_2.10-0.8.2.1]$ cat topics-to-move.json
{"version":1,
"partitions":[{"topic":"production_process_flow_test"},
{"topic":"production_process_flow_dev"}
}
[hadoop@i-o1ohkafka_2.10-0.8.2.1]$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"production_process_flow_test","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_dev","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"production_process_flow_test","partition":2,"replicas":[1,2]},{"topic":"production_process_flow_dev","partition":2,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":1,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":1,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":3,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[1,2]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]}]}
2.--execute
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file a.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"production_process_flow_test","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_dev","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"production_process_flow_test","partition":1,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":3,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":1,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":2,"replicas":[1,2]},{"topic":"production_process_flow_dev","partition":2,"replicas":[2,1]}]}
3.--verify
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file a.json --verify
Status of partition reassignment:
Reassignment of partition [production_process_flow_test,1] completed successfully
Reassignment of partition [production_process_flow_dev,3] completed successfully
Reassignment of partition [production_process_flow_test,3] completed successfully
Reassignment of partition [production_process_flow_dev,0] completed successfully
Reassignment of partition [production_process_flow_dev,1] completed successfully
Reassignment of partition [production_process_flow_test,0] completed successfully
Reassignment of partition [production_process_flow_test,2] completed successfully
Reassignment of partition [production_process_flow_dev,2] completed successfully
下线某台机器分两种情况:
1.数据完整:直接将数据目录拷贝到新机器上,同时保持kafka安装配置不变,启动起来就行
2.数据不完整:这种情况下比较麻烦,需要对所有topic进行describe,如果发现有replica在这台下线机器上,需要用partition reassignment tool进行迁移工作
增减replica:
只需要编辑 --reassignment-json-file,添加或者减少broker id即可。其他操作可以依赖partition reassignment tool 的--execute --verify.
消费消息:
watch --interval=2 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic payment_completed_dev --from-beginning
查询消费信息:
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group group_order_ledger --topic payment_completed_dev
写入消息性能测试:
bin/kafka-producer-perf-test.sh --broker-list kafka1.weibo.com:9092 --batch-size 1 --message-size 1024 --messages 10000 --sync --topics topic_test
相关文章推荐
- 在linux下新增一块硬盘的操作。(包含大于2T的硬盘在linux下挂载操作)
- CentOS中操作Psql
- linux中的文件锁flock
- linux查看java jdk安装路径和设置环境变量
- linux ftp上传文件用java代码实现
- Centos6.5使用Siege压力测试
- 机顶盒开发 笔记
- ecshop出现警告:Strict standards: Redefining already defined constructor for class
- Linux 主机root登陆以后远程SSH才能访问
- How to Setup OpenStack to use Local Disks for Instances
- linux---------远程debug
- Open MSDN document directly without Visual Studio
- 利用aop完成功能权限验证遇到的问题
- Tomcat应用与部署(二)
- windows和Linux系统使用java keytool工具生成cas单点登录数字证书
- powershell-脚本部署sharepoint解决方案
- pl-------------linux执行.pl文件
- 关于zepto没有scrollTop事件
- linux 上安装tomcat、进行相关设置、处理部分部署问题
- RedHat Enterprise Linux 4(32) matlab(32)的安装与c++混合编程