Fork me on GitHub

Kafka常用命令

kafka 资料整理

安装 zookeeper 配置 zookeeper 集群

启动 zookeeper: sudo bin/zkServer.sh {start|start-foreground|stop|restart|status|upgrade|print-cmd}

检测 zookeeper 节点状态

1
2
3
4
5
6
bin/zkServer.sh status
echo srvr | nc 192.168.0.94 2181
echo srvr | nc 10.50.75.133 2181

# 连上 zookeeper,并访问目录
bin/zkCli.sh -server 192.168.0.94:2181

安装配置 kafka

编辑 ${KAFKA_HOME}/config/server.properties, 主要参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
host.name=10.50.75.133
advertised.host.name=10.50.75.133

port=9093
advertised.port=9093

broker.id=3

listeners=PLAINTEXT://10.50.75.133:9093

advertised.listeners=PLAINTEXT://10.50.75.133:9093

unclean.leader.election.enable=false
queued.max.requests=16
num.replica.fetchers=4
num.network.threads=8
num.io.threads=12

socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600

log.dirs=/data/kafka-logs2
num.partitions=8
log.flush.interval.messages=20000
log.flush.interval.ms=10000
log.flush.scheduler.interval.ms=2000
log.retention.hours=48
log.retention.bytes=966367641600
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

zookeeper.connect=zs-dw-kafka-dn-1:2181,zs-dw-kafka-dn-2:2181,zs-dw-kafka-dn-3:2181/kafka_1.0
zookeeper.connection.timeout.ms=10000

group.initial.rebalance.delay.ms=3

default.replication.factor=2
auto.create.topics.enable=true

编辑 bin/kafka-server-start.sh

1
2
3
4

export KAFKA_HEAP_OPTS="-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"

export LOG_DIR="/data/log/kafka_1.0"

启动 kafka

1
2
sudo -E bin/kafka-server-start.sh -daemon config/server.properties
# -E 命令表示将当前shell环境变量带入执行。 类似 su username 切换新用户保存了当前shell环境变量

启动完执行 bin/kafka-preferred-replica-election.sh –zookeeper 127.0.0.1:2181 来重新平衡 kafka 各分区的leader

关闭kafka

1
2
ps -ef|grep kafka
kill -s TERM $PIDS

kafka 命令

创建topic ‘dlog-test’ ,其中副本数位 2,分区数为 1

1
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic dlog-test --partitions 1 --replication-factor 2

提示:

Topic dlog-test2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

删除topic

1
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic dlog-test2

修改 topic 配置,参考 3.2 Topic-Level Configs

  1. 查看 brokers 配置

    1
    bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type brokers
  2. 查看 topic 的配置

1
2
3
4
5
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics  --describe

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name PlayerLogin --describe
  1. 增加 topic 配置
1
2
3
4
5
6
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --add-config retention.ms=168

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name PlayerLogin --alter --add-config retention.ms=604800000

# 保留12小时
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name CustomEvent --alter --add-config retention.ms=43200000
  1. 删除 topic 配置
1
2
3
bin/kafka-configs.sh --zookeeper localhost:2181  --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name PlayerLogin --alter --delete-config retention.ms
  1. 增加 topic 的 partition 分区数
1
2
> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --partitions 40
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic CustomEvent --partitions 4

查看topic

1
2
3
4
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic PlayerLogin
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic CustomEvent

手动触发分区重新选举,类似 auto.leader.rebalance.enable=true 参数

1
bin/kafka-preferred-replica-election.sh --zookeeper 127.0.0.1:2181

执行结果

1
2
Created preferred replica election path with {"version":1,"partitions":[{"topic":"MutableFlow","partition":2},{"topic":"TC_MailFlow","partition":0},{"topic":"RoundFlow","partition":2},{"topic":"TC_PlayerExpFlow","partition":0},{"topic":"SnsFlow","partition":1},{"topic":"TC_404Item","partition":0},...]}
Successfully started preferred replica election for partitions Set([TC_MailFlow,1], [__consumer_offsets,32], [LBS_DATA,2], [__consumer_offsets,16], [__consumer_offsets,49], [__consumer_offsets,44],...)

