文档首页/ MapReduce服务 MRS/ 组件操作指南(安卡拉区域)/ 使用Kafka/ 基于binlog的MySQL数据同步到MRS集群中
更新时间:2024-11-29 GMT+08:00

基于binlog的MySQL数据同步到MRS集群中

本章节为您介绍使用Maxwell同步工具将线下基于binlog的数据迁移到MRS Kafka集群中的指导。

Maxwell是一个开源程序,通过读取MySQL的binlog日志,将增删改等操作转为JSON格式发送到输出端(如控制台/文件/Kafka等)。Maxwell可部署在MySQL机器上,也可独立部署在其他与MySQL网络可通的机器上。

Maxwell运行在Linux服务器上,常见的有EulerOS、Ubuntu、Debian、CentOS、OpenSUSE等,且需要Java 1.8+支持。

同步数据具体内容如下。

  1. 配置MySQL
  2. 安装Maxwell
  3. 配置Maxwell
  4. 启动Maxwell
  5. 验证Maxwell
  6. 停止Maxwell
  7. Maxwell生成的数据格式及常见字段含义

配置MySQL

  1. 开启binlog,在MySQL中打开my.cnf文件,在[mysqld] 区块检查是否配置server_id,log-bin与binlog_format,若没有配置请执行如下命令添加配置项并重启MySQL,若已经配置则忽略此步骤。

    $ vi my.cnf
    
    [mysqld]
    server_id=1
    log-bin=master
    binlog_format=row

  2. Maxwell需要连接MySQL,并创建一个名称为maxwell的数据库存储元数据,且需要能访问需要同步的数据库,所以建议新创建一个MySQL用户专门用来给Maxwell使用。使用root登录MySQL之后,执行如下命令创建maxwell用户(其中XXXXXX是密码,请修改为实际值)。

    • 若Maxwell程序部署在非MySQL机器上,则创建的maxwell用户需要有远程登录数据库的权限,此时创建命令为

      mysql> GRANT ALL on maxwell.* to 'maxwell'@'%' identified by 'XXXXXX';

      mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';

    • 若Maxwell部署在MySQL机器上,则创建的maxwell用户可以设置为只能在本机登录数据库,此时创建命令为

      mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'localhost' identified by 'XXXXXX';

      mysql> GRANT ALL on maxwell.* to 'maxwell'@'localhost';

安装Maxwell

  1. 下载安装包,下载路径为https://github.com/zendesk/maxwell/releases,选择名为maxwell-XXX.tar.gz的二进制文件下载,其中XXX为版本号。
  2. 将tar.gz包上传到任意目录下(本示例路径为Master节点的/opt)。
  3. 登录部署Maxwell的服务器,并执行如下命令进入tar.gz包所在目录。

    cd /opt

  4. 执行如下命令解压“maxwell-XXX.tar.gz”压缩包,并进入“maxwell-XXX”文件夹。

    tar -zxvf maxwell-XXX.tar.gz

    cd maxwell-XXX

配置Maxwell

在maxwell-XXX文件夹下若有conf目录则配置config.properties文件,配置项说明请参见表1。若没有conf目录,则是在maxwell-XXX文件夹下将config.properties.example修改成config.properties。

表1 Maxwell配置项说明

配置项

是否必填

说明

默认值

user

连接MySQL的用户名,即2中新创建的用户

-

password

连接MySQL的密码

-

host

MySQL地址

localhost

port

MySQL端口

3306

log_level

日志打印级别,可选值为

  • debug
  • info
  • warn
  • error

info

output_ddl

是否发送DDL(数据库与数据表的定义修改)事件

  • true:发送DDL事件
  • false:不发送DDL事件

false

producer

生产者类型,配置为kafka

  • stdout:将生成的事件打印在日志中
  • kafka:将生成的事件发送到kafka

stdout

producer_partition_by

分区策略,用来确保相同一类的数据写入到kafka同一分区

  • database:使用数据库名称做分区,保证同一个数据库的事件写入到kafka同一个分区中
  • table:使用表名称做分区,保证同一个表的事件写入到kafka同一个分区中

database

ignore_producer_error

是否忽略生产者发送数据失败的错误

  • true:在日志中打印错误信息并跳过错误的数据,程序继续运行
  • false:在日志中打印错误信息并终止程序

true

metrics_slf4j_interval

在日志中输出上传kafka成功与失败数据的数量统计的时间间隔,单位为秒

60

kafka.bootstrap.servers

kafka代理节点地址,配置形式为HOST:PORT[,HOST:PORT]

-

kafka_topic

写入kafka的topic名称

maxwell

dead_letter_topic

