文档首页/
MapReduce服务 MRS/
开发指南(LTS版)/
Flink开发指南(普通模式)/
开发Flink应用/
Flink读取Hudi表样例程序/
Flink Hudi样例程序(Java)
更新时间:2024-06-05 GMT+08:00
Flink Hudi样例程序(Java)
功能介绍
通过调用Flink API读写Hudi数据。
代码样例
下面列出WriteIntoHudi和ReadFromHudi主要逻辑代码作为演示。
完整代码参见com.huawei.bigdata.flink.examples.WriteIntoHudi和com.huawei.bigdata.flink.examples.ReadFromHudi。
- WriteIntoHudi主要逻辑代码
public class WriteIntoHudi { public static void main(String[] args) throws Exception { System.out.println("use command as: "); System.out.println( "./bin/flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.WriteIntoHudi" + " /opt/test.jar --hudiTableName hudiSinkTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable"); System.out.println( "******************************************************************************************"); System.out.println("<hudiTableName> is the hudi table name. (Default value is hudiSinkTable)"); System.out.println("<hudiPath> Base path for the target hoodie table. (Default value is hdfs://hacluster/tmp/flinkHudi/hudiTable)"); System.out.println( "******************************************************************************************"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.getCheckpointConfig().setCheckpointInterval(10000); ParameterTool paraTool = ParameterTool.fromArgs(args); DataStream<RowData> stringDataStreamSource = env.addSource(new SimpleStringGenerator()) .map(new MapFunction<Tuple5<String, String, Integer, String, String>, RowData>() { @Override public RowData map(Tuple5<String, String, Integer, String, String> tuple5) throws Exception { GenericRowData rowData = new GenericRowData(5); rowData.setField(0, StringData.fromString(tuple5.f0)); rowData.setField(1, StringData.fromString(tuple5.f1)); rowData.setField(2, tuple5.f2); rowData.setField(3, TimestampData.fromTimestamp(Timestamp.valueOf(tuple5.f3))); rowData.setField(4, StringData.fromString(tuple5.f4)); return rowData; } }); String basePath = paraTool.get("hudiPath", "hdfs://hacluster/tmp/flinkHudi/hudiTable"); String targetTable = paraTool.get("hudiTableName", "hudiSinkTable"); Map<String, String> options = new HashMap<>(); options.put(FlinkOptions.PATH.key(), basePath); options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name()); options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts"); options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true"); HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable) .column("uuid VARCHAR(20)") .column("name VARCHAR(10)") .column("age INT") .column("ts TIMESTAMP(3)") .column("p VARCHAR(20)") .pk("uuid") .partition("p") .options(options); builder.sink(stringDataStreamSource, false); // The second parameter indicating whether the input data stream is bounded env.execute("Hudi_Sink"); } public static class SimpleStringGenerator implements SourceFunction<Tuple5<String, String, Integer, String, String>> { private static final long serialVersionUID = 2174904787118597072L; boolean running = true; Integer i = 0; @Override public void run(SourceContext<Tuple5<String, String, Integer, String, String>> ctx) throws Exception { while (running) { i++; String uuid = "uuid" + i; String name = "name" + i; Integer age = new Integer(i); String ts = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); String p = "par" + i % 5; Tuple5<String, String, Integer, String, String> tuple5 = Tuple5.of(uuid, name, age, ts, p); ctx.collect(tuple5); Thread.sleep(1000); } } @Override public void cancel() { running = false; } } }
- ReadFromHudi主要逻辑代码
public class ReadFromHudi { public static void main(String[] args) throws Exception { System.out.println("use command as: "); System.out.println( "./bin/flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.ReadFromHudi" + " /opt/test.jar --hudiTableName hudiSourceTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable" + " --read.start-commit 20221206111532" ); System.out.println( "******************************************************************************************"); System.out.println("<hudiTableName> is the hoodie table name. (Default value is hudiSourceTable)"); System.out.println("<hudiPath> Base path for the target hoodie table. (Default value is hdfs://hacluster/tmp/flinkHudi/hudiTable)"); System.out.println("<read.start-commit> Start commit instant for reading, the commit time format should be 'yyyyMMddHHmmss'. (Default value is earliest)"); System.out.println( "******************************************************************************************"); ParameterTool paraTool = ParameterTool.fromArgs(args); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String basePath = paraTool.get("hudiPath", "hdfs://hacluster/tmp/flinkHudi/hudiTable"); String targetTable = paraTool.get("hudiTableName", "hudiSourceTable"); String startCommit = paraTool.get(FlinkOptions.READ_START_COMMIT.key(), FlinkOptions.START_COMMIT_EARLIEST); Map<String, String> options = new HashMap(); options.put(FlinkOptions.PATH.key(), basePath); options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name()); options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); // this option enable the streaming read options.put(FlinkOptions.READ_START_COMMIT.key(), startCommit); // specifies the start commit instant time HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable) .column("uuid VARCHAR(20)") .column("name VARCHAR(10)") .column("age INT") .column("ts TIMESTAMP(3)") .column("p VARCHAR(20)") .pk("uuid") .partition("p") .options(options); DataStream<RowData> rowDataDataStream = builder.source(env); rowDataDataStream.map(new MapFunction<RowData, String>() { @Override public String map(RowData rowData) throws Exception { StringBuilder sb = new StringBuilder(); sb.append("{"); sb.append("\"uuid\":\"").append(rowData.getString(0)).append("\","); sb.append("\"name\":\"").append(rowData.getString(1)).append("\","); sb.append("\"age\":").append(rowData.getInt(2)).append(","); sb.append("\"ts\":\"").append(rowData.getTimestamp(3, 0)).append("\","); sb.append("\"p\":\"").append(rowData.getString(4)).append("\""); sb.append("}"); return sb.toString(); } }).print(); env.execute("Hudi_Source"); } }
父主题: Flink读取Hudi表样例程序