查看当前 topic 最大 offsets

1
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic topiname

最小offset

1
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -2 --topic topiname

简单消费

1
2
3
4
5
bin/kafka-simple-consumer-shell.sh --broker-list "172.16.150.132:9093" --topic PlayerLogin

bin/kafka-simple-consumer-shell.sh -broker-list "192.168.0.94:9092,192.168.0.93:9092" --offset -1 --topic CustomEvent

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic PlayerLogin --from-beginning

查看消费详情

1
2
3
4
5
6
7
8
# 旧版kafka 消费组的 offset 保存在 zookeeper
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
# 新版kafka 消费组的 offset 保存在 kafka
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group dlog_rtc_group_PlayerLogin

bin/kafka-consumer-groups.sh --bootstrap-server 172.16.150.130:9093 --describe --group dlog_rtc_group_CustomEvent

kafka 平衡leader

1
2
3
bin/kafka-preferred-replica-election.sh --zookeeper 127.0.0.1:2181

auto.leader.rebalance.enable=true

迁移数据,达到集群数据平衡

1
2
3
4
5
6
7
8
9
10
11
12
13
14
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "3,4" --generate

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute

# topic 分区重新分配,同时限速 50000000 B/s
bin/kafka-reassign-partitions.sh --zookeeper localhost:2185 --reassignment-json-file expand-cluster-reassignment.json --execute --throttle 50000000

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify |grep "still in progress"

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute

对于数据量非常大的topic,可以临时将topic数据保存时间设置成很小,然后执行迁移任务,等迁移完再调回原来时间

中断迁移任务

一旦启动reassign 脚本,则无法停止迁移任务。如果需要强制停止,可以通过zookeeper 进行修改。

1
2
$ zookeeper-client -server 10.1.1.50:2181/kafka
[zk] delete /admin/reassign_partitions

链接:https://www.jianshu.com/p/626b147821cd

添加副本数也是一样道理

cat topics-to-move.json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{"topics": [
{"topic": "TC_404Item"},
{"topic": "TC_CityWarFlow"},
{"topic": "TC_DepositFlow"},
{"topic": "TC_EquipFlow"},
{"topic": "TC_GuildFlow"},
{"topic": "TC_ItemFlow"},
{"topic": "TC_MailFlow"},
{"topic": "TC_MoneyFlow"},
{"topic": "TC_PlayerExpFlow"},
{"topic": "TC_PlayerRegister"}
],
"version":1
}

bin/kafka-reassign-partitions.sh –zookeeper localhost:2181 –topics-to-move-json-file topics-to-move.json –broker-list “1,2,3” –generate

1
{"version":1,"partitions":[{"topic":"TC_GuildFlow","partition":1,"replicas":[3]},{"topic":"TC_PlayerExpFlow","partition":2,"replicas":[1]},{"topic":"TC_EquipFlow","partition":1,"replicas":[3]},{"topic":"TC_PlayerRegister","partition":0,"replicas":[3]},{"topic":"TC_MailFlow","partition":1,"replicas":[3]},{"topic":"TC_ItemFlow","partition":1,"replicas":[2]},{"topic":"TC_PlayerRegister","partition":2,"replicas":[2]},{"topic":"TC_GuildFlow","partition":2,"replicas":[1]},{"topic":"TC_MoneyFlow","partition":1,"replicas":[1]},{"topic":"TC_EquipFlow","partition":2,"replicas":[1]},{"topic":"TC_PlayerRegister","partition":1,"replicas":[1]},{"topic":"TC_PlayerExpFlow","partition":0,"replicas":[2]},{"topic":"TC_ItemFlow","partition":2,"replicas":[3]},{"topic":"TC_DepositFlow","partition":1,"replicas":[1]},{"topic":"TC_MoneyFlow","partition":2,"replicas":[2]},{"topic":"TC_MailFlow","partition":2,"replicas":[1]},{"topic":"TC_GuildFlow","partition":0,"replicas":[2]},{"topic":"TC_DepositFlow","partition":2,"replicas":[2]},{"topic":"TC_MoneyFlow","partition":0,"replicas":[3]},{"topic":"TC_PlayerExpFlow","partition":1,"replicas":[3]},{"topic":"TC_EquipFlow","partition":0,"replicas":[2]},{"topic":"TC_404Item","partition":2,"replicas":[3]},{"topic":"TC_DepositFlow","partition":0,"replicas":[3]},{"topic":"TC_MailFlow","partition":0,"replicas":[2]},{"topic":"TC_ItemFlow","partition":0,"replicas":[1]},{"topic":"TC_404Item","partition":0,"replicas":[1]},{"topic":"TC_404Item","partition":1,"replicas":[2]}]}

