更新时间:2026-06-10 GMT+08:00
Java样例代码
功能介绍
通过调用Flink API读写Iceberg数据。
代码样例
下面列出ReadFromIcebergToIceberg主要逻辑代码作为演示。
完整代码参见com.huawei.bigdata.flink.examples.ReadFromIcebergToIceberg。
ReadFromIcebergToIceberg主要逻辑代码:
public class ReadFromIcebergToIceberg {
/**
 * Reads rows from an Iceberg source table as a bounded (non-streaming) scan, keeps only the
 * rows whose field at position 2 is an int greater than 100, and writes them to an Iceberg
 * sink table in overwrite mode. Both tables are resolved in the {@code default} database of
 * a Hive-backed Iceberg catalog.
 *
 * <p>Recognized arguments (parsed by {@link ParameterTool#fromArgs}):
 * <ul>
 *   <li>{@code --hive.metastore.uris} (required)</li>
 *   <li>{@code --hive.metastore.kerberos.principal} (required)</li>
 *   <li>{@code --sourceTableName} (default {@code icebergsource})</li>
 *   <li>{@code --sinkTableName} (default {@code icebergsink})</li>
 *   <li>{@code --catalogName} (default {@code hive_iceberg_catalog})</li>
 *   <li>{@code --warehouse} (default {@code hdfs://hacluster/user/hive/warehouse})</li>
 * </ul>
 *
 * @param args command-line arguments in {@code --key value} form
 * @throws Exception if a required argument is missing or the Flink job fails
 */
public static void main(String[] args) throws Exception {
    System.out.println("use command as: ");
    System.out.println(
        "flink run --class com.huawei.bigdata.flink.examples.ReadFromIcebergToIceberg /opt/FlinkIcebergExample.jar "
            // Hive Metastore address: the value of hive.metastore.uris in hive-site.xml.
            + " --hive.metastore.uris thrift://xxx.xxx.xxx.xxx:xx,thrift://xxx.xxx.xxx.xxx:xx "
            // Name of the Iceberg source table to read; defaults to icebergsource.
            + " --sourceTableName xxx "
            // Name of the Iceberg sink table to write; defaults to icebergsink.
            + " --sinkTableName xxx "
            // Hive Metastore principal: the value of hive.metastore.kerberos.principal in hive-site.xml.
            + " --hive.metastore.kerberos.principal xxxx ");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // Checkpoint interval in milliseconds.
    env.enableCheckpointing(5000);

    ParameterTool paraTool = ParameterTool.fromArgs(args);
    String catalogName = paraTool.get("catalogName", "hive_iceberg_catalog");
    // These two have no sensible default. Fail fast with a message naming the missing key
    // instead of passing null into the catalog properties / Hadoop configuration below.
    String hiveMetastoreUri = paraTool.getRequired("hive.metastore.uris");
    String hiveMetastorePrincipal = paraTool.getRequired("hive.metastore.kerberos.principal");
    String sourceName = paraTool.get("sourceTableName", "icebergsource");
    String sinkName = paraTool.get("sinkTableName", "icebergsink");
    String warehouse = paraTool.get("warehouse", "hdfs://hacluster/user/hive/warehouse");

    // Properties describing the Hive-backed Iceberg catalog.
    Map<String, String> catalogProps = new HashMap<>();
    catalogProps.put("type", "hive");
    catalogProps.put("uri", hiveMetastoreUri);
    catalogProps.put("warehouse", warehouse);

    // Metastore connection settings: enable Thrift SASL and set the metastore principal.
    Configuration conf = new Configuration();
    conf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
    conf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, hiveMetastorePrincipal);

    CatalogLoader catalogLoader = CatalogLoader.hive(catalogName, conf, catalogProps);

    // Source: non-streaming read of default.<sourceName>.
    TableIdentifier table = TableIdentifier.of("default", sourceName);
    TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, table);
    DataStream<RowData> batch = FlinkSource.forRowData()
        .env(env)
        .tableLoader(tableLoader)
        .streaming(false)
        .build();

    DataStream<RowData> readStreamfile = batch.filter(new FilterFunction<RowData>() {
        @Override
        public boolean filter(RowData rowData) throws Exception {
            // Field 2 is read as an int. A row whose field 2 is NULL is dropped here
            // rather than being passed to getInt on a null field.
            return !rowData.isNullAt(2) && rowData.getInt(2) > 100;
        }
    });

    // Sink: write the filtered rows to default.<sinkName> in overwrite mode.
    TableIdentifier table2 = TableIdentifier.of("default", sinkName);
    TableLoader tableLoader2 = TableLoader.fromCatalog(catalogLoader, table2);
    FlinkSink.forRowData(readStreamfile)
        .tableLoader(tableLoader2)
        .overwrite(true)
        .append();

    env.execute();
}
}
父主题: Flink读写Iceberg