
Flink Hudi Sample Program (Java)

Description

Call Flink APIs to read data from and write data to Hudi.

Sample Code

The following example shows the main logic code of WriteIntoHudi and ReadFromHudi.

For details about the complete code, see com.huawei.bigdata.flink.examples.WriteIntoHudi and com.huawei.bigdata.flink.examples.ReadFromHudi.

  • Main logic code of WriteIntoHudi
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple5;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.data.TimestampData;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.util.HoodiePipeline;
    
    import java.sql.Timestamp;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.HashMap;
    import java.util.Map;
    
    public class WriteIntoHudi {
        public static void main(String[] args) throws Exception {
            System.out.println("use command as: ");
            System.out.println(
                    "./bin/flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.WriteIntoHudi"
                            + " /opt/test.jar --hudiTableName hudiSinkTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable");
            System.out.println(
                    "******************************************************************************************");
            System.out.println("<hudiTableName> is the hudi table name. (Default value is hudiSinkTable)");
            System.out.println("<hudiPath> Base path for the target hoodie table. (Default value is hdfs://hacluster/tmp/flinkHudi/hudiTable)");
            System.out.println(
                    "******************************************************************************************");
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.getCheckpointConfig().setCheckpointInterval(10000);
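            // Hudi commits data when a Flink checkpoint completes, so checkpointing
            // must be enabled for the streaming sink to make data visible.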
            ParameterTool paraTool = ParameterTool.fromArgs(args);
            DataStream<RowData> stringDataStreamSource = env.addSource(new SimpleStringGenerator())
                    .map(new MapFunction<Tuple5<String, String, Integer, String, String>, RowData>() {
                        @Override
                        public RowData map(Tuple5<String, String, Integer, String, String> tuple5) throws Exception {
                            GenericRowData rowData = new GenericRowData(5);
                            rowData.setField(0, StringData.fromString(tuple5.f0));
                            rowData.setField(1, StringData.fromString(tuple5.f1));
                            rowData.setField(2, tuple5.f2);
                            rowData.setField(3, TimestampData.fromTimestamp(Timestamp.valueOf(tuple5.f3)));
                            rowData.setField(4, StringData.fromString(tuple5.f4));
                            return rowData;
                        }
                    });
            String basePath = paraTool.get("hudiPath", "hdfs://hacluster/tmp/flinkHudi/hudiTable");
            String targetTable = paraTool.get("hudiTableName", "hudiSinkTable");
            Map<String, String> options = new HashMap<>();
            options.put(FlinkOptions.PATH.key(), basePath);
            options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
            options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts"); // records with the same key are deduplicated; the one with the larger "ts" wins
            options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true"); // load the index of any existing table data into state so updates find their file groups
            HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
                    .column("uuid VARCHAR(20)")
                    .column("name VARCHAR(10)")
                    .column("age INT")
                    .column("ts TIMESTAMP(3)")
                    .column("p VARCHAR(20)")
                    .pk("uuid")
                    .partition("p")
                    .options(options);
            builder.sink(stringDataStreamSource, false); // The second parameter indicates whether the input data stream is bounded; a bounded variant is sketched after this sample
            env.execute("Hudi_Sink");
        }
        public static class SimpleStringGenerator implements SourceFunction<Tuple5<String, String, Integer, String, String>> {
            private static final long serialVersionUID = 2174904787118597072L;
            private volatile boolean running = true; // volatile: cancel() is invoked from a different thread than run()
            private Integer i = 0;
    
            @Override
            public void run(SourceContext<Tuple5<String, String, Integer, String, String>> ctx) throws Exception {
                while (running) {
                    i++;
                    String uuid = "uuid" + i;
                    String name = "name" + i;
                    Integer age = i; // autoboxing instead of the deprecated Integer(int) constructor
                    String ts = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                    String p = "par" + i % 5;
                    Tuple5<String, String, Integer, String, String> tuple5 = Tuple5.of(uuid, name, age, ts, p);
                    ctx.collect(tuple5);
                    Thread.sleep(1000);
                }
            }
            @Override
            public void cancel() {
                running = false;
            }
        }
    }
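    The sink above passes false because the generator source never ends. For a one-off backfill, the same pipeline definition can be reused with the flag set to true. The following is a minimal, illustrative sketch that replaces the sink call above; the hard-coded row, the boundedSource variable, and the job name are assumptions for illustration, not part of the sample:

        // Illustrative bounded write: reuses env and builder from the sample above.
        DataStream<RowData> boundedSource = env
                .fromElements(Tuple5.of("uuid0", "name0", 0, "2024-01-01 00:00:00", "par0"))
                .map(new MapFunction<Tuple5<String, String, Integer, String, String>, RowData>() {
                    @Override
                    public RowData map(Tuple5<String, String, Integer, String, String> tuple5) {
                        GenericRowData rowData = new GenericRowData(5);
                        rowData.setField(0, StringData.fromString(tuple5.f0));
                        rowData.setField(1, StringData.fromString(tuple5.f1));
                        rowData.setField(2, tuple5.f2);
                        rowData.setField(3, TimestampData.fromTimestamp(Timestamp.valueOf(tuple5.f3)));
                        rowData.setField(4, StringData.fromString(tuple5.f4));
                        return rowData;
                    }
                });
        builder.sink(boundedSource, true); // true: the input is bounded, so the job finishes after the data is committed
        env.execute("Hudi_Bounded_Sink");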
  • Main logic code of ReadFromHudi
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.util.HoodiePipeline;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class ReadFromHudi {
        public static void main(String[] args) throws Exception {
            System.out.println("use command as: ");
            System.out.println(
                    "./bin/flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.ReadFromHudi"
                            + " /opt/test.jar --hudiTableName hudiSourceTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable"
                            + " --read.start-commit 20221206111532"
            );
            System.out.println(
                    "******************************************************************************************");
            System.out.println("<hudiTableName> is the hoodie table name. (Default value is hudiSourceTable)");
            System.out.println("<hudiPath> Base path for the target hoodie table. (Default value is hdfs://hacluster/tmp/flinkHudi/hudiTable)");
            System.out.println("<read.start-commit> Start commit instant for reading, the commit time format should be 'yyyyMMddHHmmss'. (Default value is earliest)");
            System.out.println(
                    "******************************************************************************************");
    
            ParameterTool paraTool = ParameterTool.fromArgs(args);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            String basePath = paraTool.get("hudiPath", "hdfs://hacluster/tmp/flinkHudi/hudiTable");
            String targetTable = paraTool.get("hudiTableName", "hudiSourceTable");
            String startCommit = paraTool.get(FlinkOptions.READ_START_COMMIT.key(), FlinkOptions.START_COMMIT_EARLIEST);
            Map<String, String> options = new HashMap<>();
            options.put(FlinkOptions.PATH.key(), basePath);
            options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
            options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); // enables streaming read
            options.put(FlinkOptions.READ_START_COMMIT.key(), startCommit); // specifies the start commit instant for the read
            HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
                    .column("uuid VARCHAR(20)")
                    .column("name VARCHAR(10)")
                    .column("age INT")
                    .column("ts TIMESTAMP(3)")
                    .column("p VARCHAR(20)")
                    .pk("uuid")
                    .partition("p")
                    .options(options);
    
            DataStream<RowData> rowDataDataStream = builder.source(env);
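            // With READ_AS_STREAMING set to "true", this source keeps monitoring the table
            // and emits rows from new commits as they arrive.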
            rowDataDataStream.map(new MapFunction<RowData, String>() {
                @Override
                public String map(RowData rowData) throws Exception {
                    StringBuilder sb = new StringBuilder();
                    sb.append("{");
                    sb.append("\"uuid\":\"").append(rowData.getString(0)).append("\",");
                    sb.append("\"name\":\"").append(rowData.getString(1)).append("\",");
                    sb.append("\"age\":").append(rowData.getInt(2)).append(",");
                    sb.append("\"ts\":\"").append(rowData.getTimestamp(3, 0)).append("\",");
                    sb.append("\"p\":\"").append(rowData.getString(4)).append("\"");
                    sb.append("}");
                    return sb.toString();
                }
            }).print();
            env.execute("Hudi_Source");
        }
    }
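    With the map function above, each row read from Hudi is printed as one JSON-style line. An illustrative output line (actual values depend on the data written by WriteIntoHudi):

        {"uuid":"uuid1","name":"name1","age":1,"ts":"2024-08-10T10:15:30","p":"par1"}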