- 最新动态
- 功能总览
- 服务公告
- 产品介绍
- 计费说明
- 快速入门
-
用户指南
- CSS服务权限管理
-
使用Elasticsearch搜索数据
- Elasticsearch使用流程
- Elasticsearch集群规划建议
- 创建Elasticsearch集群
- 访问Elasticsearch集群
- 导入数据至Elasticsearch集群
- 使用Elasticsearch集群搜索数据
- 增强Elasticsearch集群搜索能力
- 配置Elasticsearch集群网络
- 备份与恢复Elasticsearch集群数据
- 扩缩容Elasticsearch集群
- 升级Elasticsearch集群版本
- 管理Elasticsearch集群
- 管理Elasticsearch集群索引策略
- Elasticsearch集群监控与日志管理
- 查看Elasticsearch集群审计日志
- 使用OpenSearch搜索数据
- 使用Logstash迁移数据
- CSS服务资源监控
- 最佳实践
- API参考
- SDK参考
- 场景代码示例
-
常见问题
- 产品咨询
- 计费相关
- CSS集群访问
- CSS集群迁移
-
CSS集群搜索引擎使用
- CSS服务中为什么新创建的索引分片集中分配到单节点上?
- CSS服务中Elasticsearch 7.x集群如何在index下创建type?
- CSS服务中如何配置Elasticsearch索引副本数量?
- CSS服务中Elasticsearch集群分片过多会有哪些影响?
- 如何查看CSS集群的分片数以及副本数?
- CSS服务中Elasticsearch集群的节点node.roles为i表示什么意思?
- CSS服务中如何设置Elasticsearch集群的默认分页返回最大条数?
- CSS服务中如何更新Elasticsearch生命周期策略?
- CSS服务中如何设置Elasticsearch集群慢查询日志的阈值?
- CSS服务中如何清理Elasticsearch索引数据?
- CSS服务中如何清理Elasticsearch缓存?
- 使用delete_by_query命令删除Elasticsearch集群数据后,为什么磁盘使用率反而增加?
- CSS服务的Elasticsearch集群是否支持script dotProduct?
-
CSS集群管理
- 如何查看CSS集群所分布的可用区?
- CSS服务中Filebeat版本与集群版本的关系是什么?
- 如何获取CSS服务的安全证书?
- CSS服务中如何转换CER安全证书的格式?
- CSS服务中Elasticsearch和OpenSearch集群支持修改安全组吗?
- CSS服务中Elasticsearch集群如何设置search.max_buckets参数?
- CSS服务中如何修改Elasticsearch和OpenSearch集群的TLS算法?
- CSS服务中如何开启Elasticsearch和OpenSearch集群的安全审计日志?
- CSS服务中是否支持停止集群?
- CSS集群冻结索引后如何查询OBS上的索引占用量?
- 如何查看Elasticsearch和OpenSearch集群的系统默认插件列表
- CSS集群备份与恢复
- CSS集群监控与运维
-
故障排除
-
访问集群类
- 无法正常打开Kibana
- Elasticsearch针对filebeat配置调优
- Spring Boot使用Elasticsearch出现Connection reset by peer问题
- 为什么集群创建失败
- Elasticsearch集群出现写入拒绝“Bulk Reject”,如何解决?
- Elasticsearch集群创建index pattern卡住,如何解决?
- 云搜索控制台页面提示系统繁忙
- Elasticsearch集群报错:unassigned shards all indices
- es-head插件连接Elasticsearch集群报跨域错误
- 单节点集群打开Cerebro界面显示告警
- ECS无法连接到集群
- 集群不可用
- 数据导入导出类
-
功能使用类
- 无法备份索引
- 无法使用自定义词库功能
- 快照仓库找不到
- 集群一直处于快照中
- 数据量很大,如何进行快照备份?
- 集群突现load高的故障排查
- 使用ElasticSearch的HLRC(High Level Rest Client)时,报出I/O Reactor STOPPED
- Elasticsearch集群最大堆内存持续过高(超过90%)
- Elasticsearch集群更改规格失败
- 安全集群索引只读状态修改报错
- Elasticsearch集群某一节点分配不到shard
- 集群索引插入数据失败
- CSS创建索引报错“maximum shards open”
- 删除索引报错“403 Forbidden”是什么原因?
- Kibana中删除index pattern报错Forbidden
- 执行命令update-by-query报错“Trying to create too many scroll contexts”
- Elasticsearch集群无法创建pattern
- 端口访问类
-
访问集群类
- 视频帮助
- 文档下载
- 通用参考
链接复制成功!
使用Logstash将RDS MySQL数据同步至Elasticsearch
应用场景
- 数据实时更新与同步:将RDS MySQL中的数据实时同步到Elasticsearch,以便利用Elasticsearch强大的搜索和分析能力。
- 日志分析与检索:将MySQL中的日志数据同步到Elasticsearch,进行快速检索和分析。
- 应用性能监控:将应用性能数据存储在MySQL中,通过Logstash同步到Elasticsearch,进行实时监控和性能分析。
- 数据备份与恢复:通过Logstash将MySQL数据备份到Elasticsearch,以便在数据丢失或损坏时快速恢复。
方案架构
使用Logstash将RDS MySQL数据同步至Elasticsearch的方案流程如图1所示。
在CSS服务的Logstash中通过默认插件logstash-input-jdbc,在Logstash配置文件中配置数据库JDBC输入和Elasticsearch输出,将全量或增量RDS MySQL数据实时同步至Elasticsearch。
方案优势
- 灵活性:Logstash提供了数据采集、转换、优化和输出的能力,可以灵活地处理各种数据同步需求。
- 实时性:Logstash可以实现数据的准实时同步,满足大多数业务场景的需求。
- 易用性:通过Logstash配置文件即可实现数据同步,操作简单,无需复杂的代码开发。
约束限制
- Elasticsearch中的_id字段必须与MySQL中的id字段相同。
这是为了确保当MySQL中的记录写入Elasticsearch时,同步任务可在MySQL记录与Elasticsearch索引之间建立一个直接映射的关系。例如,当MySQL中更新了某条记录时,同步任务会覆盖Elasticsearch中与更新记录具有相同ID的索引。
- 当MySQL中插入或者更新数据时,对应记录必须有一个包含更新或插入时间的字段。
Logstash在每次轮询MySQL时,会记录最后读取记录的时间戳,并在下一次读取时只获取该时间戳之后更新或插入的符合条件的记录。
- 确保MySQL数据库、Logstash集群和Elasticsearch集群在同一时区,否则当同步与时间相关的数据时,同步前后的数据可能存在时区差。
前提条件
- 已准备好存有数据的MySQL数据库,本案例以云数据库的RDS for MySQL实例为例,具体操作请参见购买RDS for MySQL实例。
- 已准备好用于同步数据的Logstash集群,具体操作请参见创建Logstash集群。本文以7.10.0版本的Logstash集群为例。
- 已准备好Elasticsearch集群,具体操作请参见创建Elasticsearch集群。本文以7.10.2版本的Elasticsearch集群为例。
以上三者在同一VPC下面。
- 是,则可以直接开始配置数据同步。
- 否,则需要参考常见问题:MySQL驱动不兼容怎么办?上传与RDS版本兼容的SQL JDBC驱动到Logstash集群中。
操作步骤
- 验证Logstash集群和数据源之间的网络连通性。
- 登录云搜索服务管理控制台。
- 左侧导航栏选择“集群管理 > Logstash”,进入集群列表页面。
- 在集群列表,单击集群操作列的“配置中心”,进入配置中心页面。
或者,在集群列表,单击集群名称,进入集群基本信息页面,在左侧导航栏选择“配置中心”,进入配置中心页面。
- 在配置中心页面,单击“连通性测试”。
- 在连通性测试弹窗中,输入数据源、目的端的IP地址和端口号,单击“测试”。
连通性测试最多可一次性测试10个IP地址。您可以单击“继续添加”,添加多个IP地址,然后单击“批量测试”,进行一次性测试多个IP地址的连通性。
图2 连通性测试当显示“可用”时,表示集群间网络连通。如果网络不连通,可以配置Logstash集群路由,连通集群间的网络,具体操作请参见配置Logstash集群路由。
- 创建用于数据同步的Logstash配置文件。
- 在Logstash集群的配置中心页面,单击右上角“创建”,进入创建配置文件页面,编辑配置文件。
表1 创建配置文件 参数
说明
名称
自定义配置文件名称。
只能包含字母、数字、中划线或下划线,且必须以字母开头。必须大于等于4个字符。
配置文件内容
参考下面的代码示例开发配置文件内容。
说明:
配置文件内容大小不能超过100k。
隐藏内容列表
配置隐藏字符串列表后,在返回的配置内容中,会将所有在列表中的字串隐藏为“***”。
本案例不用配置。
input { jdbc{ # JDBC驱动配置。 jdbc_driver_library => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/jars/mariadb-java-client-2.7.0.jar" jdbc_driver_class => "org.mariadb.jdbc.Driver" jdbc_connection_string => "jdbc:mariadb://xxx.xxx.xxx.xxx:port/cms?useUnicode=true&characterEncoding=utf8mb4&autoReconnect=true&allowMultiQueries=true" jdbc_user => "root" jdbc_password => "xx" # 以下保持默认即可。 jdbc_paging_enabled => "true" jdbc_page_size => "50000" # 迁移数据的SQL查询语句。 statement => "select a.user_code AS doctor_id,a.record_status from cluster " # 定时任务,每5分钟同步一次,可以自定义。 schedule => "*/5 * * * *" } } filter { } output { elasticsearch { hosts => ["xxx.xxx.xxx.xxx:port","xxx.xxx.xxx.xxx:port","xxx.xxx.xxx.xxx:port"] # 设置索引名称。 index => "rds_doctor_index" user => "admin" password => "xx" # 索引中的文档id,建议和MySQL中表的主键名称保持一致。 document_id => "%{primary_id}" # 目标Elasticsearch集群启用HTTPS访问时,才需要配置证书。 ssl => true ssl_certificate_verification => false cacert => "/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/cert/CloudSearchService.cer" # 以下保持默认即可。 manage_template => false ilm_enabled => false } }
表2 配置项说明 配置项名称
是否必填
说明
input
jdbc_driver_library
是
JDBC驱动程序库路径。
- 当数据库驱动是MariaDB驱动时,该值填写“/rds/datastore/logstash/v7.10.0/package/logstash-7.10.0/extend/jars/mariadb-java-client-2.7.0.jar”。
- 当数据库驱动是RDS版本兼容的SQL JDBC驱动时,该值需要联系技术支持修改。
JDBC驱动相关的详细参数配置请参见Jdbc input plugin。
jdbc_driver_class
是
驱动程序库的class路径。
- 当数据库驱动是MariaDB驱动时,该值填写“org.mariadb.jdbc.Driver”。
- 当数据库驱动是RDS版本兼容的SQL JDBC驱动时,该值填写“com.mysql.jdbc.Driver”。
jdbc_connection_string
是
MySQL JDBC的访问地址。
- 当数据库驱动是MariaDB驱动时,该值填写“jdbc:mariadb://xxx.xxx.xxx.xxx:port/cms?useUnicode=true&characterEncoding=utf8mb4&autoReconnect=true&allowMultiQueries=true”。
- 当数据库驱动是RDS版本兼容的SQL JDBC驱动时,该值填写“jdbc:mysql://xxx.xxx.xxx.xxx:port/cms”。
其中“xxx.xxx.xxx.xxx:port”填写数据库实际访问地址和端口号。
jdbc_user
是
访问MySQL JDBC的用户名。
jdbc_password
是
访问MySQL JDBC的密码。
statement
是
迁移数据的SQL查询语句。
schedule
是
定时任务,支持自定义同步周期。
output
hosts
是
Elasticsearch集群的访问地址。
index
是
设置索引名称,即数据导入到哪个索引。
user
否
访问Elasticsearch集群的用户名,仅安全集群涉及。
password
否
访问Elasticsearch集群的密码,仅安全集群涉及。
document_id
是
索引中的文档ID,建议和MySQL的记录ID(例如表的主键名称primary_id)保持一致。
ssl
否
是否开启HTTPS通信。
当Elasticsearch集群启用HTTPS访问时,该值设置为“true”,否则不用配置。
ssl_certificate_verification
否
是否验证服务端Elasticsearch证书。仅当“ssl”配置为“true”时,才需要配置该参数。
- true:验证证书。
- false:忽略证书。
cacert
否
HTTPS访问证书,CSS集群保持默认值。
- 编辑完成后,单击“下一页”配置Logstash管道参数。本案例保持默认值即可。
- 配置完成后,单击“创建”。
在配置中心页面可以看到创建的配置文件,状态为“可用”,表示创建成功。
- 在Logstash集群的配置中心页面,单击右上角“创建”,进入创建配置文件页面,编辑配置文件。
- 启动Logstash配置文件。
- 在配置文件列表,选择需要启动的配置文件,单击左上角的“启动”。
- 在“启动Logstash服务”对话框中,勾选“是否保持常驻”开启Logstash服务保持常驻。
- 单击“确定”,开始启动配置文件启动Logstash迁移任务。
可以在管道列表看到启动的配置文件。
- 验证数据库和Elasticsearch集群的数据是否已同步。
- 在云搜索服务管理控制台,选择“集群管理 > Elasticsearch”。
- 在Elasticsearch集群列表,单击集群操作列的“Kibana”,登录Kibana。
- 在左侧导航栏选择“Dev Tools”,进入Console页面。
- 执行如下命令查询索引数据。
GET rds_doctor_index/_count { "query": {"match_all": {}} }
当返回结果中,“count”的值不为0,则表示数据同步已成功。
常见问题:MySQL驱动不兼容怎么办?
在Logstash集群启动Logstash配置文件后,在Logstash管道运行状态异常,单击“运行日志”,可以日志中看到类似以下报错信息,则表示MySQL驱动不兼容。
[2024-05-21T11:31:00,196][ERROR][logstash.inputs.jdbc ] Java::JavaSql::SQLSyntaxErrorException: (conn=-1409730930) You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '"T1" LIMIT 1' at line 1: SELECT count(*) AS "COUNT" FROM (select * from logstash_broker where updatetime+30000 > 0 order by updatetime) AS "T1" LIMIT 1
解决方案:
- 停止Logstash配置文件。
- 下载与RDS版本兼容的SQL JDBC驱动,例如“mysql-connector-java-8.0.11.tar.gz”,解压后获得“mysql-connector-java-8.0.11.jar”。
- 联系技术支持,将SQL JDBC驱动jar包上传至用于同步数据的Logstash集群中。
- 修改Logstash配置文件内容。
修改参数值“jdbc_driver_class”和“jdbc_connection_string”。其中“xxx.xxx.xxx.xxx:port”填写数据库实际访问地址和端口号。
jdbc_driver_class => "com.mysql.jdbc.Driver" # 填写为MySQL JDBC的访问地址。 jdbc_connection_string => "jdbc:mysql://xxx.xxx.xxx.xxx:port/cms"
- 重新启动配置文件。