配置文件改成 expand-cluster-reassignment.json

1
{"version":1,"partitions":[{"topic":"TC_GuildFlow","partition":1,"replicas":[3,1]},{"topic":"TC_PlayerExpFlow","partition":2,"replicas":[1,2]},{"topic":"TC_EquipFlow","partition":1,"replicas":[3,1]},{"topic":"TC_PlayerRegister","partition":0,"replicas":[3,1]},{"topic":"TC_MailFlow","partition":1,"replicas":[3,1]},{"topic":"TC_ItemFlow","partition":1,"replicas":[2,3]},{"topic":"TC_PlayerRegister","partition":2,"replicas":[2,3]},{"topic":"TC_GuildFlow","partition":2,"replicas":[1,2]},{"topic":"TC_MoneyFlow","partition":1,"replicas":[1,2]},{"topic":"TC_EquipFlow","partition":2,"replicas":[1,2]},{"topic":"TC_PlayerRegister","partition":1,"replicas":[1,2]},{"topic":"TC_PlayerExpFlow","partition":0,"replicas":[2,3]},{"topic":"TC_ItemFlow","partition":2,"replicas":[3,1]},{"topic":"TC_DepositFlow","partition":1,"replicas":[1,2]},{"topic":"TC_MoneyFlow","partition":2,"replicas":[2,3]},{"topic":"TC_MailFlow","partition":2,"replicas":[1,2]},{"topic":"TC_GuildFlow","partition":0,"replicas":[2,3]},{"topic":"TC_DepositFlow","partition":2,"replicas":[2,3]},{"topic":"TC_MoneyFlow","partition":0,"replicas":[3,1]},{"topic":"TC_PlayerExpFlow","partition":1,"replicas":[3,1]},{"topic":"TC_EquipFlow","partition":0,"replicas":[2,3]},{"topic":"TC_404Item","partition":2,"replicas":[3,1]},{"topic":"TC_DepositFlow","partition":0,"replicas":[3,1]},{"topic":"TC_MailFlow","partition":0,"replicas":[2,3]},{"topic":"TC_ItemFlow","partition":0,"replicas":[1,2]},{"topic":"TC_404Item","partition":0,"replicas":[1,2]},{"topic":"TC_404Item","partition":1,"replicas":[2,3]}]}

执行

1
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute

验证

1
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

kafka 生产消费速率控制, 限速

添加修改配置

1
bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=102400,consumer_byte_rate=2097152' --entity-type clients --entity-name client-test

查看配置

1
bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type clients

删除配置

1
bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --delete-config producer_byte_rate,consumer_byte_rate --entity-type clients --entity-name client-test

直接通过 zookeeper 删除配置

1
2
3
4
bin/zkCli.sh -server 127.0.0.1:2181
ls /config/clients
get /config/clients/client-test
delete /config/clients/client-test

-------------本文结束感谢您的阅读-------------
坚持技术分享,您的支持将鼓励我继续创作!