Passing Parameters to User-Defined Functions (UDFs)
Scenario
If a user-defined function (UDF) is used by multiple jobs but some parameter values differ from job to job, changing those values directly in the UDF code is cumbersome. Instead, you can set the pipeline.global-job-parameters option in the custom configuration area of the Flink OpenSource SQL editing page and read it in the UDF code. To change a parameter value later, simply edit it in the custom configuration on the Flink OpenSource SQL editing page, which lets you update UDF parameter values quickly without modifying the code.
Procedure
A user-defined function can implement the optional open(FunctionContext context) method. FunctionContext supports parameter passing, so custom configuration items are delivered to the UDF through this object. To pass parameters to a UDF, perform the following steps:
- In the custom configuration area on the right of the Flink OpenSource SQL editing page, add the pipeline.global-job-parameters option in the following format:
pipeline.global-job-parameters=k1:v1,"k2:v1,v2",k3:"str:ing","k4:str""ing"
This configuration defines the map shown in the following table.

| key | value |
|---|---|
| k1 | v1 |
| k2 | v1,v2 |
| k3 | str:ing |
| k4 | str"ing |
- FunctionContext#getJobParameter can obtain only the values configured in pipeline.global-job-parameters. Therefore, all configuration items that the UDF needs must be written into pipeline.global-job-parameters.
- Separate a key and its value with a colon (:), and connect the key-value pairs with commas (,).
- If a key or value contains a comma (,), enclose the whole key:value pair in double quotation marks ("). See k2 in the example.
- If a key or value contains a colon (:), enclose that key or value in double quotation marks ("). See k3 in the example.
- If a key or value contains a double quotation mark ("), escape it by writing two consecutive double quotation marks ("") and also enclose the whole key:value pair in double quotation marks ("). See k4 in the example.
- In the UDF code, call FunctionContext#getJobParameter to obtain the entries of this map, for example (a minimal standalone sketch follows this list):
context.getJobParameter("url", "jdbc:mysql://xx.xx.xx.xx:3306/table");
context.getJobParameter("driver", "com.mysql.jdbc.Driver");
context.getJobParameter("user", "user");
context.getJobParameter("password", "password");
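Putting the configuration option and the API together, the following is a minimal, self-contained sketch. The class name PrefixFunction and the parameter greeting.prefix are made up for illustration only; the full example used in this document follows in the next section.
package udf;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

// Assumes a custom configuration such as:
// pipeline.global-job-parameters=greeting.prefix:Hello
public class PrefixFunction extends ScalarFunction {
    private String prefix;

    @Override
    public void open(FunctionContext context) throws Exception {
        // Falls back to "Hi" when greeting.prefix is not configured.
        prefix = context.getJobParameter("greeting.prefix", "Hi");
    }

    public String eval(String name) {
        return prefix + ", " + name;
    }
}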
Code Example
The following UDF example passes the url, user, password, and other parameters required for connecting to a database through pipeline.global-job-parameters, reads the data of the udf_info table, and concatenates it with the streaming data into a JSON string for output.
Data in the udf_info table:

| key | value |
|---|---|
| class | class-4 |
SimpleJsonBuild.java
package udf;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
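// Scalar UDF that turns its key/value arguments into a JSON string and appends
// one extra key/value pair loaded from the udf_info table during open().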
public class SimpleJsonBuild extends ScalarFunction {
private static final Logger LOG = LoggerFactory.getLogger(SimpleJsonBuild.class);
String remainedKey;
String remainedValue;
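// Opens a JDBC connection with the url/driver/user/password values that were
// read from pipeline.global-job-parameters.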
private Connection initConnection(Map<String, String> userParasMap) {
String url = userParasMap.get("url");
String driver = userParasMap.get("driver");
String user = userParasMap.get("user");
String password = userParasMap.get("password");
Connection conn = null;
try {
Class.forName(driver);
conn = DriverManager.getConnection(url, user, password);
LOG.info("connect successfully");
} catch (Exception e) {
LOG.error(String.valueOf(e));
}
return conn;
}
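// Called once before any eval() call: reads the connection parameters from
// pipeline.global-job-parameters, queries udf_info, and caches its key/value pair.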
@Override
public void open(FunctionContext context) throws Exception {
Map<String, String> userParasMap = new HashMap<>();
Connection connection;
PreparedStatement pstmt;
ResultSet rs;
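// The second argument of getJobParameter is the default value returned when the
// key is not present in pipeline.global-job-parameters.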
String url = context.getJobParameter("url","jdbc:mysql://xx.xx.xx.xx:3306/table");
String driver = context.getJobParameter("driver","com.mysql.jdbc.Driver");
String user = context.getJobParameter("user","user");
String password = context.getJobParameter("password","password");
userParasMap.put("url", url);
userParasMap.put("driver", driver);
userParasMap.put("user", user);
userParasMap.put("password", password);
connection = initConnection(userParasMap);
String sql = "select `key`, `value` from udf_info";
pstmt = connection.prepareStatement(sql);
rs = pstmt.executeQuery();
while (rs.next()) {
remainedKey = rs.getString(1);
remainedValue = rs.getString(2);
}
// Release the JDBC resources; the looked-up key/value pair is already cached in the fields above.
rs.close();
pstmt.close();
connection.close();
}
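// Expects an even number of arguments forming key/value pairs and returns them,
// together with the pair cached from udf_info, as a JSON string.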
public String eval(String... params) throws IOException {
if (params != null && params.length != 0 && params.length % 2 == 0) {
HashMap<String, String> hashMap = new HashMap<>();
for (int i = 0; i < params.length; i += 2) {
hashMap.put(params[i], params[i + 1]);
LOG.debug("now the key is " + params[i].toString() + "; now the value is " + params[i + 1].toString());
}
// remainedKey is set only after open() has run (it stays null in the main() test below).
if (remainedKey != null) {
hashMap.put(remainedKey, remainedValue);
}
ObjectMapper mapper = new ObjectMapper();
String result = "{}";
try {
result = mapper.writeValueAsString(hashMap);
} catch (Exception ex) {
LOG.error("Get result failed." + ex.getMessage());
}
LOG.debug(result);
return result;
} else {
return "{}";
}
}
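// Local smoke test; open() is not invoked here, so remainedKey/remainedValue stay null.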
public static void main(String[] args) throws IOException {
SimpleJsonBuild sjb = new SimpleJsonBuild();
System.out.println(sjb.eval("json1", "json2", "json3", "json4"));
}
}
In the custom configuration area on the right of the Flink OpenSource SQL editing page, add the pipeline.global-job-parameters option:
pipeline.global-job-parameters=url:'jdbc:mysql://x.x.x.x:xxxx/swqtest',driver:com.mysql.jdbc.Driver,user:xxx,password:xxx
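This configuration defines the url, driver, user, and password entries that the open() method of SimpleJsonBuild reads through getJobParameter.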
Flink OpenSource SQL
create function SimpleJsonBuild AS 'udf.SimpleJsonBuild';
create table dataGenSource(user_id string, amount int) with (
'connector' = 'datagen',
'rows-per-second' = '1', -- generate one row per second
'fields.user_id.kind' = 'random', -- use the random generator for the user_id field
'fields.user_id.length' = '3' -- limit the length of user_id to 3 characters
);
create table printSink(message STRING) with ('connector' = 'print');
insert into
printSink
SELECT
SimpleJsonBuild("name", user_id, "age", cast(amount as string))
from
dataGenSource;
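With this setup, each row generated by dataGenSource should be printed as a JSON string containing the name and age fields from the stream plus the class/class-4 pair read from udf_info (key order may vary because the UDF stores the pairs in a HashMap).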
Execution Result
Click "More > FlinkUI > Task Managers > Stdout" in the Operation column of the Flink job to view the output.
