Doris对接Flink Jar样例程序
本样例程序演示如何通过Flink Jar作业对接Doris,将数据导入到Doris表中。
该章节内容仅适用于MRS 3.6.0-LTS及之后版本。
代码样例
需根据实际环境修改“com/huawei/doris/FlinkDorisConnectorJarDemo.java”样例代码中的以下参数值(其中USER、PASSWD和FE_NODES由样例代码从类路径下的“conf.properties”配置文件中读取,需在该配置文件中设置):
- HOST:值为Doris的Master FE节点IP地址。可登录FusionInsight Manager,选择“集群 > 服务 > Doris”,查看“Leader所在的主机”获取。
- PORT:值为Doris FE服务的HTTPS端口,可登录FusionInsight Manager,选择“集群 > 服务 > Doris > 配置”,搜索“https_port”获取。
- FE_NODES:格式为“HOST1:PORT1,HOST2:PORT2,HOST3:PORT3”,可以配置一组“HOST:PORT”,也可以配置多组“HOST:PORT”,多组之间使用逗号分隔。
- DATABASE:存放导入数据的表所在的数据库。
- TABLE_NAME:存放导入数据的Doris表名称。
- USER:Doris数据库的用户名。
- PASSWD:Doris数据库的密码。
/**
 * Name of the Doris database that contains the target table.
 *
 * Table creation statements matching the database and table names used below:
 *
 * create database example_db;
 * CREATE TABLE `example_table` (
 * `city` varchar(256) NULL,
 * `longitude` double NULL,
 * `latitude` double NULL,
 * `destroy_date` date NULL
 * ) ENGINE=OLAP
 * DUPLICATE KEY(`city`)
 * DISTRIBUTED BY HASH(`city`) BUCKETS 3
 */
private static final String DATABASE = "example_db";

/** Name of the Doris table that receives the imported rows. */
private static final String TABLE_NAME = "example_table";

// The three fields below are intentionally not final: main() overwrites them with the
// FE_NODES, USER and PASSWD entries read from conf.properties, so the empty strings are
// only placeholders.

/** Doris FE address list: one or more "HOST:PORT" entries separated by commas. */
private static String FE_NODES = "";

/** Doris database user name. */
private static String USER = "";

/** Password of the Doris database user. */
private static String PASSWD = "";
public static void main(String[] args) throws Exception {
Properties confProperties = new Properties();
// 使用ClassLoader加载properties配置文件生成对应的输入流
InputStream in = FlinkDorisConnectorJarDemo.class.getClassLoader().getResourceAsStream("conf.properties");
// 使用properties对象加载输入流
confProperties.load(in);
//获取key对应的value值
USER = confProperties.getProperty("USER");
PASSWD = confProperties.getProperty("PASSWD");
FE_NODES = confProperties.getProperty("FE_NODES");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings =
EnvironmentSettings.newInstance().inStreamingMode().build();
env.enableCheckpointing(10);
env.setParallelism(1);
// enable checkpoint
env.enableCheckpointing(10000);
// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
//doris sink option
DorisSink.Builder<RowData> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes(FE_NODES)
.setTableIdentifier(DATABASE + "." + TABLE_NAME)
.setUsername(USER)
.setPassword(PASSWD)
// 集群未启用Kerberos认证(普通模式)为“false”,集群已启用Kerberos认证(安全模式)为“true”
.setIgnoreHttpsCA(true)
// 集群未启用Kerberos认证(普通模式)为“false”,集群已启用Kerberos认证(安全模式)为“true”
.setEnableHttps(true)
// 默认值为“true”,如果Flink Jar在运行后报错“307”,可将该值修改为“false”
.setAutoRedirect(false);
// json format to streamload
Properties properties = new Properties();
properties.setProperty("format", "json");
properties.setProperty("read_json_by_line", "false");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("flink-label-doris") //streamload label prefix
.setDeletable(false)
.setStreamLoadProp(properties); //streamload params
//flink rowdata's schema
String[] fields = {"city", "longitude", "latitude", "destroy_date"};
DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DATE()};
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(RowDataSerializer.builder() //serialize according to rowdata
.setFieldNames(fields)
.setType("json") //json format
.setFieldType(types).build())
.setDorisOptions(dorisBuilder.build());
//mock rowdata source
DataStream<RowData> source = env.fromElements("")
.map(new MapFunction<String, RowData>() {
@Override
public RowData map(String value) throws Exception {
GenericRowData genericRowData = new GenericRowData(4);
genericRowData.setField(0, StringData.fromString("ULC"));
genericRowData.setField(1, 21.59);
genericRowData.setField(2, 31.56);
genericRowData.setField(3, LocalDate.now().toEpochDay());
return genericRowData;
}
});
source.sinkTo(builder.build());
env.execute("Flink DataStream example");