KafKa动态扩容群集-(Topic.partitions迁移)

Posted by Yancy on 2018-01-11

kafka的扩容难点:

1)主要在于增加机器之后,数据需要rebalance到新增的空闲节点,即把partitions迁移到空闲机器上。
kafka提供bin/kafka-reassign-partitions.sh工具,完成parttition的迁移。

2)kafka的集群的数据量加大,数据rebalance的时间较长。解决办法是把log.retention.hours=1设置一小时(生产参数24小时)。
修改参数之后,重启kakfa节点,kafka会主动purge 1小时之前的log数据。
以下是kafka_0.8.1.1版本kafkka集群扩容操作记录,从3台物理机扩容到4台物理,partition数量由24个增加到28个。

参考:http://kafka.apache.org/081/documentation.html#basic_ops_modify_topic

kafka的复制副本:

将服务器添加到Kafka集群非常简单,只需为其分配唯一的代理ID,然后在新服务器上启动Kafka。但是,这些新的服务器不会自动分配任何数据分区,除非将分区移动到这些分区,否则直到创建新主题时才会执行任何工作。所以通常当你将机器添加到你的群集中时,你会想把一些现有的数据迁移到这些机器上。
数据迁移过程是手动启动的,但是完全自动化。下面介绍的是,Kafka会将新服务器添加为正在迁移的分区的跟随者,并允许其完全复制该分区中的现有数据。当新服务器完全复制了此分区的内容并加入了同步副本时,其中一个现有副本将删除其分区的数据。

分区重新分配工具可用于跨代理移动分区。理想的分区分布将确保所有代理的数据加载和分区大小。分区重新分配工具不具备自动研究Kafka集群中的数据分布并移动分区以实现均匀负载分配的功能。因此,管理员必须找出哪些主题或分区应该移动。

分区重新分配工具可以运行在3个互斥的模式中:

1
2
3
4
5
--生成:在此模式下,给定主题列表和经纪人列表,该工具会生成候选重新​​分配,以将指定主题的所有分区移至新经纪人。此选项仅提供了一种便捷的方式,可以根据主题和目标代理列表生成分区重新分配计划。
--execute:在此模式下,该工具根据用户提供的重新分配计划启动分区重新分配。(使用--reassignment-json-file选项)。这可以是由管理员制作的自定义重新分配计划,也可以是使用--generate选项提供的自定义重新分配计划。
--verify:在此模式下,该工具会验证上次执行过程中列出的所有分区的重新分配状态。状态可以是成功完成,失败或正在进行

自动将数据迁移到新机器

分区重新分配工具可用于将当前一组经纪人的一些主题移到新增的经纪人。这在扩展现有集群时通常很有用,因为将整个主题移动到新的代理集比移动一个分区更容易。用于这样做时,用户应该提供应该移动到新的经纪人集合和新的经纪人的目标列表的主题列表。然后,该工具在新的代理集合上均匀分配给定主题列表的所有分区。在此过程中,主题的复制因子保持不变。有效地,主题输入列表的所有分区副本都从旧的代理集合移动到新添加的代理。
例如,以下示例将把主题countly_apppush,countly_event...的所有分区移动到新的代理集4 。在本次移动结束时,主题countly_apppush,countly_event...的所有分区将仅存在于代理4上。

1.kafka 扩容

首先按照搭建步骤,在其他机器上搭建集群,kafka的配置文件中 zkconnect 要保持与原kafka一致
kafka版本一致,配置跟之前kafka集群一致,只需要修改本地kafka的地址.

2.验证kafka新节点是否加入集群成功,这个应该去zookeeper的zkCli.sh 去查看

