Updated on 2024-04-02 GMT+08:00

Java Sample Code

The core logic for submitting SQL statements is as follows. Currently, only CREATE and INSERT statements can be submitted; any other statement in the SQL file is skipped. For the complete code, see com.huawei.bigdata.flink.examples.FlinkSQLExecutor.

/**
 * Reads a SQL script from disk and submits its statements to Flink.
 *
 * <p>The script text is split on {@code ';'}. A fragment that starts with
 * {@code create} (case-insensitive) is executed immediately through
 * {@code executeSql}; a fragment that starts with {@code insert} is added to a
 * single {@code StatementSet} that is executed once after the whole script has
 * been scanned. Every other fragment is ignored.
 *
 * <p>The split is purely textual, so a {@code ';'} inside a string literal or a
 * comment also ends a fragment, and a fragment that begins with a comment is
 * not recognised as CREATE or INSERT.
 */
public class FlinkSQLExecutor {
    /**
     * Program entry point.
     *
     * @param args command-line arguments; the {@code sql} argument gives the script path
     *             and falls back to {@code config/redisSink.sql} when absent
     * @throws IOException if the script file cannot be read
     */
    public static void main(String[] args) throws IOException {
        System.out.println("--------------------  begin init ----------------------");
        final String sqlPath = ParameterTool.fromArgs(args).get("sql", "config/redisSink.sql");
        final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, bsSettings);
        StatementSet statementSet = tableEnv.createStatementSet();
        String sqlStr = FileUtils.readFileToString(FileUtils.getFile(sqlPath), "utf-8");

        int insertCount = 0;
        for (String fragment : sqlStr.split(";")) {
            final String sql = fragment.trim();
            if (sql.isEmpty()) {
                // Whitespace between or after terminators yields empty fragments.
                continue;
            }
            // Lower-case once with a fixed locale so keyword matching is locale-independent.
            final String lowered = sql.toLowerCase(Locale.ROOT);
            if (lowered.startsWith("create")) {
                System.out.println("----------------------------------------------\nexecuteSql=\n" + sql);
                tableEnv.executeSql(sql);
            } else if (lowered.startsWith("insert")) {
                System.out.println("----------------------------------------------\ninsert=\n" + sql);
                statementSet.addInsertSql(sql);
                insertCount++;
            }
        }

        if (insertCount == 0) {
            // NOTE(review): executing an empty StatementSet is expected to fail in Flink
            // because there is no job to run; confirm against the Flink version in use.
            System.out.println("no INSERT statement found in " + sqlPath + ", nothing to submit");
            return;
        }
        System.out.println("---------------------- begin exec sql --------------------------");
        statementSet.execute();
    }
}

Copy the dependency packages required by this sample (the JAR files in the lib directory generated after compilation) to the lib folder on the client.

The following uses Kafka in a normal cluster as an example to describe how to submit SQL statements:

CREATE TABLE kafka_sink (
    -- Column layout mirrors datagen_source so rows can be copied across unchanged.
    uuid VARCHAR(20),
    name VARCHAR(10),
    age  INT,
    ts   TIMESTAMP(3),
    p    VARCHAR(20)
) WITH (
    'connector' = 'kafka',
    'topic' = 'input2',
    -- Placeholder value: replace it with the real broker address before running.
    'properties.bootstrap.servers' = 'IP address of the Kafka broker instance',
    'properties.group.id' = 'testGroup2',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);
CREATE TABLE datagen_source (
    -- Same five columns as kafka_sink.
    uuid VARCHAR(20),
    name VARCHAR(10),
    age  INT,
    ts   TIMESTAMP(3),
    p    VARCHAR(20)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);
INSERT INTO kafka_sink
    SELECT * FROM datagen_source;