文档首页/ 云搜索服务 CSS/ 最佳实践/ 使用Logstash同步数据至Elasticsearch/ 使用Logstash将RDS MySQL数据同步至Elasticsearch
更新时间:2025-08-29 GMT+08:00
分享

使用Logstash将RDS MySQL数据同步至Elasticsearch

应用场景

CSS服务的Logstash集群默认安装了logstash-input-jdbc插件,该插件为Logstash提供了从关系型数据库RDS MySQL中导入和处理数据的能力,通过配置Logstash配置文件,定义JDBC输入和Elasticsearch输出,实现定期同步数据库中的数据到Elasticsearch。该方案可以用于以下场景:
  • 数据实时更新与同步:将RDS MySQL中的数据实时同步到Elasticsearch,以便利用Elasticsearch强大的搜索和分析能力。
  • 日志分析与检索:将MySQL中的日志数据同步到Elasticsearch,进行快速检索和分析。
  • 应用性能监控:将应用性能数据存储在MySQL中,通过Logstash同步到Elasticsearch,进行实时监控和性能分析。
  • 数据备份与恢复:通过Logstash将MySQL数据备份到Elasticsearch,以便在数据丢失或损坏时快速恢复。

方案架构

图1 RDS MySQL数据同步至Elasticsearch

使用Logstash将RDS MySQL数据同步至Elasticsearch的方案流程如图1所示。

在CSS服务的Logstash中通过默认插件logstash-input-jdbc,在Logstash配置文件中配置数据库JDBC输入和Elasticsearch输出,将全量或增量RDS MySQL数据实时同步至Elasticsearch。

方案优势

  • 灵活性:Logstash提供了数据采集、转换、优化和输出的能力,可以灵活地处理各种数据同步需求。
  • 实时性:Logstash可以实现数据的准实时同步,满足大多数业务场景的需求。
  • 易用性:通过Logstash配置文件即可实现数据同步,操作简单,无需复杂的代码开发。

约束限制

  • Elasticsearch中的_id字段必须与MySQL中的id字段相同。

    这是为了确保当MySQL中的记录写入Elasticsearch时,同步任务可在MySQL记录与Elasticsearch索引之间建立一个直接映射的关系。例如,当MySQL中更新了某条记录时,同步任务会覆盖Elasticsearch中与更新记录具有相同ID的索引。

  • 当MySQL中插入或者更新数据时,对应记录必须有一个包含更新或插入时间的字段。

    Logstash在每次轮询MySQL时,会记录最后读取记录的时间戳,并在下一次读取时只获取该时间戳之后更新或插入的符合条件的记录。

  • 确保MySQL数据库、Logstash集群和Elasticsearch集群在同一时区,否则当同步与时间相关的数据时,同步前后的数据可能存在时区差。

前提条件

  • 已准备好存有数据的MySQL数据库,本案例以云数据库的RDS for MySQL实例为例,具体操作请参见购买RDS for MySQL实例
  • 已准备好用于同步数据的Logstash集群,具体操作请参见创建Logstash集群。本文以7.10.0版本的Logstash集群为例。
  • 已准备好Elasticsearch集群,具体操作请参见创建Elasticsearch集群。本文以7.10.2版本的Elasticsearch集群为例。

以上三者在同一VPC下面。

当使用的是自建或第三方MySQL数据库时,则需要确认数据库驱动是否是MariaDB驱动。