1
2
3
[root@kafka1 bin]# ./zkCli.sh -server 10.155.90.153:2281,10.155.90.155:2281,10.155.90.138:2281
[zk: 10.155.90.153:2281,10.155.90.155:2281,10.155.90.138:2281(CONNECTED) 0] ls /brokers/ids/
1,2,3,4
3.由于该工具接受主题的输入列表作为json文件,因此首先需要确定要移动的主题并创建json文件,如下所示:
1
2
3
4
5
6
7
8
9
10
11
{"topics": [{"topic": "countly_apppush"},
{"topic": "countly_event"},
{"topic": "countly_imp"},
{"topic": "countly_metrics"},
{"topic": "countly_pv"},
{"topic": "countly_session"},
{"topic": "mongotail_lz4"},
{"topic": "mongotail_lz4_imp"}
],
"version":1
}
4.一旦json文件准备就绪,使用分区重新分配工具来生成候选分配:
1
2
3
4
5
6
7
8
9
10
11
12
13
bin/kafka-reassign-partitions.sh --zookeeper 10.155.90.153:2281,10.155.90.155:2281,10.155.90.138:2281 --topics-to-move-json-file topics-to-move.json --broker-list "1,2,3,4" --generate
[jollybi@kafka3 kafka_2.10-0.9.0.1]$ bin/kafka-reassign-partitions.sh --zookeeper 10.155.90.153:2281,10.155.90.155:2281,10.155.90.138:2281 --topics-to-move-json-file topics-to-move.json --broker-list "1,2,3,4" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"countly_metrics","partition":7,"replicas":[1,3,2]},{"topic":"countly_event","partition":6,"replicas":[1]},{"topic":"countly_pv","partition":8,"replicas":[3,2,1]},{"topic":"countly_pv","partition":0,"replicas":[1,3,2]},{"topic":"countly_imp","partition":5,"replicas":[1,3,2]},{"topic":"countly_imp","partition":8,"replicas":[1,2,3]},{"topic":"countly_apppush","partition":0,"replicas":[2,3,1]},{"topic":"countly_session","partition":8,"replicas":[1,3,2]},{"topic":"countly_imp","partition":9,"replicas":[2,1,3]},{"topic":"countly_metrics","partition":2,"replicas":[2,1,3]},{"topic":"countly_session","partition":4,"replicas":[3,1,2]},{"topic":"countly_pv","partition":4,"replicas":[2,3,1]},{"topic":"countly_apppush","partition":6,"replicas":[2,3,1]},{"topic":"countly_imp","partition":11,"replicas":[1,3,2]},{"topic":"countly_pv","partition":3,"replicas":[1,2,3]},{"topic":"countly_metrics","partition":4,"replicas":[1,2,3]},{"topic":"countly_metrics","partition":8,"replicas":[2,1,3]},{"topic":"countly_imp","partition":10,"replicas":[3,2,1]},
.....
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"countly_event","partition":6,"replicas":[3]},{"topic":"countly_metrics","partition":7,"replicas":[4,2,3]},{"topic":"countly_pv","partition":8,"replicas":[2,1,3]},{"topic":"countly_pv","partition":0,"replicas":[2,3,4]},{"topic":"countly_imp","partition":5,"replicas":[3,4,1]},{"topic":"countly_imp","partition":8,"replicas":[2,4,1]},{"topic":"countly_apppush","partition":0,"replicas":[1,2,3]},{"topic":"countly_imp","partition":9,"replicas":[3,1,2]},{"topic":"countly_session","partition":8,"replicas":[1,2,3]},{"topic":"countly_metrics","partition":2,"replicas":[3,4,1]},{"topic":"countly_session","partition":4,"replicas":[1,4,2]},{"topic":"countly_pv","partition":4,"replicas":[2,4,1]},{"topic":"countly_apppush","partition":6,"replicas":[3,1,2]},{"topic":"countly_imp","partition":11,"replicas":[1,3,4]},{"topic":"countly_pv","partition":3,"replicas":[1,2,3]}
......

其中的Current partition replica assignment指的是迁移前的partition replica;Proposed partition reassignment configuration 指的就是迁移分配规则json。需要将该[Proposed partition reassignment configuration]json文件保存到json文件中(如expand-cluster-reassignment.json)

该工具会生成一个候选分配,将所有分区从主题countly_apppush,countly_event...移动到brokers 1,2,3,4但是,请注意,在这一点上,分区运动还没有开始,它只是告诉你当前的任务和建议的新任务。应该保存当前的分配,以防你想要回滚到它。新的任务应该保存在一个json文件中(例如expand-cluster-reassignment.json),并用--execute选项输入到工具中,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
./bin/kafka-reassign-partitions.sh --zookeeper 10.155.90.153:2281,10.155.90.155:2281,10.155.90.138:2281 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"mongotail_lz4_imp","partition":9,"replicas":[2,3,1]},{"topic":"mongotail_lz4_imp","partition":7,"replicas":[3,2,1]},{"topic":"mongotail_lz4_imp","partition":6,"replicas":[2,1,3]},{"topic":"mongotail_lz4_imp","partition":5,"replicas":[1,2,3]},{"topic":"mongotail_lz4_imp","partition":8,"replicas":[1,3,2]},{"topic":"mongotail_lz4_imp","partition":2,"replicas":[1,3,2]},{"topic":"mongotail_lz4_imp","partition":1,"replicas":[3,2,1]},{"topic":"mongotail_lz4_imp","partition":4,"replicas":[3,1,2]},{"topic":"mongotail_lz4_imp","partition":0,"replicas":[2,1,3]},{"topic":"mongotail_lz4_imp","partition":10,"replicas":[3,1,2]},{"topic":"mongotail_lz4_imp","partition":3,"replicas":[2,3,1]},{"topic":"mongotail_lz4_imp","partition":11,"replicas":[1,2,3]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"mongotail_lz4_imp","partition":3,"replicas":[3,4,1]},{"topic":"mongotail_lz4_imp","partition":8,"replicas":[4,3,1]},{"topic":"mongotail_lz4_imp","partition":0,"replicas":[4,1,2]},{"topic":"mongotail_lz4_imp","partition":6,"replicas":[2,4,1]},{"topic":"mongotail_lz4_imp","partition":11,"replicas":[3,2,4]},{"topic":"mongotail_lz4_imp","partition":1,"replicas":[1,2,3]},{"topic":"mongotail_lz4_imp","partition":10,"replicas":[2,1,3]},{"topic":"mongotail_lz4_imp","partition":4,"replicas":[4,2,3]},{"topic":"mongotail_lz4_imp","partition":9,"replicas":[1,4,2]},{"topic":"mongotail_lz4_imp","partition":2,"replicas":[2,3,4]},{"topic":"mongotail_lz4_imp","partition":5,"replicas":[1,3,4]},{"topic":"mongotail_lz4_imp","partition":7,"replicas":[3,1,2]}]}
```
最后,可以使用`--verify`选项来检查分区重新分配的状态。
请注意,相同的`expand-cluster-reassignment.json`(与`--execute`选项一起使用)应该与--verify选项一起使用
```BASH
/bin/kafka-reassign-partitions.sh --zookeeper 10.155.90.153:2281,10.155.90.155:2281,10.155.90.138:2281 --reassignment-json-file expand-cluster-reassignment.json --verify
[jollybi@kafka4 kafka_2.10-0.9.0.1]$ ./bin/kafka-reassign-partitions.sh --zookeeper 10.155.90.153:2281,10.155.90.155:2281,10.155.90.138:2281 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [countly_event,2] completed successfully
Reassignment of partition [countly_session,7] completed successfully
Reassignment of partition [countly_pv,5] completed successfully
Reassignment of partition [countly_apppush,1] completed successfully
Reassignment of partition [countly_event,0] completed successfully
Reassignment of partition [countly_session,10] completed successfully
Reassignment of partition [countly_apppush,4] completed successfully
Reassignment of partition [countly_event,7] is still in progress
Reassignment of partition [countly_metrics,7] completed successfully
Reassignment of partition [countly_imp,4] is still in progress
Reassignment of partition [countly_apppush,10] completed successfully
Reassignment of partition [countly_imp,5] is still in progress
.....

注意:在迁移过程中不能人为的结束或停止kafka服务,不然会有数据不一致的问题.

Communicative learning:

🐧 Linux shell_ senior operation and maintenance faction: QQ group 459096184 circle (system operation and maintenance - application operation and maintenance - automation operation and maintenance - virtualization technology research, welcome to join)
🐧 BigData-Exchange School:QQ group 521621407 circles (big data Yun Wei) (Hadoop developer) (big data research enthusiasts) welcome to join

Bidata have internal WeChat exchange group, learn from each other, join QQ group has links.