迁移Kafka节点内数据
操作场景
用户可以根据业务需求,通过Kafka客户端命令,在不停止服务的情况下,进行节点内磁盘间的分区数据迁移。也可以通过KafkaUI进行分区迁移。
前提条件
- MRS集群管理员已明确业务需求,并准备一个Kafka用户(属于kafkaadmin组,普通模式不需要)。
创建用户相关操作请参考创建Kafka用户并绑定角色。
- 已安装客户端,例如安装目录为“/opt/client”,以下操作的客户端目录只是举例,请根据实际安装目录修改。
下载并安装集群客户端的具体操作,请参考安装MRS集群客户端。
- Kafka实例状态和磁盘状态均正常。
- 根据待迁移分区当前的磁盘空间占用情况,评估迁移后,不会导致新迁移后的磁盘空间不足。
使用Kafka客户端迁移数据(MRS 3.6.0及之前版本)
- 登录MRS集群Manager。
登录集群Manager具体操作,请参考访问MRS集群Manager。
- 获取Kafka节点业务IP及端口。
- 选择“集群 > 服务 > Kafka > 实例”,查看并记录任意一个Broker角色实例的业务IP地址。
- 选择“配置 > 全部配置”,并根据集群模式获取端口号:
- 集群已启用Kerberos认证(安全模式):搜索“sasl.port”参数,查看并记录端口号,默认为21007。
- 集群未启用Kerberos认证(普通模式):搜索“port”参数,查看并记录端口号,默认为9092。
- 获取ZooKeeper节点业务IP及端口。
- 以客户端安装用户,登录安装客户端的节点。
- 执行以下命令,切换到客户端安装目录,例如安装目录为“/opt/client”,具体以实际替换。
cd /opt/client - 执行以下命令配置环境变量。
source bigdata_env
- 执行以下命令,进行用户认证。(集群未启用Kerberos认证(普通模式)时跳过此步骤)
kinit 组件业务用户 - 执行以下命令进入Kafka客户端“bin”目录。
cd Kafka/kafka/bin
- 执行以下命令,查看待迁移的Partition对应的Topic的详细信息。
- 集群已启用Kerberos认证(安全模式)时,执行以下命令:
./kafka-topics.sh --describe --bootstrap-server Kafka集群IP:21007 --command-config ../config/client.properties --topic 主题名称
- 集群未启用Kerberos认证(普通模式)时,执行以下命令:
./kafka-topics.sh --describe --bootstrap-server Kafka集群IP:9092 --command-config ../config/client.properties --topic 主题名称
例如执行后结果如下:

