FlinkServer作业对接MongoDB
操作场景
本章节提供了如何使用FlinkServer写FlinkSQL对接MongoDB。支持对接source表、sink表和维表。
本章节仅适用于MRS 3.6.0-LTS及之后的版本。
前提条件
- 集群已安装HDFS、Yarn、Flink和Kafka服务。
- 已安装和MRS集群环境网络相通的MongoDB集群环境。
- 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flinkuser。
MongoDB作为Source表和Sink表
- 登录mongo客户端创建source表“users_source”。
use my_db
db.users_source.insertOne({ _id: "1", name: "bob", age: 11, status: true })
db.users_source.insertOne({ _id: "2", name: "george", age: 8, status: true })
db.users_source.insertOne({ _id: "3", name: "paige", age: 5, status: true })
- 使用flinkuser登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考创建FlinkServer作业,新建Flink SQL批作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。
CREATE TABLE MyUserSource ( _id STRING, name STRING, age INT, status BOOLEAN, PRIMARY KEY (_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'uri' = 'mongodb://登录用户名:登录密码@MongoDB服务IP:MongoDB服务端口', 'database' = 'my_db', 'collection' = 'users_source' ); CREATE TABLE MyUserSink ( _id STRING, name STRING, age INT, status BOOLEAN, PRIMARY KEY (_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'uri' = 'mongodb://登录用户名:登录密码@MongoDB服务IP:MongoDB服务端口', 'database' = 'my_db', 'collection' = 'users_sink' ); INSERT INTO MyUserSink SELECT * FROM MyUserSource;
- 查看作业管理界面,直到作业状态为“运行成功”。
- 登录mongo客户端执行以下命令查看Sink表是否接收到数据。
use my_db
db.users_sink.find()
结果如下表示接收数据成功:

MongoDB作为维表
- 登录mongo客户端创建维度表"users_dim"。
use my_db
db.users_dim.insertOne({ _id: "1", name: "bob", age: 11, status: true })
db.users_dim.insertOne({ _id: "2", name: "george", age: 8, status: true })
db.users_dim.insertOne({ _id: "3", name: "paige", age: 5, status: true })
- 使用flinkuser登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考创建FlinkServer作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。
CREATE TABLE UserDim ( _id STRING, name STRING, age INT, status BOOLEAN, PRIMARY KEY (_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'uri' = 'mongodb://登录用户名:登录密码@MongoDB服务IP:MongoDB服务端口', 'database' = 'my_db', 'collection' = 'users_dim' ); CREATE TABLE Message ( name STRING, message STRING, proctime as proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'user_message', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数 'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数 'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数 ); CREATE TABLE MongoSink ( name STRING, age INT, status BOOLEAN, message STRING ) WITH ( 'connector' = 'mongodb', 'uri' = 'mongodb://登录用户名:登录密码@MongoDB服务IP:MongoDB服务端口', 'database' = 'my_db', 'collection' = 'sink' ); INSERT INTO MongoSink SELECT t.name, dim.age, dim.status, t.message FROM Message t JOIN UserDim FOR SYSTEM_TIME AS OF t.proctime as dim ON t.name = dim.name;
- Kafka Broker实例IP地址及端口号说明:
- 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
- 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
- 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为“true”,具体操作如下:
登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为“true”,保存配置即可。
- Kafka Broker实例IP地址及端口号说明:
- 查看作业管理界面,作业状态为“运行中”。
- 参考管理Kafka Topic中的消息,向Kafka中写入数据。
sh kafka-console-producer.sh --broker-list Kafka的Broker实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties
例如本示例使用主题名称为user_message:sh kafka-console-producer.sh --broker-list Kafka的Broker实例所在节点的IP地址:Kafka端口号 --topic user_message --producer.config /opt/client/Kafka/kafka/config/producer.properties
输入消息内容,输入完成后按回车发送消息:
bob,message1 george,message2 george,message2
- 登录mongo客户端执行以下命令查看Sink表是否接收到数据
use my_db
db.sink.find()
结果如下表示接收数据成功:
