文档首页/ 云搜索服务 CSS/ 最佳实践/ Logstash数据处理与接入/ 使用Logstash将RDS MySQL数据同步至Elasticsearch
更新时间:2026-04-23 GMT+08:00
分享

使用Logstash将RDS MySQL数据同步至Elasticsearch

应用场景

CSS服务的Logstash集群默认安装了logstash-input-jdbc插件,该插件为Logstash提供了从关系型数据库RDS MySQL中导入和处理数据的能力,通过配置Logstash配置文件,定义JDBC输入和Elasticsearch输出,实现定期同步数据库中的数据到Elasticsearch。该方案可以用于以下场景:
  • 数据实时更新与同步:将RDS MySQL中的数据实时同步到Elasticsearch,以便利用Elasticsearch强大的搜索和分析能力。
  • 日志分析与检索:将MySQL中的日志数据同步到Elasticsearch,进行快速检索和分析。
  • 应用性能监控:将应用性能数据存储在MySQL中,通过Logstash同步到Elasticsearch,进行实时监控和性能分析。
  • 数据备份与恢复:通过Logstash将MySQL数据备份到Elasticsearch,以便在数据丢失或损坏时快速恢复。

方案架构

图1 RDS MySQL数据同步至Elasticsearch

使用Logstash将RDS MySQL数据同步至Elasticsearch的方案流程如图1所示。

在CSS服务的Logstash中通过默认插件logstash-input-jdbc,在Logstash配置文件中配置数据库JDBC输入和Elasticsearch输出,将全量或增量RDS MySQL数据实时同步至Elasticsearch。

方案优势

  • 灵活性:Logstash提供了数据采集、转换、优化和输出的能力,可以灵活地处理各种数据同步需求。
  • 实时性:Logstash可以实现数据的准实时同步,满足大多数业务场景的需求。
  • 易用性:通过Logstash配置文件即可实现数据同步,操作简单,无需复杂的代码开发。

约束限制

  • Elasticsearch中的_id字段必须与MySQL中的id字段相同。

    这是为了确保当MySQL中的记录写入Elasticsearch时,同步任务可在MySQL记录与Elasticsearch索引之间建立一个直接映射的关系。例如,当MySQL中更新了某条记录时,同步任务会覆盖Elasticsearch中与更新记录具有相同ID的索引。

  • 当MySQL中插入或者更新数据时,对应记录必须有一个包含更新或插入时间的字段。

    Logstash在每次轮询MySQL时,会记录最后读取记录的时间戳,并在下一次读取时只获取该时间戳之后更新或插入的符合条件的记录。

  • 确保MySQL数据库、Logstash集群和Elasticsearch集群在同一时区,否则当同步与时间相关的数据时,同步前后的数据可能存在时区差。

前提条件

  • 已准备好存有数据的MySQL数据库,本案例以云数据库的RDS for MySQL实例为例,具体操作请参见购买RDS for MySQL实例
  • 已准备好用于同步数据的Logstash集群,具体操作请参见创建Logstash集群。本文以7.10.0版本的Logstash集群为例。
  • 已准备好Elasticsearch集群,具体操作请参见创建Elasticsearch集群。本文以7.10.2版本的Elasticsearch集群为例。

以上三者在同一VPC下面。

当使用的是自建或第三方MySQL数据库时,则需要确认数据库驱动是否是MariaDB驱动。

