Updated on: 2024-08-03 GMT+08:00

Flink Hudi Sample Program (Java)

Function

Read and write Hudi data by calling Flink APIs.

Sample Code

The following shows the main logic of WriteIntoHudi and ReadFromHudi as a demonstration.

For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoHudi and com.huawei.bigdata.flink.examples.ReadFromHudi.
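
Both snippets below omit their import statements. The list here is a minimal import sketch, assuming the standard Apache Flink and Apache Hudi package layout used by the hudi-flink bundle; the exact set may vary with the Flink and Hudi versions shipped in your client.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple5;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.data.TimestampData;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.util.HoodiePipeline;
    import java.sql.Timestamp;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.HashMap;
    import java.util.Map;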

  • Main logic code of WriteIntoHudi
    public class WriteIntoHudi {
        public static void main(String[] args) throws Exception {
            System.out.println("use command as: ");
            System.out.println(
                    "./bin/flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.WriteIntoHudi"
                            + " /opt/test.jar --hudiTableName hudiSinkTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable");
            System.out.println(
                    "******************************************************************************************");
            System.out.println("<hudiTableName> is the hudi table name. (Default value is hudiSinkTable)");
            System.out.println("<hudiPath> Base path for the target hoodie table. (Default value is hdfs://hacluster/tmp/flinkHudi/hudiTable)");
            System.out.println(
                    "******************************************************************************************");
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // Hudi commits data to the table when a Flink checkpoint completes, so checkpointing must be enabled.
            env.getCheckpointConfig().setCheckpointInterval(10000);
            ParameterTool paraTool = ParameterTool.fromArgs(args);
            DataStream<RowData> stringDataStreamSource = env.addSource(new SimpleStringGenerator())
                    .map(new MapFunction<Tuple5<String, String, Integer, String, String>, RowData>() {
                        @Override
                        public RowData map(Tuple5<String, String, Integer, String, String> tuple5) throws Exception {
                            GenericRowData rowData = new GenericRowData(5);
                            rowData.setField(0, StringData.fromString(tuple5.f0));
                            rowData.setField(1, StringData.fromString(tuple5.f1));
                            rowData.setField(2, tuple5.f2);
                            rowData.setField(3, TimestampData.fromTimestamp(Timestamp.valueOf(tuple5.f3)));
                            rowData.setField(4, StringData.fromString(tuple5.f4));
                            return rowData;
                        }
                    });
            String basePath = paraTool.get("hudiPath", "hdfs://hacluster/tmp/flinkHudi/hudiTable");
            String targetTable = paraTool.get("hudiTableName", "hudiSinkTable");
            Map<String, String> options = new HashMap<>();
            options.put(FlinkOptions.PATH.key(), basePath);
            options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
            options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
            options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");
            HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
                    .column("uuid VARCHAR(20)")
                    .column("name VARCHAR(10)")
                    .column("age INT")
                    .column("ts TIMESTAMP(3)")
                    .column("p VARCHAR(20)")
                    .pk("uuid")
                    .partition("p")
                    .options(options);
        builder.sink(stringDataStreamSource, false); // The second parameter indicates whether the input data stream is bounded
            env.execute("Hudi_Sink");
        }
        public static class SimpleStringGenerator implements SourceFunction<Tuple5<String, String, Integer, String, String>> {
            private static final long serialVersionUID = 2174904787118597072L;
            boolean running = true;
            Integer i = 0;
    
            @Override
            public void run(SourceContext<Tuple5<String, String, Integer, String, String>> ctx) throws Exception {
                while (running) {
                    i++;
                    String uuid = "uuid" + i;
                    String name = "name" + i;
                Integer age = i;
                    String ts = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                    String p = "par" + i % 5;
                    Tuple5<String, String, Integer, String, String> tuple5 = Tuple5.of(uuid, name, age, ts, p);
                    ctx.collect(tuple5);
                    Thread.sleep(1000);
                }
            }
            @Override
            public void cancel() {
                running = false;
            }
        }
    }
  • Main logic code of ReadFromHudi
    public class ReadFromHudi {
        public static void main(String[] args) throws Exception {
            System.out.println("use command as: ");
            System.out.println(
                    "./bin/flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.ReadFromHudi"
                            + " /opt/test.jar --hudiTableName hudiSourceTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable"
                            + " --read.start-commit 20221206111532"
            );
            System.out.println(
                    "******************************************************************************************");
            System.out.println("<hudiTableName> is the hoodie table name. (Default value is hudiSourceTable)");
            System.out.println("<hudiPath> Base path for the target hoodie table. (Default value is hdfs://hacluster/tmp/flinkHudi/hudiTable)");
            System.out.println("<read.start-commit> Start commit instant for reading, the commit time format should be 'yyyyMMddHHmmss'. (Default value is earliest)");
            System.out.println(
                    "******************************************************************************************");
    
            ParameterTool paraTool = ParameterTool.fromArgs(args);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            String basePath = paraTool.get("hudiPath", "hdfs://hacluster/tmp/flinkHudi/hudiTable");
            String targetTable = paraTool.get("hudiTableName", "hudiSourceTable");
            String startCommit = paraTool.get(FlinkOptions.READ_START_COMMIT.key(), FlinkOptions.START_COMMIT_EARLIEST);
        Map<String, String> options = new HashMap<>();
            options.put(FlinkOptions.PATH.key(), basePath);
            options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); // This option enables streaming read; a bounded-read sketch follows these samples
        options.put(FlinkOptions.READ_START_COMMIT.key(), startCommit); // Specifies the start commit instant time
            HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
                    .column("uuid VARCHAR(20)")
                    .column("name VARCHAR(10)")
                    .column("age INT")
                    .column("ts TIMESTAMP(3)")
                    .column("p VARCHAR(20)")
                    .pk("uuid")
                    .partition("p")
                    .options(options);
    
            DataStream<RowData> rowDataDataStream = builder.source(env);
            rowDataDataStream.map(new MapFunction<RowData, String>() {
                @Override
                public String map(RowData rowData) throws Exception {
                    StringBuilder sb = new StringBuilder();
                    sb.append("{");
                    sb.append("\"uuid\":\"").append(rowData.getString(0)).append("\",");
                    sb.append("\"name\":\"").append(rowData.getString(1)).append("\",");
                    sb.append("\"age\":").append(rowData.getInt(2)).append(",");
                    sb.append("\"ts\":\"").append(rowData.getTimestamp(3, 0)).append("\",");
                    sb.append("\"p\":\"").append(rowData.getString(4)).append("\"");
                    sb.append("}");
                    return sb.toString();
                }
            }).print();
            env.execute("Hudi_Source");
        }
    }
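
The read sample above performs an unbounded streaming read because FlinkOptions.READ_AS_STREAMING is set to true; on the write side, the sample passes false as the second argument of builder.sink() because its source is unbounded. The following is a minimal sketch, not part of the sample code, of a bounded (one-off snapshot) read of the same table: it reuses the env, basePath, and targetTable variables defined in ReadFromHudi and simply leaves READ_AS_STREAMING at its default value of false.

    // Bounded (snapshot) read sketch: reuses env, basePath, and targetTable from ReadFromHudi.
    Map<String, String> batchOptions = new HashMap<>();
    batchOptions.put(FlinkOptions.PATH.key(), basePath);
    batchOptions.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    // READ_AS_STREAMING is left at its default (false), so the source reads a snapshot and then finishes.

    HoodiePipeline.Builder batchBuilder = HoodiePipeline.builder(targetTable)
            .column("uuid VARCHAR(20)")
            .column("name VARCHAR(10)")
            .column("age INT")
            .column("ts TIMESTAMP(3)")
            .column("p VARCHAR(20)")
            .pk("uuid")
            .partition("p")
            .options(batchOptions);

    DataStream<RowData> snapshotStream = batchBuilder.source(env);
    snapshotStream.print();
    env.execute("Hudi_Batch_Source");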