基于binlog的MySQL数据同步到MRS集群中
本章节为您介绍使用Maxwell同步工具将线下基于binlog的数据迁移到MRS Kafka集群中的指导。
Maxwell是一个开源程序,通过读取MySQL的binlog日志,将增删改等操作转为JSON格式发送到输出端(如控制台/文件/Kafka等)。Maxwell可部署在MySQL机器上,也可独立部署在其他与MySQL网络可通的机器上。
Maxwell运行在Linux服务器上,常见的有EulerOS、Ubuntu、Debian、CentOS、OpenSUSE等,且需要Java 1.8+支持。
同步数据具体内容如下。
配置MySQL
- 开启binlog,在MySQL中打开my.cnf文件,在[mysqld] 区块检查是否配置server_id,log-bin与binlog_format,若没有配置请执行如下命令添加配置项并重启MySQL,若已经配置则忽略此步骤。
$ vi my.cnf [mysqld] server_id=1 log-bin=master binlog_format=row
- Maxwell需要连接MySQL,并创建一个名称为maxwell的数据库存储元数据,且需要能访问需要同步的数据库,所以建议新创建一个MySQL用户专门用来给Maxwell使用。使用root登录MySQL之后,执行如下命令创建maxwell用户(其中XXXXXX是密码,请修改为实际值)。
- 若Maxwell程序部署在非MySQL机器上,则创建的maxwell用户需要有远程登录数据库的权限,此时创建命令为
mysql> GRANT ALL on maxwell.* to 'maxwell'@'%' identified by 'XXXXXX';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
- 若Maxwell部署在MySQL机器上,则创建的maxwell用户可以设置为只能在本机登录数据库,此时创建命令为
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'localhost' identified by 'XXXXXX';
mysql> GRANT ALL on maxwell.* to 'maxwell'@'localhost';
- 若Maxwell程序部署在非MySQL机器上,则创建的maxwell用户需要有远程登录数据库的权限,此时创建命令为
安装Maxwell
- 下载安装包,下载路径为https://github.com/zendesk/maxwell/releases,选择名为maxwell-XXX.tar.gz的二进制文件下载,其中XXX为版本号。
- 将tar.gz包上传到任意目录下(本示例路径为Master节点的/opt)。
- 登录部署Maxwell的服务器,并执行如下命令进入tar.gz包所在目录。
cd /opt
- 执行如下命令解压“maxwell-XXX.tar.gz”压缩包,并进入“maxwell-XXX”文件夹。
tar -zxvf maxwell-XXX.tar.gz
cd maxwell-XXX
配置Maxwell
在maxwell-XXX文件夹下若有conf目录则配置config.properties文件,配置项说明请参见表1。若没有conf目录,则是在maxwell-XXX文件夹下将config.properties.example修改成config.properties。
配置项 |
是否必填 |
说明 |
默认值 |
---|---|---|---|
user |
是 |
连接MySQL的用户名,即2中新创建的用户 |
- |
password |
是 |
连接MySQL的密码 |
- |
host |
否 |
MySQL地址 |
localhost |
port |
否 |
MySQL端口 |
3306 |
log_level |
否 |
日志打印级别,可选值为
|
info |
output_ddl |
否 |
是否发送DDL(数据库与数据表的定义修改)事件
|
false |
producer |
是 |
生产者类型,配置为kafka
|
stdout |
producer_partition_by |
否 |
分区策略,用来确保相同一类的数据写入到kafka同一分区
|
database |
ignore_producer_error |
否 |
是否忽略生产者发送数据失败的错误
|
true |
metrics_slf4j_interval |
否 |
在日志中输出上传kafka成功与失败数据的数量统计的时间间隔,单位为秒 |
60 |
kafka.bootstrap.servers |
是 |
kafka代理节点地址,配置形式为HOST:PORT[,HOST:PORT] |
- |
kafka_topic |
否 |
写入kafka的topic名称 |
maxwell |
dead_letter_topic |
否 |
当发送某条记录出错时,记录该条出错记录主键的kafka topic |
- |
kafka_version |
否 |
Maxwell使用的kafka producer版本号,不能在config.properties中配置,需要在启动命令时用-- kafka_version xxx参数传入 |
- |
kafka_partition_hash |
否 |
划分kafka topic partition的算法,支持default或murmur3 |
default |
kafka_key_format |
否 |
Kafka record的key生成方式,支持array或Hash |
Hash |
ddl_kafka_topic |
否 |
当output_ddl配置为true时,DDL操作写入的topic |
{kafka_topic} |
filter |
否 |
过滤数据库或表。 |
- |
启动Maxwell
- 登录Maxwell所在的服务器。
- 执行如下命令进入Maxwell安装目录。
cd /opt/maxwell-1.21.0/
如果是初次使用Maxwell,建议将conf/config.properties中的log_level改为debug(调试级别),以便观察启动之后是否能正常从MySQL获取数据并发送到kafka,当整个流程调试通过之后,再把log_level修改为info,然后先停止再启动Maxwell生效。
# log level [debug | info | warn | error]
log_level=debug
- 执行如下命令启动Maxwell。
source /opt/client/bigdata_env
bin/Maxwell
bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
--producer=kafka --kafka.bootstrap.servers=kafkahost:9092 --kafka_topic=Maxwell
其中,user,password和host分别表示MySQL的用户名,密码和IP地址,这三个参数可以通过修改配置项配置也可以通过上述命令配置,kafkahost为流式集群的Core节点的IP地址。
显示类似如下信息,表示Maxwell启动成功。
Success to start Maxwell [78092].
验证Maxwell
- 登录Maxwell所在的服务器。
- 查看日志。如果日志里面没有ERROR日志,且有打印如下日志,表示与MySQL连接正常。
BinlogConnectorLifecycleListener - Binlog connected.
- 登录MySQL数据库,对测试数据进行更新/创建/删除等操作。操作语句可以参考如下示例。
-- 创建库 create database test; -- 创建表 create table test.e ( id int(10) not null primary key auto_increment, m double, c timestamp(6), comment varchar(255) charset 'latin1' ); -- 增加记录 insert into test.e set m = 4.2341, c = now(3), comment = 'I am a creature of light.'; -- 更新记录 update test.e set m = 5.444, c = now(3) where id = 1; -- 删除记录 delete from test.e where id = 1; -- 修改表 alter table test.e add column torvalds bigint unsigned after m; -- 删除表 drop table test.e; -- 删除库 drop database test;
- 观察Maxwell的日志输出,如果没有WARN/ERROR打印,则表示Maxwell安装配置正常。
若要确定数据是否成功上传,可设置config.properties中的log_level为debug,则数据上传成功时会立刻打印如下JSON格式数据,具体字段含义请参考Maxwell生成的数据格式及常见字段含义。
{"database":"test","table":"e","type":"insert","ts":1541150929,"xid":60556,"commit":true,"data":{"id":1,"m":4.2341,"c":"2018-11-02 09:28:49.297000","comment":"I am a creature of light."}} ……
当整个流程调试通过之后,可以把config.properties文件中的配置项log_level修改为info,减少日志打印量,并重启Maxwell。
# log level [debug | info | warn | error] log_level=info
停止Maxwell
- 登录Maxwell所在的服务器。
- 执行如下命令,获取Maxwell的进程标识(PID)。输出的第二个字段即为PID。
ps -ef | grep Maxwell | grep -v grep
- 执行如下命令,强制停止Maxwell进程。
kill -9 PID
Maxwell生成的数据格式及常见字段含义
Maxwell生成的数据格式为JSON,常见字段含义如下:
- type:操作类型,包含database-create,database-drop,table-create,table-drop,table-alter,insert,update,delete
- database:操作的数据库名称
- ts:操作时间,13位时间戳
- table:操作的表名
- data:数据增加/删除/修改之后的内容
- old:数据修改前的内容或者表修改前的结构定义
- sql:DDL操作的SQL语句
- def:表创建与表修改的结构定义
- xid:事务唯一ID
- commit:数据增加/删除/修改操作是否已提交