更新时间:2025-12-10 GMT+08:00
分享

FlinkServer作业对接MongoDB

操作场景

本章节提供了如何使用FlinkServer写FlinkSQL对接MongoDB。支持对接source表、sink表和维表。

本章节仅适用于MRS 3.6.0-LTS及之后的版本。

前提条件

  • 集群已安装HDFS、Yarn、Flink和Kafka服务。
  • 已安装和MRS集群环境网络相通的MongoDB集群环境。
  • 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flinkuser

MongoDB作为Source表和Sink表

  1. 登录mongo客户端创建source表“users_source”。

    use my_db

    db.users_source.insertOne({ _id: "1", name: "bob", age: 11, status: true })

    db.users_source.insertOne({ _id: "2", name: "george", age: 8, status: true })

    db.users_source.insertOne({ _id: "3", name: "paige", age: 5, status: true })

  2. 使用flinkuser登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  3. 参考创建FlinkServer作业,新建Flink SQL批作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。

    CREATE TABLE MyUserSource (
      _id STRING,
      name STRING,
      age INT,
      status BOOLEAN,
      PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mongodb',
      'uri' = 'mongodb://登录用户名:登录密码@MongoDB服务IP:MongoDB服务端口',
      'database' = 'my_db',
      'collection' = 'users_source'
    );
    CREATE TABLE MyUserSink (
      _id STRING,
      name STRING,
      age INT,
      status BOOLEAN,
      PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mongodb',
      'uri' = 'mongodb://登录用户名:登录密码@MongoDB服务IP:MongoDB服务端口',
      'database' = 'my_db',
      'collection' = 'users_sink'
    );
    INSERT INTO
      MyUserSink
    SELECT
      *
    FROM
      MyUserSource;

  4. 查看作业管理界面,直到作业状态为“运行成功”。
  5. 登录mongo客户端执行以下命令查看Sink表是否接收到数据。

    use my_db

    db.users_sink.find()

    结果如下表示接收数据成功:

MongoDB作为维表

  1. 登录mongo客户端创建维度表"users_dim"。

    use my_db

    db.users_dim.insertOne({ _id: "1", name: "bob", age: 11, status: true })

    db.users_dim.insertOne({ _id: "2", name: "george", age: 8, status: true })

    db.users_dim.insertOne({ _id: "3", name: "paige", age: 5, status: true })

  2. 使用flinkuser登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  3. 参考创建FlinkServer作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。

    CREATE TABLE UserDim (
      _id STRING,
      name STRING,
      age INT,
      status BOOLEAN,
      PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mongodb',
      'uri' = 'mongodb://登录用户名:登录密码@MongoDB服务IP:MongoDB服务端口',
      'database' = 'my_db',
      'collection' = 'users_dim'
    );
    CREATE TABLE Message (
      name STRING,
      message STRING, 
      proctime as proctime()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_message',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数
      'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数
      'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数
    );
    CREATE TABLE MongoSink (
      name STRING,
      age INT,
      status BOOLEAN, 
      message STRING
    ) WITH (
      'connector' = 'mongodb',
      'uri' = 'mongodb://登录用户名:登录密码@MongoDB服务IP:MongoDB服务端口',
      'database' = 'my_db',
      'collection' = 'sink'
    );
    INSERT INTO
      MongoSink
    SELECT
      t.name, 
      dim.age, 
      dim.status, 
      t.message
    FROM
      Message t
      JOIN UserDim FOR SYSTEM_TIME AS OF t.proctime as dim ON t.name = dim.name;
    • Kafka Broker实例IP地址及端口号说明:
      • 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
      • 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
      • 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为“true”,具体操作如下:

        登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为“true”,保存配置即可。

  4. 查看作业管理界面,作业状态为“运行中”。
  5. 参考管理Kafka Topic中的消息,向Kafka中写入数据。

    sh kafka-console-producer.sh --broker-list Kafka的Broker实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties

    例如本示例使用主题名称为user_messagesh kafka-console-producer.sh --broker-list Kafka的Broker实例所在节点的IP地址:Kafka端口号 --topic user_message --producer.config /opt/client/Kafka/kafka/config/producer.properties

    输入消息内容,输入完成后按回车发送消息:

    bob,message1
    george,message2
    george,message2

  6. 登录mongo客户端执行以下命令查看Sink表是否接收到数据

    use my_db

    db.sink.find()

    结果如下表示接收数据成功:

相关文档