操作步骤

  1. 验证Logstash集群和数据源之间的网络连通性。

    1. 进入配置中心页面。
      1. 登录云搜索服务管理控制台
      2. 在左侧导航栏,选择“集群管理 > Logstash”
      3. 在集群列表,单击目标集群名称,进入集群详情页。
      4. 选择“配置中心”页签。
    2. 在配置中心页面,单击“连通性测试”
    3. 在连通性测试弹窗中,输入数据源、目的端的IP地址和端口号,单击“测试”

      连通性测试最多可一次性测试10个IP地址。您可以单击“继续添加”,添加多个IP地址,然后单击“批量测试”,进行一次性测试多个IP地址的连通性。

      图2 连通性测试

      当显示“可用”时,表示集群间网络连通。如果网络不连通,可以配置Logstash集群路由,连通集群间的网络,具体操作请参见配置Logstash集群路由

  2. 创建用于数据同步的Logstash配置文件。

    1. 在Logstash集群的配置中心页面,单击右上角“创建”,进入创建配置文件页面,编辑配置文件。
      表1 创建配置文件

      参数

      说明

      名称

      自定义配置文件名称。

      只能包含字母、数字、中划线或下划线,且必须以字母开头。必须大于等于4个字符。

      配置文件内容

      参考下面的代码示例开发配置文件内容。

      说明:

      配置文件内容大小不能超过100k。

      隐藏内容列表

      配置隐藏字符串列表后,在返回的配置内容中,会将所有在列表中的字串隐藏为“***”

      本案例不用配置。

      input {
        jdbc{
          # JDBC驱动配置。
          jdbc_driver_library => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/jars/mariadb-java-client-2.7.0.jar"
          jdbc_driver_class => "org.mariadb.jdbc.Driver"
          jdbc_connection_string => "jdbc:mariadb://xxx.xxx.xxx.xxx:port/cms?useUnicode=true&characterEncoding=utf8mb4&autoReconnect=true&allowMultiQueries=true"
          jdbc_user => "root"
          jdbc_password => "xx"
          # 以下保持默认即可。
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
          # 迁移数据的SQL查询语句。
          statement => "select a.user_code AS doctor_id,a.record_status from cluster "
          # 定时任务,每5分钟同步一次,可以自定义。
          schedule => "*/5 * * * *"
        }
      }
      filter {
      
      }
      output {
        elasticsearch {
          hosts => ["xxx.xxx.xxx.xxx:port","xxx.xxx.xxx.xxx:port","xxx.xxx.xxx.xxx:port"]
          # 设置索引名称。
          index => "rds_doctor_index"
          user => "admin"
          password => "xx"
          # 索引中的文档id,建议和MySQL中表的主键名称保持一致。
          document_id => "%{primary_id}"
          # 目标Elasticsearch集群启用HTTPS访问时,才需要配置证书。
          ssl => true
          ssl_certificate_verification => false
          cacert => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/certs"
          # 以下保持默认即可。
          manage_template => false
          ilm_enabled => false
        }
      }
      表2 配置项说明

      配置项名称

      是否必填

      说明

      input

      jdbc_driver_library

      JDBC驱动程序库路径。

      • 当数据库驱动是MariaDB驱动时,该值填写“/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/jars/mariadb-java-client-2.7.0.jar”
      • 当数据库驱动是RDS版本兼容的SQL JDBC驱动时,该值需要联系技术支持修改。

      JDBC驱动相关的详细参数配置请参见Jdbc input plugin

      jdbc_driver_class

      驱动程序库的class路径。

      • 当数据库驱动是MariaDB驱动时,该值填写“org.mariadb.jdbc.Driver”
      • 当数据库驱动是RDS版本兼容的SQL JDBC驱动时,该值填写“com.mysql.jdbc.Driver”

      jdbc_connection_string

      MySQL JDBC的访问地址。

      • 当数据库驱动是MariaDB驱动时,该值填写“jdbc:mariadb://xxx.xxx.xxx.xxx:port/cms?useUnicode=true&characterEncoding=utf8mb4&autoReconnect=true&allowMultiQueries=true”
      • 当数据库驱动是RDS版本兼容的SQL JDBC驱动时,该值填写“jdbc:mysql://xxx.xxx.xxx.xxx:port/cms”

      其中“xxx.xxx.xxx.xxx:port”填写数据库实际访问地址和端口号。

      jdbc_user

      访问MySQL JDBC的用户名。

      jdbc_password

      访问MySQL JDBC的密码。

      statement

      迁移数据的SQL查询语句。

      schedule

      定时任务,支持自定义同步周期。

      output

      hosts

      Elasticsearch集群的访问地址。

      index

      设置索引名称,即数据导入到哪个索引。

      user

      访问Elasticsearch集群的用户名,仅安全集群涉及。

      password

      访问Elasticsearch集群的密码,仅安全集群涉及。

      document_id

      索引中的文档ID,建议和MySQL的记录ID(例如表的主键名称primary_id)保持一致。

      ssl

      是否开启HTTPS通信。

      当Elasticsearch集群启用HTTPS访问时,该值设置为“true”,否则不用配置。

      ssl_certificate_verification

      是否验证服务端Elasticsearch证书。仅当“ssl”配置为“true”时,才需要配置该参数。

      • true:验证证书。
      • false:忽略证书。

      cacert

      HTTPS访问证书,CSS集群保持默认值。

    2. 编辑完成后,单击“下一页”配置Logstash管道参数。本案例保持默认值即可。
    3. 配置完成后,单击“创建”

      在配置中心页面可以看到创建的配置文件,状态为“可用”,表示创建成功。

  3. 启动Logstash配置文件。

    1. 在配置文件列表,选择需要启动的配置文件,单击左上角的“启动”。
    2. “启动Logstash服务”对话框中,勾选“是否保持常驻”开启Logstash服务保持常驻。
    3. 单击“确定”,开始启动配置文件启动Logstash迁移任务。

      可以在管道列表看到启动的配置文件。

  4. 验证数据库和Elasticsearch集群的数据是否已同步。

    1. 云搜索服务管理控制台,选择集群管理 > Elasticsearch
    2. 在Elasticsearch集群列表,单击集群操作列的“Kibana”,登录Kibana。
    3. 在左侧导航栏选择“Dev Tools”,进入Console页面。
    4. 执行如下命令查询索引数据。
      GET rds_doctor_index/_count 
      {  
        "query": {"match_all": {}}
      }

      当返回结果中,“count”的值不为0,则表示数据同步已成功。

常见问题:MySQL驱动不兼容怎么办?

在Logstash集群启动Logstash配置文件后,在Logstash管道运行状态异常,单击“运行日志”,可在日志中看到类似以下报错信息,则表示MySQL驱动不兼容。

[2024-05-21T11:31:00,196][ERROR][logstash.inputs.jdbc     ] Java::JavaSql::SQLSyntaxErrorException: (conn=-1409730930) You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '"T1" LIMIT 1' at line 1: SELECT count(*) AS "COUNT" FROM (select * from logstash_broker where updatetime+30000 > 0 order by updatetime) AS "T1" LIMIT 1

解决方案:

  1. 停止Logstash配置文件。
  2. 下载与RDS版本兼容的SQL JDBC驱动,例如“mysql-connector-java-8.0.11.tar.gz”,解压后获得“mysql-connector-java-8.0.11.jar”

    下载地址:https://downloads.mysql.com/archives/c-j/

  3. 联系技术支持,将SQL JDBC驱动jar包上传至用于同步数据的Logstash集群中。
  4. 修改Logstash配置文件内容。

    修改参数值“jdbc_driver_class”“jdbc_connection_string”。其中“xxx.xxx.xxx.xxx:port”填写数据库实际访问地址和端口号。

    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # 填写为MySQL JDBC的访问地址。
    jdbc_connection_string => "jdbc:mysql://xxx.xxx.xxx.xxx:port/cms"
  5. 重新启动配置文件。

相关文档