更新时间:2023-11-10 GMT+08:00
分享

Java样例代码

提交SQL的核心逻辑如下,目前只支持提交CREATE和INSERT语句。完整代码参见com.huawei.bigdata.flink.examples.FlinkSQLExecutor。

public class FlinkSQLExecutor {
    public static void main(String[] args) throws IOException {
        System.out.println("--------------------  begin init ----------------------");
        final String sqlPath = ParameterTool.fromArgs(args).get("sql", "config/redisSink.sql");
        final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, bsSettings);
        StatementSet statementSet = tableEnv.createStatementSet();
        String sqlStr = FileUtils.readFileToString(FileUtils.getFile(sqlPath), "utf-8");
        String[] sqlArr = sqlStr.split(";");
        for (String sql : sqlArr) {
            sql = sql.trim();
           if (sql.toLowerCase(Locale.ROOT).startsWith("create")) {
               System.out.println("----------------------------------------------\nexecuteSql=\n" + sql);
               tableEnv.executeSql(sql);
           } else if (sql.toLowerCase(Locale.ROOT).startsWith("insert")) {
               System.out.println("----------------------------------------------\ninsert=\n" + sql);
               statementSet.addInsertSql(sql);
           }
        }
        System.out.println("---------------------- begin exec sql --------------------------");
        statementSet.execute();
    }
}

需将当前样例需要的依赖包,即编译之后lib文件下面的jar包拷贝到客户端的lib文件夹内。

以对接普通模式Kafka提交SQL为例:

create table kafka_sink
(
    uuid varchar(20),
    name varchar(10),
    age  int,
    ts   timestamp(3),
    p    varchar(20)
) with (
      'connector' = 'kafka',
      'topic' = 'input2',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup2',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
      );
create TABLE datagen_source
(
    uuid varchar(20),
    name varchar(10),
    age  int,
    ts   timestamp(3),
    p    varchar(20)
) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1'
      );
INSERT INTO kafka_sink
SELECT *
FROM datagen_source;
分享:

    相关文档

    相关产品