kafka 资料整理
安装 zookeeper 配置 zookeeper 集群
启动 zookeeper: sudo bin/zkServer.sh {start|start-foreground|stop|restart|status|upgrade|print-cmd}
检测 zookeeper 节点状态
1 | bin/zkServer.sh status |
安装配置 kafka
编辑 ${KAFKA_HOME}/config/server.properties, 主要参数
1 | host.name=10.50.75.133 |
编辑 bin/kafka-server-start.sh
1 |
|
启动 kafka
1 | sudo -E bin/kafka-server-start.sh -daemon config/server.properties |
启动完执行 bin/kafka-preferred-replica-election.sh –zookeeper 127.0.0.1:2181 来重新平衡 kafka 各分区的leader
关闭kafka1
2ps -ef|grep kafka
kill -s TERM $PIDS
kafka 命令
创建topic ‘dlog-test’ ,其中副本数位 2,分区数为 11
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic dlog-test --partitions 1 --replication-factor 2
提示:
Topic dlog-test2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
删除topic1
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic dlog-test2
修改 topic 配置,参考 3.2 Topic-Level Configs
查看 brokers 配置
1
bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type brokers
查看 topic 的配置
1 | bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --describe |
- 增加 topic 配置
1 | bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --add-config retention.ms=168 |
- 删除 topic 配置
1 | bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes |
- 增加 topic 的 partition 分区数
1 | > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --partitions 40 |
查看topic
1 | bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list |
手动触发分区重新选举,类似 auto.leader.rebalance.enable=true 参数1
bin/kafka-preferred-replica-election.sh --zookeeper 127.0.0.1:2181
执行结果1
2Created preferred replica election path with {"version":1,"partitions":[{"topic":"MutableFlow","partition":2},{"topic":"TC_MailFlow","partition":0},{"topic":"RoundFlow","partition":2},{"topic":"TC_PlayerExpFlow","partition":0},{"topic":"SnsFlow","partition":1},{"topic":"TC_404Item","partition":0},...]}
Successfully started preferred replica election for partitions Set([TC_MailFlow,1], [__consumer_offsets,32], [LBS_DATA,2], [__consumer_offsets,16], [__consumer_offsets,49], [__consumer_offsets,44],...)
查看当前 topic 最大 offsets1
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic topiname
最小offset1
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -2 --topic topiname
简单消费1
2
3
4
5bin/kafka-simple-consumer-shell.sh --broker-list "172.16.150.132:9093" --topic PlayerLogin
bin/kafka-simple-consumer-shell.sh -broker-list "192.168.0.94:9092,192.168.0.93:9092" --offset -1 --topic CustomEvent
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic PlayerLogin --from-beginning
查看消费详情1
2
3
4
5
6
7
8# 旧版kafka 消费组的 offset 保存在 zookeeper
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
# 新版kafka 消费组的 offset 保存在 kafka
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group dlog_rtc_group_PlayerLogin
bin/kafka-consumer-groups.sh --bootstrap-server 172.16.150.130:9093 --describe --group dlog_rtc_group_CustomEvent
kafka 平衡leader1
2
3bin/kafka-preferred-replica-election.sh --zookeeper 127.0.0.1:2181
或
auto.leader.rebalance.enable=true
迁移数据,达到集群数据平衡1
2
3
4
5
6
7
8
9
10
11
12
13
14bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "3,4" --generate
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
# topic 分区重新分配,同时限速 50000000 B/s
bin/kafka-reassign-partitions.sh --zookeeper localhost:2185 --reassignment-json-file expand-cluster-reassignment.json --execute --throttle 50000000
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify |grep "still in progress"
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
对于数据量非常大的topic,可以临时将topic数据保存时间设置成很小,然后执行迁移任务,等迁移完再调回原来时间
中断迁移任务
一旦启动reassign 脚本,则无法停止迁移任务。如果需要强制停止,可以通过zookeeper 进行修改。
1 | $ zookeeper-client -server 10.1.1.50:2181/kafka |
链接:https://www.jianshu.com/p/626b147821cd
添加副本数也是一样道理
cat topics-to-move.json1
2
3
4
5
6
7
8
9
10
11
12
13
14{"topics": [
{"topic": "TC_404Item"},
{"topic": "TC_CityWarFlow"},
{"topic": "TC_DepositFlow"},
{"topic": "TC_EquipFlow"},
{"topic": "TC_GuildFlow"},
{"topic": "TC_ItemFlow"},
{"topic": "TC_MailFlow"},
{"topic": "TC_MoneyFlow"},
{"topic": "TC_PlayerExpFlow"},
{"topic": "TC_PlayerRegister"}
],
"version":1
}
bin/kafka-reassign-partitions.sh –zookeeper localhost:2181 –topics-to-move-json-file topics-to-move.json –broker-list “1,2,3” –generate
1 | {"version":1,"partitions":[{"topic":"TC_GuildFlow","partition":1,"replicas":[3]},{"topic":"TC_PlayerExpFlow","partition":2,"replicas":[1]},{"topic":"TC_EquipFlow","partition":1,"replicas":[3]},{"topic":"TC_PlayerRegister","partition":0,"replicas":[3]},{"topic":"TC_MailFlow","partition":1,"replicas":[3]},{"topic":"TC_ItemFlow","partition":1,"replicas":[2]},{"topic":"TC_PlayerRegister","partition":2,"replicas":[2]},{"topic":"TC_GuildFlow","partition":2,"replicas":[1]},{"topic":"TC_MoneyFlow","partition":1,"replicas":[1]},{"topic":"TC_EquipFlow","partition":2,"replicas":[1]},{"topic":"TC_PlayerRegister","partition":1,"replicas":[1]},{"topic":"TC_PlayerExpFlow","partition":0,"replicas":[2]},{"topic":"TC_ItemFlow","partition":2,"replicas":[3]},{"topic":"TC_DepositFlow","partition":1,"replicas":[1]},{"topic":"TC_MoneyFlow","partition":2,"replicas":[2]},{"topic":"TC_MailFlow","partition":2,"replicas":[1]},{"topic":"TC_GuildFlow","partition":0,"replicas":[2]},{"topic":"TC_DepositFlow","partition":2,"replicas":[2]},{"topic":"TC_MoneyFlow","partition":0,"replicas":[3]},{"topic":"TC_PlayerExpFlow","partition":1,"replicas":[3]},{"topic":"TC_EquipFlow","partition":0,"replicas":[2]},{"topic":"TC_404Item","partition":2,"replicas":[3]},{"topic":"TC_DepositFlow","partition":0,"replicas":[3]},{"topic":"TC_MailFlow","partition":0,"replicas":[2]},{"topic":"TC_ItemFlow","partition":0,"replicas":[1]},{"topic":"TC_404Item","partition":0,"replicas":[1]},{"topic":"TC_404Item","partition":1,"replicas":[2]}]} |
配置文件改成 expand-cluster-reassignment.json1
{"version":1,"partitions":[{"topic":"TC_GuildFlow","partition":1,"replicas":[3,1]},{"topic":"TC_PlayerExpFlow","partition":2,"replicas":[1,2]},{"topic":"TC_EquipFlow","partition":1,"replicas":[3,1]},{"topic":"TC_PlayerRegister","partition":0,"replicas":[3,1]},{"topic":"TC_MailFlow","partition":1,"replicas":[3,1]},{"topic":"TC_ItemFlow","partition":1,"replicas":[2,3]},{"topic":"TC_PlayerRegister","partition":2,"replicas":[2,3]},{"topic":"TC_GuildFlow","partition":2,"replicas":[1,2]},{"topic":"TC_MoneyFlow","partition":1,"replicas":[1,2]},{"topic":"TC_EquipFlow","partition":2,"replicas":[1,2]},{"topic":"TC_PlayerRegister","partition":1,"replicas":[1,2]},{"topic":"TC_PlayerExpFlow","partition":0,"replicas":[2,3]},{"topic":"TC_ItemFlow","partition":2,"replicas":[3,1]},{"topic":"TC_DepositFlow","partition":1,"replicas":[1,2]},{"topic":"TC_MoneyFlow","partition":2,"replicas":[2,3]},{"topic":"TC_MailFlow","partition":2,"replicas":[1,2]},{"topic":"TC_GuildFlow","partition":0,"replicas":[2,3]},{"topic":"TC_DepositFlow","partition":2,"replicas":[2,3]},{"topic":"TC_MoneyFlow","partition":0,"replicas":[3,1]},{"topic":"TC_PlayerExpFlow","partition":1,"replicas":[3,1]},{"topic":"TC_EquipFlow","partition":0,"replicas":[2,3]},{"topic":"TC_404Item","partition":2,"replicas":[3,1]},{"topic":"TC_DepositFlow","partition":0,"replicas":[3,1]},{"topic":"TC_MailFlow","partition":0,"replicas":[2,3]},{"topic":"TC_ItemFlow","partition":0,"replicas":[1,2]},{"topic":"TC_404Item","partition":0,"replicas":[1,2]},{"topic":"TC_404Item","partition":1,"replicas":[2,3]}]}
执行1
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
验证1
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
kafka 生产消费速率控制, 限速
添加修改配置1
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=102400,consumer_byte_rate=2097152' --entity-type clients --entity-name client-test
查看配置1
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type clients
删除配置1
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config producer_byte_rate,consumer_byte_rate --entity-type clients --entity-name client-test
直接通过 zookeeper 删除配置1
2
3
4bin/zkCli.sh -server 127.0.0.1:2181
ls /config/clients
get /config/clients/client-test
delete /config/clients/client-test