当发送某条记录出错时,记录该条出错记录主键的kafka topic

-

kafka_version

Maxwell使用的kafka producer版本号,不能在config.properties中配置,需要在启动命令时用-- kafka_version xxx参数传入

-

kafka_partition_hash

划分kafka topic partition的算法,支持default或murmur3

default

kafka_key_format

Kafka record的key生成方式,支持array或Hash

Hash

ddl_kafka_topic

当output_ddl配置为true时,DDL操作写入的topic

{kafka_topic}

filter

过滤数据库或表。

  • 若只想采集mydatabase的库,可以配置为

    exclude: *.*,include: mydatabase.*

  • 若只想采集mydatabase.mytable的表,可以配置为

    exclude: *.*,include: mydatabase.mytable

  • 若只想采集mydatabase库下的mytable,mydate_123, mydate_456表,可以配置为

    exclude: *.*,include: mydatabase.mytable, include: mydatabase./mydate_\\d*/

-

启动Maxwell

  1. 登录Maxwell所在的服务器。
  2. 执行如下命令进入Maxwell安装目录。

    cd /opt/maxwell-1.21.0/

    如果是初次使用Maxwell,建议将conf/config.properties中的log_level改为debug(调试级别),以便观察启动之后是否能正常从MySQL获取数据并发送到kafka,当整个流程调试通过之后,再把log_level修改为info,然后先停止再启动Maxwell生效。

    # log level [debug | info | warn | error]

    log_level=debug

  3. 执行如下命令启动Maxwell。

    source /opt/client/bigdata_env

    bin/Maxwell

    bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \

    --producer=kafka --kafka.bootstrap.servers=kafkahost:9092 --kafka_topic=Maxwell

    其中,user,password和host分别表示MySQL的用户名,密码和IP地址,这三个参数可以通过修改配置项配置也可以通过上述命令配置,kafkahost为流式集群的Core节点的IP地址。

    显示类似如下信息,表示Maxwell启动成功。

    Success to start Maxwell [78092].

验证Maxwell

  1. 登录Maxwell所在的服务器。
  2. 查看日志。如果日志里面没有ERROR日志,且有打印如下日志,表示与MySQL连接正常。

    BinlogConnectorLifecycleListener - Binlog connected.

  3. 登录MySQL数据库,对测试数据进行更新/创建/删除等操作。操作语句可以参考如下示例。

    -- 创建库
    create database test;
    -- 创建表
    create table test.e (
      id int(10) not null primary key auto_increment,
      m double,
      c timestamp(6),
      comment varchar(255) charset 'latin1'
    );
    -- 增加记录
    insert into test.e set m = 4.2341, c = now(3), comment = 'I am a creature of light.';
    -- 更新记录
    update test.e set m = 5.444, c = now(3) where id = 1;
    -- 删除记录
    delete from test.e where id = 1;
    -- 修改表
    alter table test.e add column torvalds bigint unsigned after m;
    -- 删除表
    drop table test.e;
    -- 删除库
    drop database test;

  4. 观察Maxwell的日志输出,如果没有WARN/ERROR打印,则表示Maxwell安装配置正常。

    若要确定数据是否成功上传,可设置config.properties中的log_level为debug,则数据上传成功时会立刻打印如下JSON格式数据,具体字段含义请参考Maxwell生成的数据格式及常见字段含义
    {"database":"test","table":"e","type":"insert","ts":1541150929,"xid":60556,"commit":true,"data":{"id":1,"m":4.2341,"c":"2018-11-02 09:28:49.297000","comment":"I am a creature of light."}}
    ……

    当整个流程调试通过之后,可以把config.properties文件中的配置项log_level修改为info,减少日志打印量,并重启Maxwell。

    # log level [debug | info | warn | error]
    log_level=info

停止Maxwell

  1. 登录Maxwell所在的服务器。
  2. 执行如下命令,获取Maxwell的进程标识(PID)。输出的第二个字段即为PID。

    ps -ef | grep Maxwell | grep -v grep

  3. 执行如下命令,强制停止Maxwell进程。

    kill -9 PID

Maxwell生成的数据格式及常见字段含义

Maxwell生成的数据格式为JSON,常见字段含义如下:

  • type:操作类型,包含database-create,database-drop,table-create,table-drop,table-alter,insert,update,delete
  • database:操作的数据库名称
  • ts:操作时间,13位时间戳
  • table:操作的表名
  • data:数据增加/删除/修改之后的内容
  • old:数据修改前的内容或者表修改前的结构定义
  • sql:DDL操作的SQL语句
  • def:表创建与表修改的结构定义
  • xid:事务唯一ID
  • commit:数据增加/删除/修改操作是否已提交