Flink Jar作业提交SQL样例程序(Java)
提交SQL的核心逻辑如下,目前只支持提交CREATE和INSERT语句,SQL文件中的各条语句以分号分隔,其他类型的语句不会被执行。完整代码参见com.huawei.bigdata.flink.examples.FlinkSQLExecutor。
/**
 * Submits the SQL statements stored in a file as a Flink job.
 *
 * <p>Only statements that start with {@code CREATE} or {@code INSERT} (case-insensitive) are
 * handled. The file content is split on every {@code ';'}, so a semicolon inside a string
 * literal or a comment is not supported, and a statement that is preceded by a comment does not
 * start with one of the two keywords and is therefore reported as unsupported.
 */
public class FlinkSQLExecutor {
    /**
     * Reads the SQL file, runs every CREATE statement immediately and submits all INSERT
     * statements together through one {@code StatementSet}.
     *
     * @param args program arguments; the {@code sql} argument names the SQL file and defaults
     *     to {@code config/redisSink.sql}
     * @throws IOException if the SQL file cannot be read
     * @throws IllegalStateException if the file contains no INSERT statement to execute
     */
    public static void main(String[] args) throws IOException {
        System.out.println("-------------------- begin init ----------------------");
        final String sqlPath = ParameterTool.fromArgs(args).get("sql", "config/redisSink.sql");
        final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        final EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, bsSettings);
        final StatementSet statementSet = tableEnv.createStatementSet();

        final String sqlStr = FileUtils.readFileToString(FileUtils.getFile(sqlPath), "utf-8");
        int insertCount = 0;
        for (String fragment : sqlStr.split(";")) {
            final String sql = fragment.trim();
            if (sql.isEmpty()) {
                // Whitespace after the last ';' (or between two ';') is not a statement.
                continue;
            }
            // Lower-case once; the keyword checks below are case-insensitive.
            final String lowered = sql.toLowerCase(Locale.ROOT);
            if (lowered.startsWith("create")) {
                System.out.println("----------------------------------------------\nexecuteSql=\n" + sql);
                tableEnv.executeSql(sql);
            } else if (lowered.startsWith("insert")) {
                System.out.println("----------------------------------------------\ninsert=\n" + sql);
                statementSet.addInsertSql(sql);
                insertCount++;
            } else {
                // Make dropped statements visible instead of ignoring them silently.
                System.out.println("----------------------------------------------\nskip unsupported sql=\n" + sql);
            }
        }

        if (insertCount == 0) {
            // NOTE(review): executing an empty StatementSet is expected to be rejected by Flink
            // with a generic IllegalStateException; fail here with a message that names the file.
            // Confirm against the Flink version in use.
            throw new IllegalStateException("No INSERT statement found in " + sqlPath + ", nothing to execute.");
        }
        System.out.println("---------------------- begin exec sql --------------------------");
        statementSet.execute();
    }
}
需将当前样例所需的依赖包(即编译后生成的lib目录下的jar包)复制到客户端的lib目录内。
以对接普通模式Kafka提交SQL为例:
CREATE TABLE kafka_sink (
    -- Same column list as datagen_source, which the INSERT statement reads from.
    uuid VARCHAR(20),
    name VARCHAR(10),
    age  INT,
    ts   TIMESTAMP(3),
    p    VARCHAR(20)
) WITH (
    -- Rows written to this table go to Kafka topic input2 as JSON.
    'connector' = 'kafka',
    'format' = 'json',
    'topic' = 'input2',
    'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
    -- NOTE(review): the two options below look like consumer-side settings, presumably
    -- unused while the table is only written to. Confirm before removing them.
    'properties.group.id' = 'testGroup2',
    'scan.startup.mode' = 'latest-offset'
);
CREATE TABLE datagen_source (
    uuid VARCHAR(20),
    name VARCHAR(10),
    age  INT,
    ts   TIMESTAMP(3),
    p    VARCHAR(20)
) WITH (
    -- datagen connector, configured for one row per second
    'rows-per-second' = '1',
    'connector' = 'datagen'
);
INSERT INTO kafka_sink
-- Copy every column of datagen_source, in declaration order.
SELECT uuid, name, age, ts, p
FROM datagen_source;