Adding Partitions to a Kafka Topic: A Walkthrough

A production topic hit a sudden traffic spike, but it had too few partitions, so consumption was far too slow and users started to notice.

First, let's look at the topic:

./bin/kafka-topics.sh --zookeeper prod-zk1:2181 --describe --topic Events

Topic: Events Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: Events Partition: 1 Leader: 2 Replicas: 0,1,2 Isr: 2,0
Topic: Events Partition: 2 Leader: 2 Replicas: 1,2,0 Isr: 2,0
Topic: Events Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,0
Topic: Events Partition: 4 Leader: 2 Replicas: 0,2,1 Isr: 2,1
Topic: Events Partition: 5 Leader: 0 Replicas: 1,0 Isr: 1,0
Topic: Events Partition: 6 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: Events Partition: 7 Leader: 2 Replicas: 0,1,2 Isr: 2,1
Topic: Events Partition: 8 Leader: 0 Replicas: 1,2,0 Isr: 1,0
Topic: Events Partition: 9 Leader: 1 Replicas: 2,1,0 Isr: 1,0

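The topic has only 10 partitions, and the replica counts are inconsistent: some partitions have 2 replicas and others have 3. Can partitions be added directly in this state? The answer is no.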

If you try to add partitions at this point, the command fails with:

requirement failed: All partitions should have the same number of replicas.

So every partition must first be given the same number of replicas.
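To see at a glance which partitions are out of line, the describe output can be tallied by replica count. The following is only a minimal sketch: the names `DESCRIBE_OUTPUT`, `LINE_RE`, and `replica_counts` are made up for illustration, and the regular expression assumes the line layout shown in the output above.

```python
import re
from collections import defaultdict

# Lines copied from the --describe output earlier in this post.
DESCRIBE_OUTPUT = """\
Topic: Events Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: Events Partition: 1 Leader: 2 Replicas: 0,1,2 Isr: 2,0
Topic: Events Partition: 2 Leader: 2 Replicas: 1,2,0 Isr: 2,0
Topic: Events Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,0
Topic: Events Partition: 4 Leader: 2 Replicas: 0,2,1 Isr: 2,1
Topic: Events Partition: 5 Leader: 0 Replicas: 1,0 Isr: 1,0
Topic: Events Partition: 6 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: Events Partition: 7 Leader: 2 Replicas: 0,1,2 Isr: 2,1
Topic: Events Partition: 8 Leader: 0 Replicas: 1,2,0 Isr: 1,0
Topic: Events Partition: 9 Leader: 1 Replicas: 2,1,0 Isr: 1,0
"""

# Assumed layout: "Partition: <id> Leader: <id> Replicas: <comma-separated ids>".
LINE_RE = re.compile(r"Partition:\s*(\d+)\s+Leader:\s*\S+\s+Replicas:\s*([\d,]+)")


def replica_counts(describe_text):
    """Group partition ids by how many replicas each partition lists."""
    by_count = defaultdict(list)
    for line in describe_text.splitlines():
        match = LINE_RE.search(line)
        if match is None:
            continue  # skip summary lines and blanks
        partition = int(match.group(1))
        replicas = match.group(2).split(",")
        by_count[len(replicas)].append(partition)
    return dict(by_count)


if __name__ == "__main__":
    counts = replica_counts(DESCRIBE_OUTPUT)
    for size, partitions in sorted(counts.items()):
        print(f"{size} replicas: partitions {partitions}")
    if len(counts) > 1:
        print("Replica counts differ; align them before adding partitions.")
```

For this topic the tally puts partitions 0, 5, and 6 in the 2-replica group, so those are the ones that need an extra replica.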

So start by writing a JSON file that gives every partition the same number of replicas:

{"version":1,"partitions":[{"topic":"Events","partition":7,"replicas":[0,1,2]},{"topic":"Events","partition":0,"replicas":[2,1,0]},{"topic":"Events","partition":1,"replicas":[0,1,2]},{"topic":"Events","partition":2,"replicas":[1,2,0]},{"topic":"Events","partition":5,"replicas":[2,1,0]},{"topic":"Events","partition":6,"replicas":[2,0,1]},{"topic":"Events","partition":9,"replicas":[2,1,0]},{"topic":"Events","partition":4,"replicas":[0,2,1]},{"topic":"Events","partition":3,"replicas":[2,1,0]},{"topic":"Events","partition":8,"replicas":[1,2,0]}]}
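A file like this can also be produced by a small script instead of by hand. The sketch below is an assumption-laden illustration, not the method used in this post: `pad_replicas` is a hypothetical helper that keeps each partition's existing replicas in their current order and appends missing brokers until the target count is reached, so its output will not match the hand-written file above replica for replica.

```python
import json


def pad_replicas(current, brokers, target):
    """Build a reassignment plan in which every partition has `target` replicas.

    `current` maps (topic, partition) to the existing replica list.
    Existing replicas keep their order (so the first, preferred replica is
    unchanged); missing brokers are appended in the order given by `brokers`.
    That ordering is a choice made by this sketch.
    """
    if target > len(brokers):
        raise ValueError("target replica count exceeds the number of brokers")
    partitions = []
    for (topic, partition), replicas in sorted(current.items()):
        padded = list(replicas[:target])
        for broker in brokers:
            if len(padded) >= target:
                break
            if broker not in padded:
                padded.append(broker)
        partitions.append(
            {"topic": topic, "partition": partition, "replicas": padded}
        )
    return {"version": 1, "partitions": partitions}


if __name__ == "__main__":
    # A few of the partitions from the describe output, for illustration.
    current = {
        ("Events", 0): [2, 0],
        ("Events", 1): [0, 1, 2],
        ("Events", 5): [1, 0],
        ("Events", 6): [2, 0],
    }
    plan = pad_replicas(current, brokers=[0, 1, 2], target=3)
    print(json.dumps(plan, separators=(",", ":")))
```

Appending rather than reshuffling keeps the amount of data that has to move small, since only the newly added replica of each short partition needs to be copied.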

If the topic has many partitions, it is better to generate this file automatically. First, create a file named topic.json:

{
"topics": [
{"topic": "Events"}
],
"version": 1
}

Then use the following command to generate the reassignment JSON:

./bin/kafka-reassign-partitions.sh --zookeeper prod-zk1:2181 --topics-to-move-json-file topic.json --broker-list "0,1,2" --generate

Copy the proposed assignment from the output into replication-factor.json, adjust it by hand where necessary, and then execute it:

./bin/kafka-reassign-partitions.sh --zookeeper prod-zk1:2181  --reassignment-json-file replication-factor.json --execute

This in turn failed with the following error:

Partitions reassignment failed due to Partition reassignment currently in progress for Map([Events,1] -> ReassignedPartitionsContext(List(0, 1),null), [Events,8] -> ReassignedPartitionsContext(List(1, 2),null), [Events,4] -> ReassignedPartitionsContext(List(0, 2),null), [Events,2] -> ReassignedPartitionsContext(List(1, 2),null), [Events,9] -> ReassignedPartitionsContext(List(2, 1),null), [Events,7] -> ReassignedPartitionsContext(List(0, 1),null), [Events,3] -> ReassignedPartitionsContext(List(2, 1),null)). Aborting operation
kafka.common.AdminCommandFailedException: Partition reassignment currently in progress for Map([Events,1] -> ReassignedPartitionsContext(List(0, 1),null), [Events,8] -> ReassignedPartitionsContext(List(1, 2),null), [Events,4] -> ReassignedPartitionsContext(List(0, 2),null), [Events,2] -> ReassignedPartitionsContext(List(1, 2),null), [Events,9] -> ReassignedPartitionsContext(List(2, 1),null), [Events,7] -> ReassignedPartitionsContext(List(0, 1),null), [Events,3] -> ReassignedPartitionsContext(List(2, 1),null)). Aborting operation
at kafka.admin.ReassignPartitionsCommand.reassignPartitions(ReassignPartitionsCommand.scala:243)
at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:154)
at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:128)
at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:52)
at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)

Meanwhile, the Kafka broker logs show the following:

PartitionFetchInfo(674777,1048576) (kafka.server.KafkaApis)
kafka.common.NotAssignedReplicaException: Leader 2 failed to record follower 0's position -1 since the replica is not recognized to be one of the assigned replicas 1,2 for partition [Events,4].
at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:251)
at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:864)
at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:861)
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:861)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:470)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:496)
at kafka.server.KafkaApis.handle(KafkaApis.scala:77)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
at java.lang.Thread.run(Thread.java:745)
[2020-02-XX YY:06:26,510] INFO [ReplicaFetcherManager on broker 2] Added fetcher for partitions List([[Events,8], initOffset 0 to broker BrokerEndPoint(0,prod-kafka1,9092)] ) (kafka.server.ReplicaFetcherManager)
[2020-02-XX YY:06:26,590] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [Events,4] (kafka.server.ReplicaFetcherManager)
[2020-02-XX YY:06:26,693] WARN [ReplicaFetcherThread-0-0], Replica 2 for partition [Events,8] reset its fetch offset from 0 to current leader 0's start offset 364381305 (kafka.server.ReplicaFetcherThread)
[2020-02-XX YY:06:26,751] INFO Scheduling log segment 0 for log Events-8 for deletion. (kafka.log.Log)
[2020-02-XX YY:06:26,959] ERROR [ReplicaFetcherThread-0-0], Current offset 0 for partition [Events,8] out of range; reset offset to 364381305 (kafka.server.ReplicaFetcherThread)
[2020-02-XX YY:06:26,959] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [Events,0] (kafka.server.ReplicaFetcherManager)

Wait, had someone already started a reassignment earlier? Let's check ZooKeeper. Sure enough, there is one:

[zk: prod-zk1(CONNECTED) 3] get /admin/reassign_partitions
{"version":1,"partitions":[{"topic":"Events","partition":1,"replicas":[0,1]},{"topic":"Events","partition":8,"replicas":[1,2]},{"topic":"Events","partition":4,"replicas":[0,2]},{"topic":"Events","partition":2,"replicas":[1,2]},{"topic":"Events","partition":7,"replicas":[0,1]},{"topic":"Events","partition":3,"replicas":[2,1]}]}
cZxid = 0x1f0c8babde
ctime = Sat Feb XX YY:06:24 CST 2020
mZxid = 0x1f0c98c317
mtime = Sat Feb XX ZZ:50:33 CST 2020
pZxid = 0x1f0c8babde
cversion = 0
dataVersion = 4
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 394
numChildren = 0
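Before touching the node, it can be useful to see how the leftover reassignment compares with the new plan. The sketch below is a hedged illustration only: `pending_reassignments` and `conflicts` are hypothetical helpers, and the two JSON strings are shortened excerpts of the znode payload above and of the reassignment file written earlier.

```python
import json


def pending_reassignments(payload):
    """Map (topic, partition) to the target replica list in a reassignment JSON."""
    data = json.loads(payload)
    return {
        (entry["topic"], entry["partition"]): entry["replicas"]
        for entry in data["partitions"]
    }


def conflicts(pending, new_plan):
    """Return partitions whose pending target differs from the new plan's target."""
    return {
        key: (target, new_plan[key])
        for key, target in pending.items()
        if key in new_plan and new_plan[key] != target
    }


if __name__ == "__main__":
    # Excerpt of the data stored at /admin/reassign_partitions.
    znode = (
        '{"version":1,"partitions":['
        '{"topic":"Events","partition":1,"replicas":[0,1]},'
        '{"topic":"Events","partition":8,"replicas":[1,2]}]}'
    )
    # Excerpt of the new reassignment file.
    new_file = (
        '{"version":1,"partitions":['
        '{"topic":"Events","partition":1,"replicas":[0,1,2]},'
        '{"topic":"Events","partition":8,"replicas":[1,2,0]}]}'
    )
    diff = conflicts(pending_reassignments(znode), pending_reassignments(new_file))
    for (topic, partition), (old, new) in sorted(diff.items()):
        print(f"{topic}-{partition}: pending {old}, new plan {new}")
```

Here the pending entries all target two replicas while the new plan targets three, which matches the suspicion that an earlier reassignment was left behind.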

So just delete the node:

[zk: prod-zk1(CONNECTED) 4] rmr /admin/reassign_partitions

Running the reassignment again now works, after which the partitions can be added:

./bin/kafka-reassign-partitions.sh --zookeeper prod-zk1:2181  --reassignment-json-file replication-factor.json --execute
./bin/kafka-topics.sh --zookeeper prod-zk1:2181 --alter --topic Events --partitions 100

References:
https://blog.csdn.net/huanggang028/article/details/49445569
https://stackoverflow.com/questions/51107120/partitions-reassignment-is-failing-in-kafka-1-1-0