更新时间:2024-03-12 GMT+08:00

自定义函数参数传递

操作场景

如果您的自定义函数需要在多个作业中使用,但对于不同作业某些参数值不同,直接在UDF中修改较为复杂。您可以在Flink OpenSource SQL编辑页面,自定义配置中配置参数pipeline.global-job-parameters,在UDF代码中获取该参数并使用。如需修改参数值,直接在FlinkOpenSource SQL编辑页面,自定义配置中修改该参数值,即可达到快速修改UDF参数值的目的。

操作步骤

自定义函数中提供了可选的open(FunctionContext context)方法,FunctionContext具备参数传递功能,自定义配置项通过此对象来传递。自定义函数的参数传递操作步骤如下:

  1. 在Flink OpenSource SQL编辑页面右侧自定义配置中添加参数pipeline.global-job-parameters,格式如下:
    pipeline.global-job-parameters=k1:v1,"k2:v1,v2",k3:"str:ing","k4:str""ing"

    该配置定义了如表1的map。

    表1 pipeline.global-job-parameters示例

    key

    value

    k1

    v1

    k2

    v1,v2

    k3

    str:ing

    k4

    str""ing

    • FunctionContext#getJobParameter只能获取pipeline.global-job-parameters这一配置项的值。因此需要将UDF用到的所有配置项全部写入到pipeline.global-job-parameters中。
    • key和value之间通过冒号(:)分隔,所有key-value用逗号(,)连接。
    • 如果key或value中含有逗号(,),则需要用双引号(")将key:value整个包围起来。参考k2。
    • 如果key或value中含有半角冒号(:),则需要用双引号(")将key或value包围起来。参考k3。
    • 如果key或value中含有双引号("),则需要通过连写两个双引号("")进行转义,也需要用双引号(")将key:value整个包围起来。参考k4。
  2. 在自定义函数代码中,通过FunctionContext#getJobParameter获取map的各项内容,代码示例如下:
    context.getJobParameter("url","jdbc:mysql://xx.xx.xx.xx:3306/table");
    context.getJobParameter("driver","com.mysql.jdbc.Driver");
    context.getJobParameter("user","user");
    context.getJobParameter("password","password");

代码示例

以下是一个UDF示例:通过pipeline.global-job-parameters传入连接数据库需要的url、user、password等参数,获取udf_info表数据后和流数据拼接成json输出。

表2 udf_info表

key

value

class

class-4

SimpleJsonBuild.java

package udf;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;

public class SimpleJsonBuild extends ScalarFunction {
   private static final Logger LOG = LoggerFactory.getLogger(SimpleJsonBuild.class);
   String remainedKey;
   String remainedValue;

   private Connection initConnection(Map<String, String> userParasMap) {
       String url = userParasMap.get("url");
       String driver = userParasMap.get("driver");
       String user = userParasMap.get("user");
       String password = userParasMap.get("password");
       Connection conn = null;
       try {
           Class.forName(driver);
           conn = DriverManager.getConnection(url, user, password);
           LOG.info("connect successfully");
      } catch (Exception e) {
           LOG.error(String.valueOf(e));
      }
       return conn;
  }

   @Override
   public void open(FunctionContext context) throws Exception {
       Map<String, String> userParasMap = new HashMap<>();
       Connection connection;
       PreparedStatement pstmt;
       ResultSet rs;

       String url = context.getJobParameter("url","jdbc:mysql://xx.xx.xx.xx:3306/table");
       String driver = context.getJobParameter("driver","com.mysql.jdbc.Driver");
       String user = context.getJobParameter("user","user");
       String password = context.getJobParameter("password","password");

       userParasMap.put("url", url);
       userParasMap.put("driver", driver);
       userParasMap.put("user", user);
       userParasMap.put("password", password);

       connection = initConnection(userParasMap);
       String sql = "select `key`, `value` from udf_info";
       pstmt = connection.prepareStatement(sql);
       rs = pstmt.executeQuery();

       while (rs.next()) {
           remainedKey = rs.getString(1);
           remainedValue = rs.getString(2);
      }
  }

   public String eval(String... params) throws IOException {
       if (params != null && params.length != 0 && params.length % 2 <= 0) {
           HashMap<String, String> hashMap = new HashMap();
           for (int i = 0; i < params.length; i += 2) {
               hashMap.put(params[i], params[i + 1]);
               LOG.debug("now the key is " + params[i].toString() + "; now the value is " + params[i + 1].toString());
          }
           hashMap.put(remainedKey, remainedValue);
           ObjectMapper mapper = new ObjectMapper();
           String result = "{}";
           try {
               result = mapper.writeValueAsString(hashMap);
          } catch (Exception ex) {
               LOG.error("Get result failed." + ex.getMessage());
          }
           LOG.debug(result);
           return result;
      } else {
           return "{}";
      }
  }

   public static void main(String[] args) throws IOException {
       SimpleJsonBuild  sjb = new SimpleJsonBuild();
       System.out.println(sjb.eval("json1", "json2", "json3", "json4"));
  }
}

在Flink OpenSource SQL编辑页面右侧自定义配置中添加参数pipeline.global-job-parameters

pipeline.global-job-parameters=url:'jdbc:mysql://x.x.x.x:xxxx/swqtest',driver:com.mysql.jdbc.Driver,user:xxx,password:xxx

Flink OpenSource SQL

create function SimpleJsonBuild AS 'udf.SimpleJsonBuild';
create table dataGenSource(user_id string, amount int) with (
 'connector' = 'datagen',
 'rows-per-second' = '1', --每秒生成一条数据
 'fields.user_id.kind' = 'random', --为字段user_id指定random生成器
 'fields.user_id.length' = '3' --限制user_id长度为3
);
create table printSink(message STRING) with ('connector' = 'print');
insert into
printSink
SELECT
SimpleJsonBuild("name", user_id, "age", cast(amount as string))
from
dataGenSource;

运行结果

单击Flink作业操作列下的“更多 > FlinkUI > Task Managers > Stdout”查看输出结果: