更新时间:2025-07-14 GMT+08:00

FlinkSQL JSON_VALUE函数性能优化

本章节适用于MRS 3.5.0及以后版本。

使用场景

内置JSON_VALUE函数解析一个JSON item的多个字段时,复用上次JSON item的解析结果,提升算子性能。

使用方法

配置Flink作业时,可通过在FlinkServer WebUI的Flink作业开发界面添加自定义参数“table.optimizing.json-value-optimization”为“true”开启JSON_VALUE函数性能优化,可参考创建FlinkServer作业

SQL示例:

create table kafkaSource (
    msgValue STRING
) with (
    'connector' = 'kafka'
    ,'topic' = 'topic-1'
    ,'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号'
    ,'properties.group.id' = 'group-1'
    ,'value.format' = 'raw'
    ,'scan.startup.mode' = 'group-offsets'
    ,'properties.auto.offset.reset' = 'earliest'
    ,'value.fields-include' = 'EXCEPT_KEY'
);
create table print (
    id STRING,
    name STRING
) with (
      'connector' = 'print'
);
INSERT INTO print
SELECT
    JSON_VAL(msgValue, '$.id'),
    JSON_VAL(msgValue, '$.name')
from kafkaSource;
Kafka Broker实例IP地址及端口号说明:
  • 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
  • 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
  • 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

    登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。