操作步骤

  1. 验证Logstash集群和数据源之间的网络连通性。

    1. 进入配置中心页面。
      1. 登录云搜索服务管理控制台
      2. 在左侧导航栏,选择“集群管理 > Logstash”
      3. 在集群列表,单击目标集群名称,进入集群详情页。
      4. 选择“配置中心”页签。
    2. 在配置中心页面,单击“连通性测试”
    3. 在连通性测试弹窗中,输入数据源、目的端的IP地址和端口号,单击“测试”

      连通性测试最多可一次性测试10个IP地址。您可以单击“继续添加”,添加多个IP地址,然后单击“批量测试”一次性测试多个IP地址的连通性。

      图2 连通性测试

      当显示“可用”时,表示集群间网络连通。如果网络不连通,可以配置Logstash集群路由,连通集群间的网络,具体操作请参见配置Logstash集群路由

  2. 创建用于数据同步的Logstash配置文件。

    1. 在Logstash集群的配置中心页面,单击右上角“创建”,进入创建配置文件页面,编辑配置文件。
      表1 创建配置文件

      参数

      说明

      名称

      自定义配置文件名称。

      只能包含字母、数字、中划线或下划线,且必须以字母开头。必须大于等于4个字符。

      配置文件内容

      参考下面的代码示例开发配置文件内容。

      说明:

      配置文件内容大小不能超过100k。

      隐藏内容列表

      配置隐藏字符串列表。

      本案例不用配置。

      描述

      添加配置文件的描述信息,方便识别。

      input {
          jdbc {
              # JDBC驱动包路径
              jdbc_driver_library => "/opt/logstash/extend/jars/mariadb-java-client-2.7.0.jar"
              jdbc_driver_class => "org.mariadb.jdbc.Driver"
              jdbc_connection_string => "jdbc:mariadb://xxx.xxx.xxx.xxx:xxx/database_name"
              jdbc_user => "xxx"
              jdbc_password => "xxx"
              # 以下保持默认即可
              jdbc_paging_enabled => "true"
              jdbc_page_size => "50000"
              # SQL查询语句,用于确定同步的数据范围
              statement => "select * from table_name"
              # 定时任务,每5分钟同步一次,可以自定义。
              schedule => "*/5 * * * *"
          }
      }
      
      filter {
          # 移除一些Logstash事件添加的元数据字段
          mutate {
              remove_field => ["@timestamp", "@version"]
          }
      }
      
      output {
          elasticsearch {
              # 目的端集群的节点访问地址,无需添加协议
              hosts => ["xxx.xxx.xxx.xxx:9200", "xxx.xxx.xxx.xxx:9200"]
              # 事件写入的索引名称
              index => "rds_doctor_index"
              # 索引中的文档id,建议和MySQL中表的主键名称保持一致
              document_id => "%{primary_id}"
              # 以下保持默认即可
              manage_template => false              # 不管理索引模板 
              ilm_enabled => false                  # 关闭ILM策略(CSS不支持开启)
              # 安全集群必配 (非安全集群请删除以下配置)
              # user => "xxx"           # 访问集群的用户名
              # password => "xxx"       # 用户名对应的密码
              # 目的端集群启用SSL时,则需额外配置以下信息
              # ssl => true
              # cacert => "/opt/logstash/extend/certs"       # 用于认证目的端集群的CA证书文件路径
              # ssl_certificate_verification => false        # 目的端集群是否启用安全证书认证
          }
      }
      表2 数据库迁移的配置项说明

      配置项名称

      是否必填

      说明

      input

      jdbc_driver_library

      JDBC驱动包路径。

      CSS服务的Logstash集群已预置MariaDB驱动和MySQL驱动。

      取值格式:<默认证书路径>jars/<驱动名称>(如/opt/logstash/extend/jars/mariadb-java-client-2.7.0.jar)

      • 默认证书路径获取方式请参见查看默认证书
      • 驱动名称的取值范围:
        • MariaDB驱动:“mariadb-java-client-2.7.0.jar”“mariadb-java-client-2.4.0.jar”
        • MySQL驱动(仅镜像版本不低于x.x.x_26.1.0_xxx的Logstash集群支持):“mysql-connector-j-9.4.0.jar”“mysql-connector-java-8.0.16.jar”“mysql-connector-java-8.0.30.jar”

      jdbc_driver_class

      驱动类名。

      • 如果使用的是MariaDB驱动,则驱动类名配置为“org.mariadb.jdbc.Driver”
      • 如果使用的是MySQL驱动,则驱动类名配置为“com.mysql.cj.jdbc.Driver”

      jdbc_connection_string

      数据库连接字符串。

      • 如果使用的是MariaDB驱动,则取值格式为“jdbc:mariadb://<数据库访问地址>:<监听端口>/<数据库名称>”
      • 如果使用的是MySQL驱动,则取值格式为“jdbc:mysql://<数据库访问地址>:<监听端口>/<数据库名称>”

      jdbc_user

      访问数据库的用户名。

      jdbc_password

      用户名对应的密码。

      jdbc_paging_enabled

      是否启用数据库端分页功能。

      取值范围:

      • true:开启分页。系统将自动修改SQL语句以执行分页查询。
      • false(默认值):关闭分页。一次性检索所有匹配数据,存在OOM风险。

      jdbc_page_size

      每一页检索的记录行数。该参数决定了单次数据库请求的数据量。

      statement

      SQL查询语句,用于确定同步的数据范围。

      schedule

      定时任务,支持自定义同步周期。

      output

      hosts

      目的端集群的节点访问地址。支持配置多个IP地址。

      取值格式:["<节点IP地址1>:<端口号>", "<节点IP地址2>:<端口号>"]

      index

      事件写入的索引名称,即指定数据写入到哪个索引。

      • 单个索引:直接输入单个索引名称(如“my_index”)。
      • 多个索引:基于字段的动态命名或多个条件输出块来匹配多个索引。

      document_id

      事件写入的文档ID。建议和MySQL的记录ID(例如表的主键名称primary_id)保持一致。

      manage_template

      是否管理索引模板。

      取值范围:

      • true:自动管理并推送索引模板到目的端。
      • false(默认值):不管理索引模板。

      ilm_enabled

      是否启用ILM策略。

      CSS服务不支持启用ILM。保持固定值false。

      user

      访问目的端集群的用户名。

      安全模式的集群必填。

      password

      访问目的端集群的用户名对应的密码。

      安全模式的集群必填。

      ssl

      目的端集群是否启用SSL。

      取值范围:

      • true:使用HTTPS协议传输数据。
      • false:使用HTTP协议传输数据。

      ssl_certificate_verification

      目的端集群是否启用安全证书认证。

      取值范围:

      • true(默认值):使用安全证书认证目的端集群。
      • false:忽略安全证书认证。

      cacert

      用于认证目的端集群的CA证书文件路径。

      取值格式:<证书路径><证书名称>(如/opt/logstash/extend/certs)

      • 当目的端集群是CSS服务的Elasticsearch或OpenSearch时,获取默认CA证书的“证书名称”“证书路径”。操作指导请参见查看默认证书
      • 当目的端集群是自建或第三方Elasticsearch或OpenSearch时,则将目的端集群的安全证书上传至Logstash并获取“证书名称”“证书路径”。操作指导请参见上传自定义证书
    2. 编辑完成后,单击“下一页”配置Logstash管道参数。本案例保持默认值即可。
    3. 配置完成后,单击“创建”

      在配置中心页面可以看到创建的配置文件,状态为“可用”,表示创建成功。

  3. 启动Logstash配置文件。

    1. 在配置文件列表,选择需要启动的配置文件,单击左上角的“启动”。
    2. “启动Logstash服务”对话框中,勾选“是否保持常驻”开启Logstash服务保持常驻。
    3. 单击“确定”,开始执行配置文件以启动Logstash迁移任务。

      可以在管道列表看到启动的配置文件。

  4. 验证数据库和Elasticsearch集群的数据是否已同步。

    1. 云搜索服务管理控制台,选择集群管理 > Elasticsearch
    2. 在Elasticsearch集群列表,单击集群操作列的“Kibana”,登录Kibana。
    3. 在左侧导航栏选择“Dev Tools”,进入Console页面。
    4. 执行如下命令查询索引数据。
      GET rds_doctor_index/_count 
      {  
        "query": {"match_all": {}}
      }

      当返回结果中,“count”的值不为0,则表示数据同步已成功。

