更新时间:2025-12-10 GMT+08:00
分享

FlinkServer作业对接HTTP服务

操作场景

本章节提供了如何使用FlinkServer写FlinkSQL对接HTTP服务。支持对接HTTP维表。

本章节仅适用于MRS 3.6.0-LTS及之后的版本。

前提条件

  • 集群中已安装HDFS、Yarn、Flink服务。
  • 已部署和MRS集群环境网络相通的HTTP服务。
  • 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flinkuser

HTTP服务作为维表

  1. 使用flinkuser登录Manager界面,选择“集群 > 服务 > Flink”,在“Flink Web UI”右侧,单击链接,访问Flink的WebUI。
  2. 参考创建FlinkServer作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。

    CREATE TABLE Customers (
      id STRING,
      id2 STRING,
      msg STRING,
      uuid STRING,
      details ROW < isActive BOOLEAN,
      nestedDetails ROW < balance STRING > >
    ) WITH (
      'connector' = 'http-lookup',
      'format' = 'json',
      'url' = 'http://http服务IP:http服务端口',
      'asyncPolling' = 'true'
    );
    CREATE TABLE Orders (
      id STRING,
      id2 STRING,
      proc_time AS PROCTIME()
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1',
      'fields.id.kind' = 'sequence',
      'fields.id.start' = '1',
      'fields.id.end' = '120'
    );
    CREATE TABLE result_table(
      id STRING,
      id2 STRING,
      msg STRING,
      uuid STRING,
      isActive BOOLEAN,
      balance STRING
    ) WITH ('connector' = 'print');
    insert into
      result_table
    SELECT
      o.id,
      o.id2,
      c.msg,
      c.uuid,
      c.isActive,
      c.balance
    FROM
      Orders AS o
      JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.id = c.id
      AND o.id2 = c.id2;

相关表参数配置说明

表1 创建HTTP维表时可选配置参数

参数名称

参数默认值

是否必须配置

参数描述

connector

-

http服务做维表,值写为“http-lookup”。

format

-

http服务返回的消息格式,例如:json。

url

-

http服务URL,例如:http://127.0.0.1:8080/client。

asyncPolling

false

true/false。是否使用异步轮询机制,该机制基于Flink的Async I/O实现。

lookup-method

GET

GET/POST/PUT。默认使用GET方法请求HTTP服务。

lookup.cache

NONE

维表的缓存策略。目前支持NONE(不缓存)和PARTIAL(查找HTTP服务时缓存)。

lookup.partial-cache.max-rows

(none) Integer

维表缓存的最大行数,若超过该值,则最早的行记录将会过期。使用该配置时,“lookup.cache”需要设置为“PARTIAL”。

lookup.partial-cache.expire-after-write

(none) Duration

在记录写入缓存后该记录的最大保留时间。使用该配置时,“lookup.cache”需要设置为“PARTIAL”。

lookup.partial-cache.expire-after-access

(none) Duration

缓存中的记录最长保留时间。使用该配置时,“lookup.cache”需要设置为“PARTIAL”。

lookup.partial-cache.cache-missing-key

true

在HTTP服务中未关联到数据时,是否缓存空记录,默认为true。使用该配置时,“lookup.cache”需要设置为“PARTIAL”。

lookup.max-retries

3

查询失败的最大重试次数。

相关文档