更新时间:2025-12-16 GMT+08:00
分享

Doris对接Flink Jar样例程序

Doris支持对接Flink Jar样例程序,用于将数据导入到Doris表中。

该章节内容仅适用于MRS 3.6.0-LTS及之后版本。

代码样例

需根据实际环境修改“com/huawei/doris/FlinkDorisConnectorJarDemo.java”样例代码中的以下参数值:

  • HOST:值为Doris的Master FE节点IP地址,Master FE节点可通过在Manager界面,选择“集群 > 服务 > Doris”,查看“Leader所在的主机”获取。
  • PORT:值为Doris FE服务的HTTPS端口,可登录FusionInsight Manager,选择“集群 > 服务 > Doris > 配置”,搜索“https_port”获取。
  • FE_NODES:格式为“HOST1:PORT1,HOST2:PORT2,HOST3:PORT3”,可以配置一组“HOST:PORT”,也可以配置多组“HOST:PORT”,多组之间使用英文逗号分隔。
  • DATABASE:存放导入数据的表所在的数据库。
  • TABLE_NAME:存放导入数据的Doris表名称。
  • USER:Doris数据库的用户名。
  • PASSWD:Doris数据库的密码。
/**
 * Target database and table of the import.
 *
 * Table creation statements:
 * create database example_db;
 * CREATE TABLE `example_table` (
 * `city` varchar(256) NULL,
 * `longitude` double NULL,
 * `latitude` double NULL,
 * `destroy_date` date NULL
 * ) ENGINE=OLAP
 * DUPLICATE KEY(`city`)
 * DISTRIBUTED BY HASH(`city`) BUCKETS 3
 */
private static final String DATABASE = "example_db";
private static final String TABLE_NAME = "example_table";

// The values below are empty placeholders; main() overwrites them with the
// entries of the same name read from conf.properties.

// Doris FE node list in "HOST:PORT" form; several entries are comma-separated.
private static String FE_NODES = "";
// Doris database user name.
private static String USER = "";
// Doris database password.
private static String PASSWD = "";

public static void main(String[] args) throws Exception {
    Properties confProperties = new Properties();
    // 使用ClassLoader加载properties配置文件生成对应的输入流
    InputStream in = FlinkDorisConnectorJarDemo.class.getClassLoader().getResourceAsStream("conf.properties");
    // 使用properties对象加载输入流
    confProperties.load(in);
    //获取key对应的value值
    USER = confProperties.getProperty("USER");
    PASSWD = confProperties.getProperty("PASSWD");
    FE_NODES = confProperties.getProperty("FE_NODES");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings environmentSettings =
            EnvironmentSettings.newInstance().inStreamingMode().build();
    env.enableCheckpointing(10);
    env.setParallelism(1);

    // enable checkpoint
    env.enableCheckpointing(10000);
    // using batch mode for bounded data
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    //doris sink option
    DorisSink.Builder<RowData> builder = DorisSink.builder();
    DorisOptions.Builder dorisBuilder = DorisOptions.builder();

    dorisBuilder.setFenodes(FE_NODES)
            .setTableIdentifier(DATABASE + "." + TABLE_NAME)
            .setUsername(USER)
            .setPassword(PASSWD)
            // 集群未启用Kerberos认证(普通模式)为“false”,集群已启用Kerberos认证(安全模式)为“true”
            .setIgnoreHttpsCA(true)
            // 集群未启用Kerberos认证(普通模式)为“false”,集群已启用Kerberos认证(安全模式)为“true”
            .setEnableHttps(true)
            // 默认值为“true”,如果Flink Jar在运行后报错“307”,可将该值修改为“false”
            .setAutoRedirect(false);

    // json format to streamload
    Properties properties = new Properties();

    properties.setProperty("format", "json");
    properties.setProperty("read_json_by_line", "false");
    DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
    executionBuilder.setLabelPrefix("flink-label-doris") //streamload label prefix
            .setDeletable(false)
            .setStreamLoadProp(properties); //streamload params

    //flink rowdata's schema
    String[] fields = {"city", "longitude", "latitude", "destroy_date"};
    DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DATE()};

    builder.setDorisReadOptions(DorisReadOptions.builder().build())
            .setDorisExecutionOptions(executionBuilder.build())
            .setSerializer(RowDataSerializer.builder()    //serialize according to rowdata
                    .setFieldNames(fields)
                    .setType("json")           //json format
                    .setFieldType(types).build())
            .setDorisOptions(dorisBuilder.build());

    //mock rowdata source
    DataStream<RowData> source = env.fromElements("")
            .map(new MapFunction<String, RowData>() {
                @Override
                public RowData map(String value) throws Exception {
                    GenericRowData genericRowData = new GenericRowData(4);
                    genericRowData.setField(0, StringData.fromString("ULC"));
                    genericRowData.setField(1, 21.59);
                    genericRowData.setField(2, 31.56);
                    genericRowData.setField(3, LocalDate.now().toEpochDay());
                    return genericRowData;
                }
            });
    source.sinkTo(builder.build());
    env.execute("Flink DataStream example");

相关文档