Passing Parameters to User-Defined Functions
Scenario
If the same user-defined function (UDF) is used in multiple jobs but some of its parameter values differ between jobs, changing those values directly in the UDF code is cumbersome. Instead, you can set the pipeline.global-job-parameters parameter in the custom configuration area of the Flink OpenSource SQL editing page and read it from the UDF code. To change a parameter value later, simply edit pipeline.global-job-parameters on the Flink OpenSource SQL editing page, which lets you update UDF parameter values quickly without modifying or repackaging the UDF.
Procedure
A UDF can implement the optional open(FunctionContext context) method. FunctionContext supports parameter passing, so custom configuration items are delivered to the UDF through this object. To pass parameters to a UDF, perform the following steps:
- In the custom configuration area on the right of the Flink OpenSource SQL editing page, add the pipeline.global-job-parameters parameter in the following format:
pipeline.global-job-parameters=k1:v1,"k2:v1,v2",k3:"str:ing","k4:str""ing"
This configuration defines the following map:

key | value
---|---
k1 | v1
k2 | v1,v2
k3 | str:ing
k4 | str"ing
- FunctionContext#getJobParameter can only read the values defined in pipeline.global-job-parameters. Therefore, put every configuration item the UDF needs into pipeline.global-job-parameters.
- Separate a key from its value with a colon (:), and join the key-value pairs with commas (,).
- If a key or value contains a comma (,), enclose the whole key:value pair in double quotation marks ("). See k2.
- If a key or value contains a colon (:), enclose that key or value in double quotation marks ("). See k3.
- If a key or value contains a double quotation mark ("), escape it by writing it twice ("") and also enclose the whole key:value pair in double quotation marks ("). See k4.
- In the UDF code, read the map entries through FunctionContext#getJobParameter. For example (a fuller, self-contained sketch follows this procedure):
context.getJobParameter("url","jdbc:mysql://xx.xx.xx.xx:3306/table"); context.getJobParameter("driver","com.mysql.jdbc.Driver"); context.getJobParameter("user","user"); context.getJobParameter("password","password");
Code Example
The following UDF example reads the url, user, password, and related parameters needed to connect to a database from pipeline.global-job-parameters, fetches the data in the udf_info table, and concatenates it with the stream data into a JSON string for output. The udf_info table contains the following data:
key | value
---|---
class | class-4
SimpleJsonBuild.java
```java
package udf;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;

/**
 * Scalar UDF that turns its (key, value) argument pairs into a JSON string and
 * appends one extra pair read from the udf_info table during open().
 */
public class SimpleJsonBuild extends ScalarFunction {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleJsonBuild.class);

    // Key-value pair loaded from the udf_info table in open().
    String remainedKey;
    String remainedValue;

    private Connection initConnection(Map<String, String> userParasMap) {
        String url = userParasMap.get("url");
        String driver = userParasMap.get("driver");
        String user = userParasMap.get("user");
        String password = userParasMap.get("password");
        Connection conn = null;
        try {
            Class.forName(driver);
            conn = DriverManager.getConnection(url, user, password);
            LOG.info("connect successfully");
        } catch (Exception e) {
            LOG.error(String.valueOf(e));
        }
        return conn;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        Map<String, String> userParasMap = new HashMap<>();

        // Read the connection settings passed in through pipeline.global-job-parameters;
        // the second argument is the default used when the key is not configured.
        String url = context.getJobParameter("url", "jdbc:mysql://xx.xx.xx.xx:3306/table");
        String driver = context.getJobParameter("driver", "com.mysql.jdbc.Driver");
        String user = context.getJobParameter("user", "user");
        String password = context.getJobParameter("password", "password");
        userParasMap.put("url", url);
        userParasMap.put("driver", driver);
        userParasMap.put("user", user);
        userParasMap.put("password", password);

        Connection connection = initConnection(userParasMap);
        String sql = "select `key`, `value` from udf_info";
        PreparedStatement pstmt = connection.prepareStatement(sql);
        ResultSet rs = pstmt.executeQuery();
        while (rs.next()) {
            remainedKey = rs.getString(1);
            remainedValue = rs.getString(2);
        }

        // The lookup result is cached in remainedKey/remainedValue, so the JDBC
        // resources can be released right away.
        rs.close();
        pstmt.close();
        connection.close();
    }

    public String eval(String... params) throws IOException {
        // Arguments must come in non-empty (key, value) pairs.
        if (params != null && params.length != 0 && params.length % 2 == 0) {
            HashMap<String, String> hashMap = new HashMap<>();
            for (int i = 0; i < params.length; i += 2) {
                hashMap.put(params[i], params[i + 1]);
                LOG.debug("now the key is " + params[i] + "; now the value is " + params[i + 1]);
            }
            // Append the pair read from udf_info (skip it if open() has not run,
            // for example when eval() is called directly from main()).
            if (remainedKey != null) {
                hashMap.put(remainedKey, remainedValue);
            }
            ObjectMapper mapper = new ObjectMapper();
            String result = "{}";
            try {
                result = mapper.writeValueAsString(hashMap);
            } catch (Exception ex) {
                LOG.error("Get result failed." + ex.getMessage());
            }
            LOG.debug(result);
            return result;
        } else {
            return "{}";
        }
    }

    public static void main(String[] args) throws IOException {
        SimpleJsonBuild sjb = new SimpleJsonBuild();
        System.out.println(sjb.eval("json1", "json2", "json3", "json4"));
    }
}
```
Add the pipeline.global-job-parameters parameter in the custom configuration area on the right of the Flink OpenSource SQL editing page:
pipeline.global-job-parameters=url:'jdbc:mysql://x.x.x.x:xxxx/swqtest',driver:com.mysql.jdbc.Driver,user:xxx,password:xxx
Flink OpenSource SQL
```sql
create function SimpleJsonBuild AS 'udf.SimpleJsonBuild';

create table dataGenSource(
  user_id string,
  amount int
) with (
  'connector' = 'datagen',
  'rows-per-second' = '1',          -- generate one row per second
  'fields.user_id.kind' = 'random', -- use the random generator for user_id
  'fields.user_id.length' = '3'     -- limit user_id to 3 characters
);

create table printSink(message STRING) with ('connector' = 'print');

insert into printSink
SELECT SimpleJsonBuild("name", user_id, "age", cast(amount as string))
from dataGenSource;
```
Execution Result
In the Operation column of the Flink job, choose "More > FlinkUI > Task Managers > Stdout" to view the output. Each output record is a JSON string that combines the name and age values taken from the stream with the class:class-4 pair read from the udf_info table.