更新时间:2024-07-24 GMT+08:00
均衡Kafka扩容节点后数据
前提条件
- MRS集群管理员已明确业务需求,并准备一个Kafka管理员用户(属于kafkaadmin组,普通模式不需要)。
- 已安装Kafka客户端,客户端安装目录如“/opt/client”。
- 本示例需创建两个Topic,可参考7,分别命名为“test_2”和“test_3”,并创建“move-kafka-topic.json”文件,创建路径如“/opt/client/Kafka/kafka”,Topic格式内容如下:
{ "topics": [{"topic":"test_2"},{"topic":"test_3"}], "version":1 }
操作步骤
- 以客户端安装用户,登录安装Kafka客户端的节点。
- 切换到Kafka客户端安装目录。
cd /opt/client
- 执行以下命令,配置环境变量。
source bigdata_env
- 执行以下命令,进行用户认证。(普通模式跳过此步骤)
kinit 组件业务用户
- 执行以下命令进入Kafka客户端的bin目录。
cd Kafka/kafka/bin
- 执行以下命令生成执行计划。
./kafka-reassign-partitions.sh --zookeeper 172.16.0.119:2181/kafka --topics-to-move-json-file ../move-kafka-topic.json --broker-list "1,2,3" --generate
- 172.16.0.119:ZooKeeper实例的业务IP。
- --broker-list "1,2,3":参数中的“1,2,3”为扩容后的所有broker_id。
- 执行vim ../reassignment.json创建“reassignment.json”文件并保存,保存路径为“/opt/kafkaclient/Kafka/kafka”。
拷贝6中生成的“Proposed partition reassignment configuration”下的内容至“reassignment.json”文件,如下所示:
{"version":1,"partitions":[{"topic":"test","partition":4,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":1,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"test","partition":3,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"test","partition":0,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"test","partition":2,"replicas":[2,1],"log_dirs":["any","any"]}]}
- 执行以下命令进行分区重分布。
./kafka-reassign-partitions.sh --zookeeper 172.16.0.119:2181/kafka --reassignment-json-file ../reassignment.json --execute --throttle 50000000
--throttle 50000000:限制网络带宽为50MB。带宽可根据数据量大小及客户对均衡时间的要求进行调整,5TB数据量,使用50MB带宽,均衡时长约8小时。
- 执行以下命令查看迁移状态。
./kafka-reassign-partitions.sh --zookeeper 172.16.0.119:2181/kafka --reassignment-json-file ../reassignment.json --verify
父主题: Kafka运维管理