常见问题:MySQL驱动不兼容怎么办?

在Logstash集群启动Logstash配置文件后,在Logstash管道运行状态异常,单击“运行日志”,可在日志中看到类似以下报错信息,则表示MySQL驱动不兼容。

[2024-05-21T11:31:00,196][ERROR][logstash.inputs.jdbc     ] Java::JavaSql::SQLSyntaxErrorException: (conn=-1409730930) You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '"T1" LIMIT 1' at line 1: SELECT count(*) AS "COUNT" FROM (select * from logstash_broker where updatetime+30000 > 0 order by updatetime) AS "T1" LIMIT 1

解决方案:

  1. 停止Logstash配置文件。
  2. 下载与RDS版本兼容的SQL JDBC驱动,例如“mysql-connector-java-8.0.11.tar.gz”,解压后获得“mysql-connector-java-8.0.11.jar”

    下载地址:https://downloads.mysql.com/archives/c-j/

  3. 联系技术支持,将SQL JDBC驱动jar包上传至用于同步数据的Logstash集群中。
  4. 修改Logstash配置文件内容。

    修改参数值“jdbc_driver_class”“jdbc_connection_string”。其中“xxx.xxx.xxx.xxx:port”填写数据库实际访问地址和端口号。

    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # 填写为MySQL JDBC的访问地址。
    jdbc_connection_string => "jdbc:mysql://xxx.xxx.xxx.xxx:port/cms"
  5. 重新启动配置文件。

相关文档