Kafka节点内数据迁移
操作场景
该任务指导管理员根据业务需求,通过Kafka客户端命令,在不停止服务的情况下,进行节点内磁盘间的分区数据迁移。
前提条件
- MRS集群管理员已明确业务需求,并准备一个Kafka用户(属于kafkaadmin组,普通模式不需要)。
- 已安装Kafka客户端。
- Kafka实例状态和磁盘状态均正常。
- 根据待迁移分区当前的磁盘空间占用情况,评估迁移后,不会导致新迁移后的磁盘空间不足。
操作步骤
- 以客户端安装用户,登录已安装Kafka客户端的节点。
- 执行以下命令,切换到Kafka客户端安装目录,例如“/opt/kafkaclient”。
cd /opt/kafkaclient
- 执行以下命令,配置环境变量。
source bigdata_env
- 执行以下命令,进行用户认证(普通模式跳过此步骤)。
kinit 组件业务用户
- 执行以下命令,切换到Kafka客户端目录。
cd Kafka/kafka/bin
- 执行以下命令,查看待迁移的Partition对应的Topic的详细信息。
安全模式:
./kafka-topics.sh --describe --bootstrap-server Kafka集群IP:21007 --command-config ../config/client.properties --topic 主题名称
普通模式:
./kafka-topics.sh --describe --bootstrap-server Kafka集群IP:21005 --command-config ../config/client.properties --topic 主题名称
- 执行以下命令,查询Broker_ID和IP对应关系。
./kafka-broker-info.sh --zookeeper ZooKeeper的quorumpeer实例业务IP:ZooKeeper客户端端口号/kafka
Broker_ID IP_Address -------------------------- 4 192.168.0.100 5 192.168.0.101 6 192.168.0.102
- 从6和7回显中获取分区的分布信息和节点信息,在当前目录下创建执行重新分配的json文件。
以迁移的是Broker_ID为6的节点的分区为例,迁移到"/srv/BigData/hadoop/data1/kafka-logs”,完成迁移所需的json配置文件,内容如下。
{"partitions":[{"topic": "testws","partition": 2,"replicas": [6,5],"log_dirs": ["/srv/BigData/hadoop/data1/kafka-logs","any"]}],"version":1}
- topic为Topic名称,此处以testws为例,具体以实际为准。
- partition为Topic分区。
- replicas中的数字对应Broker_ID。replicas必须与分区的副本数相对应,不然会造成副本缺少的情况。在本案例中分区所在的replicas对应6和5,只迁移Broker_ID为6的节点的分区中的数据时,也必须把Broker_ID为5的节点的分区带上。
- log_dirs为需要迁移的磁盘路径。此样例迁移的是Broker_ID为6的节点,Broker_ID为5的节点对应的log_dirs可设置为“any”,Broker_ID为6的节点对应的log_dirs设置为“/srv/BigData/hadoop/data1/kafka-logs”。注意路径需与节点对应。
- 使用如下命令,执行重分配操作。
安全模式:
./kafka-reassign-partitions.sh --bootstrap-server Broker业务IP:21007 --command-config ../config/client.properties --zookeeper {zk_host}:{port}/kafka --reassignment-json-file 8中编写的json文件路径 --execute
普通模式:
./kafka-reassign-partitions.sh --bootstrap-server Broker业务IP:21005 --command-config ../config/client.properties --zookeeper {zk_host}:{port}/kafka --reassignment-json-file 8中编写的json文件路径 --execute
提示“Successfully started reassignment of partitions”表示执行成功。