- 集群已启用Kerberos认证(安全模式)时,执行以下命令:
- 执行以下命令,查询Broker_ID和IP对应关系。
./kafka-broker-info.sh --zookeeper ZooKeeper的quorumpeer实例业务IP:ZooKeeper客户端端口号/kafka例如,执行后结果如下:
Broker_ID IP_Address -------------------------- 4 192.168.0.100 5 192.168.0.101 6 192.168.0.102
- 从9和10回显中获取分区的分布信息和节点信息,在当前目录下创建执行重新分配的json文件。 以迁移的是Broker_ID为6的节点的分区为例,迁移到“/srv/BigData/hadoop/data1/kafka-logs”,完成迁移所需的json配置文件,内容如下。
{"partitions":[{"topic": "testws","partition": 2,"replicas": [6,5],"log_dirs": ["/srv/BigData/hadoop/data1/kafka-logs","any"]}],"version":1}- topic:为Topic名称,此处以testws为例,具体以实际为准。
- partition:为Topic分区。
- replicas:其中的数字对应Broker_ID。replicas必须与分区的副本数相对应,否则会造成副本缺少的情况。在本案例中分区所在的replicas对应6和5,只迁移Broker_ID为6的节点的分区中的数据时,也必须把Broker_ID为5的节点的分区带上。
- log_dirs:为需要迁移的磁盘路径。此样例迁移的是Broker_ID为6的节点,Broker_ID为5的节点对应的log_dirs可设置为“any”,Broker_ID为6的节点对应的log_dirs设置为“/srv/BigData/hadoop/data1/kafka-logs”。注意路径需与节点对应。
- 使用如下命令,执行重分配操作。
其中,json文件路径为11中编写的文件。
- 集群已启用Kerberos认证(安全模式)时,执行以下命令:
./kafka-reassign-partitions.sh --bootstrap-server Broker业务IP:21007 --command-config ../config/client.properties --reassignment-json-file json文件路径 --execute
- 集群未启用Kerberos认证(普通模式)时,执行以下命令:
./kafka-reassign-partitions.sh --bootstrap-server Broker业务IP:9092 --command-config ../config/client.properties --reassignment-json-file json文件路径 --execute
提示“Successfully started reassignment of partitions”表示执行成功。
- 集群已启用Kerberos认证(安全模式)时,执行以下命令:
使用Kafka客户端迁移数据(MRS 3.6.0.1及之后版本)
- 登录MRS集群Manager。
登录集群Manager具体操作,请参考访问MRS集群Manager。
- 获取Kafka节点业务IP及端口。
- 选择“集群 > 服务 > Kafka > 实例”,查看并记录任意一个Broker角色实例的业务IP地址。
- 选择“配置 > 全部配置”,并根据集群模式获取端口号:
- 集群已启用Kerberos认证(安全模式):搜索“sasl.port”参数,查看并记录端口号,默认为21007。
- 集群未启用Kerberos认证(普通模式):搜索“port”参数,查看并记录端口号,默认为9092。
- 获取ZooKeeper节点业务IP及端口。
- 以客户端安装用户,登录安装客户端的节点。
- 执行以下命令,切换到客户端安装目录,例如安装目录为“/opt/client”,具体以实际替换。
cd /opt/client - 执行以下命令配置环境变量。
source bigdata_env
- 执行以下命令,进行用户认证。(集群未启用Kerberos认证(普通模式)时跳过此步骤)
kinit 组件业务用户 - 执行以下命令进入Kafka客户端“bin”目录。
cd Kafka/kafka/bin
- 执行以下命令,查询Broker_ID和IP对应关系。
./kafka-broker-info.sh --zookeeper ZooKeeper的quorumpeer实例业务IP:ZooKeeper客户端端口号/kafka
Broker_ID IP_Address -------------------------- 4 192.168.0.100 5 192.168.0.101 6 192.168.0.102
- 根据已获取的Broker_ID进行对应节点内的数据迁移。
使用如下命令,执行节点内数据迁移操作。
- 安全模式:
./kafka-move-partition-replica.sh --bootstrap-server Broker业务IP:21007 --topic-partition 主题名称:分区 --broker-id Broker_ID --target-log-dir 目标迁移磁盘地址 --command-config ../config/client.properties
- 普通模式:
./kafka-move-partition-replica.sh --bootstrap-server Broker业务IP:21005 --topic-partition 主题名称:分区 --broker-id Broker_ID --target-log-dir 目标迁移磁盘地址 --command-config ../config/client.properties
命令执行后提示“Successfully moved replica 主题名称:分区 to log directory 目标迁移磁盘地址”表示执行成功。
- 安全模式:
使用KafkaUI迁移分区(MRS 3.5.0之前版本)
- 使用具有KafkaUI页面访问权限的用户登录FusionInsight Manager。
登录集群Manager具体操作,请参考访问MRS集群Manager。
- 选择“集群 > 服务 > Kafka”。
- 在“KafkaManager Web UI”右侧,单击URL链接,进入KafkaUI的页面。
- 单击“Generate assignment”进入分区迁移页面。 图1 分区迁移页面(MRS 3.5.0之前版本)
- 在“Brokers”处选择要将主题重新分配的Broker ID。
- 单击“Generate Partition Assignments”生成分区迁移方案。
- 继续单击“Run assignment”执行分区迁移方案,完成分区迁移。
使用KafkaUI迁移分区(MRS 3.5.0及之后版本)
- 使用具有KafkaUI页面访问权限的用户登录FusionInsight Manager。
登录集群Manager具体操作,请参考访问MRS集群Manager。
- 选择“集群 > 服务 > Kafka”。
- 在“KafkaManager Web UI”右侧,单击URL链接,进入KafkaUI的页面。
- 单击“Generate assignment”进入分区迁移页面。 图2 分区迁移页面(MRS 3.5.0及之后版本)
- 在“Brokers”处选择要将主题重新分配的Broker ID。
- 在搜索栏中搜索需要进行分区迁移的Topic以及分区。
- Partition展示待迁移的Topic,其命名规则为“Topic名称-分区索引”。
- Replicas展示当前该Topic分区所在Broker的ID。
- 单击“Generate Partition Assignments”按钮,生成分区迁移计划。 图3 分区迁移计划
- 可以在“Throttle”配置分区迁移的限流参数,不配置则使用默认值,默认值为“0”,表示不限速。
- Target Replicas:为自动生成的分区迁移计划,支持手动修改,其规则为“BrokerId1,BrokerId2”。
- Operation:可以对不需要进行分区迁移的Topic进行删除。
- 单击“Run assignment”执行分区迁移计划。
- 单击“Current Reassign Status”可以查看当前分区迁移计划的执行情况。 图4 迁移计划执行情况
- 单击“Modify Reassignment Throttle”可以修改分区迁移的限流参数。
- 单击“Cancel”可以取消当前所有正在执行的分区迁移计划。
相关文档
- 关于Kafka UI页面详细介绍及操作,请参考登录KafkaUI界面。
- 如果需要增加Kafka Topic的分区,请参考增加Kafka Topic分区。