Flink Hudi Sample Program (Java)
Description
Call Flink APIs to read data from and write data to Hudi.
Sample Code
The following examples show the main logic code of WriteIntoHudi and ReadFromHudi.
For details about the complete code, see com.huawei.bigdata.flink.examples.WriteIntoHudi and com.huawei.bigdata.flink.examples.ReadFromHudi.
- Main logic code of WriteIntoHudi
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

public class WriteIntoHudi {
    public static void main(String[] args) throws Exception {
        System.out.println("use command as: ");
        System.out.println(
            "./bin/flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.WriteIntoHudi"
                + " /opt/test.jar --hudiTableName hudiSinkTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable");
        System.out.println(
            "******************************************************************************************");
        System.out.println("<hudiTableName> is the hudi table name. (Default value is hudiSinkTable)");
        System.out.println("<hudiPath> Base path for the target hoodie table. (Default value is hdfs://hacluster/tmp/flinkHudi/hudiTable)");
        System.out.println(
            "******************************************************************************************");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // The Hudi sink commits data when a checkpoint completes, so checkpointing must be enabled.
        env.getCheckpointConfig().setCheckpointInterval(10000);
        ParameterTool paraTool = ParameterTool.fromArgs(args);

        // Generate one Tuple5 record per second and convert it to the RowData layout expected by Hudi.
        DataStream<RowData> stringDataStreamSource = env.addSource(new SimpleStringGenerator())
            .map(new MapFunction<Tuple5<String, String, Integer, String, String>, RowData>() {
                @Override
                public RowData map(Tuple5<String, String, Integer, String, String> tuple5) throws Exception {
                    GenericRowData rowData = new GenericRowData(5);
                    rowData.setField(0, StringData.fromString(tuple5.f0));
                    rowData.setField(1, StringData.fromString(tuple5.f1));
                    rowData.setField(2, tuple5.f2);
                    rowData.setField(3, TimestampData.fromTimestamp(Timestamp.valueOf(tuple5.f3)));
                    rowData.setField(4, StringData.fromString(tuple5.f4));
                    return rowData;
                }
            });

        String basePath = paraTool.get("hudiPath", "hdfs://hacluster/tmp/flinkHudi/hudiTable");
        String targetTable = paraTool.get("hudiTableName", "hudiSinkTable");
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), basePath);
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
        options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");

        HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
            .column("uuid VARCHAR(20)")
            .column("name VARCHAR(10)")
            .column("age INT")
            .column("ts TIMESTAMP(3)")
            .column("p VARCHAR(20)")
            .pk("uuid")
            .partition("p")
            .options(options);
        // The second parameter indicates whether the input data stream is bounded.
        builder.sink(stringDataStreamSource, false);
        env.execute("Hudi_Sink");
    }

    public static class SimpleStringGenerator
            implements SourceFunction<Tuple5<String, String, Integer, String, String>> {
        private static final long serialVersionUID = 2174904787118597072L;
        boolean running = true;
        Integer i = 0;

        @Override
        public void run(SourceContext<Tuple5<String, String, Integer, String, String>> ctx) throws Exception {
            while (running) {
                i++;
                String uuid = "uuid" + i;
                String name = "name" + i;
                Integer age = i;
                String ts = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                String p = "par" + i % 5;
                Tuple5<String, String, Integer, String, String> tuple5 = Tuple5.of(uuid, name, age, ts, p);
                ctx.collect(tuple5);
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
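After packaging the sample as test.jar, the write job can be submitted to YARN with the command printed in the program's usage message; both arguments are optional and fall back to the defaults shown above:
    ./bin/flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.WriteIntoHudi /opt/test.jar --hudiTableName hudiSinkTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable
Because the Hudi sink only commits on completed checkpoints, new data becomes visible to readers roughly every checkpoint interval (10 seconds in this example).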
- Main logic code of ReadFromHudi
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

public class ReadFromHudi {
    public static void main(String[] args) throws Exception {
        System.out.println("use command as: ");
        System.out.println(
            "./bin/flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.ReadFromHudi"
                + " /opt/test.jar --hudiTableName hudiSourceTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable"
                + " --read.start-commit 20221206111532");
        System.out.println(
            "******************************************************************************************");
        System.out.println("<hudiTableName> is the hoodie table name. (Default value is hudiSourceTable)");
        System.out.println("<hudiPath> Base path for the target hoodie table. (Default value is hdfs://hacluster/tmp/flinkHudi/hudiTable)");
        System.out.println("<read.start-commit> Start commit instant for reading, the commit time format should be 'yyyyMMddHHmmss'. (Default value is earliest)");
        System.out.println(
            "******************************************************************************************");

        ParameterTool paraTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String basePath = paraTool.get("hudiPath", "hdfs://hacluster/tmp/flinkHudi/hudiTable");
        String targetTable = paraTool.get("hudiTableName", "hudiSourceTable");
        String startCommit = paraTool.get(FlinkOptions.READ_START_COMMIT.key(), FlinkOptions.START_COMMIT_EARLIEST);

        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), basePath);
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); // Enable streaming read.
        options.put(FlinkOptions.READ_START_COMMIT.key(), startCommit); // Specify the start commit instant.

        HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
            .column("uuid VARCHAR(20)")
            .column("name VARCHAR(10)")
            .column("age INT")
            .column("ts TIMESTAMP(3)")
            .column("p VARCHAR(20)")
            .pk("uuid")
            .partition("p")
            .options(options);

        DataStream<RowData> rowDataDataStream = builder.source(env);
        // Format each row as a JSON string and print it to the TaskManager log.
        rowDataDataStream.map(new MapFunction<RowData, String>() {
            @Override
            public String map(RowData rowData) throws Exception {
                StringBuilder sb = new StringBuilder();
                sb.append("{");
                sb.append("\"uuid\":\"").append(rowData.getString(0)).append("\",");
                sb.append("\"name\":\"").append(rowData.getString(1)).append("\",");
                sb.append("\"age\":").append(rowData.getInt(2)).append(",");
                sb.append("\"ts\":\"").append(rowData.getTimestamp(3, 3)).append("\",");
                sb.append("\"p\":\"").append(rowData.getString(4)).append("\"");
                sb.append("}");
                return sb.toString();
            }
        }).print();
        env.execute("Hudi_Source");
    }
}
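The read job can be submitted the same way, as printed in its usage message:
    ./bin/flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.ReadFromHudi /opt/test.jar --hudiTableName hudiSourceTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable --read.start-commit 20221206111532
Because read.streaming.enabled is set to true, the job does not terminate after the initial scan: it keeps monitoring the Hudi timeline and emits rows from each new commit, starting from the instant given by read.start-commit (or from the earliest commit if the argument is omitted).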