更新时间:2026-06-10 GMT+08:00
分享

Java样例代码

功能介绍

通过调用Flink API读写Iceberg数据。

代码样例

下面列出ReadFromIcebergToIceberg主要逻辑代码作为演示。

完整代码参见com.huawei.bigdata.flink.examples.ReadFromIcebergToIceberg。

ReadFromIcebergToIceberg主要逻辑代码:
/**
 * Flink DataStream example that reads rows from one Iceberg table, keeps the rows whose
 * int field at position 2 is greater than 100, and writes them to a second Iceberg table.
 *
 * <p>Both tables are looked up in the {@code default} database of a Hive-backed Iceberg
 * catalog. The metastore connection is configured with Thrift SASL enabled and the supplied
 * Kerberos principal.
 *
 * <p>Command-line parameters (parsed with {@link ParameterTool}):
 * <ul>
 *   <li>{@code --hive.metastore.uris} (required) - Hive Metastore URIs.</li>
 *   <li>{@code --hive.metastore.kerberos.principal} (required) - Hive Metastore principal.</li>
 *   <li>{@code --sourceTableName} - source table, defaults to {@code icebergsource}.</li>
 *   <li>{@code --sinkTableName} - sink table, defaults to {@code icebergsink}.</li>
 *   <li>{@code --catalogName} - catalog name, defaults to {@code hive_iceberg_catalog}.</li>
 *   <li>{@code --warehouse} - warehouse location, defaults to
 *       {@code hdfs://hacluster/user/hive/warehouse}.</li>
 * </ul>
 */
public class ReadFromIcebergToIceberg {
    /** Database that holds both the source and the sink table. */
    private static final String DATABASE = "default";

    /** Position of the int field the filter inspects. */
    private static final int FILTER_FIELD_INDEX = 2;

    /** Rows are kept only when the inspected field is strictly greater than this value. */
    private static final int FILTER_THRESHOLD = 100;

    /**
     * Builds and runs the Iceberg-to-Iceberg copy job.
     *
     * @param args command-line arguments in {@code --key value} form, see the class comment
     * @throws Exception if a required argument is missing or the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        System.out.println("use command as: ");
        System.out.println(
                "flink run --class com.huawei.bigdata.flink.examples.ReadFromIcebergToIceberg /opt/FlinkIcebergExample.jar "
                        // Hive Metastore address: the value of hive.metastore.uris in hive-site.xml.
                        + "  --hive.metastore.uris thrift://xxx.xxx.xxx.xxx:xx,thrift://xxx.xxx.xxx.xxx:xx "
                        // Name of the Iceberg source table to read; defaults to icebergsource.
                        + "  --sourceTableName xxx  "
                        // Name of the Iceberg sink table to write; defaults to icebergsink.
                        + "  --sinkTableName xxx  "
                        // Hive Metastore principal: the value of
                        // hive.metastore.kerberos.principal in hive-site.xml.
                        + "  --hive.metastore.kerberos.principal xxxx  ");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(5000);

        ParameterTool paraTool = ParameterTool.fromArgs(args);
        String catalogName = paraTool.get("catalogName", "hive_iceberg_catalog");
        String sourceName = paraTool.get("sourceTableName", "icebergsource");
        String sinkName = paraTool.get("sinkTableName", "icebergsink");
        String warehouse = paraTool.get("warehouse", "hdfs://hacluster/user/hive/warehouse");
        // These two have no sensible default. Fail fast with a message naming the missing key
        // instead of passing null into the catalog properties / Hadoop configuration.
        String hiveMetastoreUri = paraTool.getRequired("hive.metastore.uris");
        String hiveMetastorePrincipal = paraTool.getRequired("hive.metastore.kerberos.principal");

        Map<String, String> catalogProps = new HashMap<>();
        catalogProps.put("type", "hive");
        catalogProps.put("uri", hiveMetastoreUri);
        catalogProps.put("warehouse", warehouse);

        Configuration conf = new Configuration();
        conf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
        conf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, hiveMetastorePrincipal);

        // One catalog loader is shared by the source and the sink table loaders.
        CatalogLoader catalogLoader = CatalogLoader.hive(catalogName, conf, catalogProps);

        TableLoader sourceLoader =
                TableLoader.fromCatalog(catalogLoader, TableIdentifier.of(DATABASE, sourceName));
        DataStream<RowData> sourceRows = FlinkSource.forRowData()
                .env(env)
                .tableLoader(sourceLoader)
                .streaming(false)
                .build();

        // Check for null before reading the primitive: rows with a null in the inspected field
        // are dropped rather than read through an unguarded getInt call.
        DataStream<RowData> filteredRows = sourceRows.filter(
                row -> !row.isNullAt(FILTER_FIELD_INDEX)
                        && row.getInt(FILTER_FIELD_INDEX) > FILTER_THRESHOLD);

        TableLoader sinkLoader =
                TableLoader.fromCatalog(catalogLoader, TableIdentifier.of(DATABASE, sinkName));
        FlinkSink.forRowData(filteredRows)
                .tableLoader(sinkLoader)
                .overwrite(true)
                .append();

        env.execute();
    }
}

相关文档