更新时间:2025-12-08 GMT+08:00
Doris Stream Load接口调用样例程序
Doris Stream Load样例程序用于将本地文件csv文件数据导入到Doris表中。
本章节内容仅适用于MRS 3.3.1-LTS及之后版本。
代码样例
需根据实际环境修改样例代码中的以下参数值:
- HOST:值为Doris的Master FE节点IP地址,Master FE节点可通过在Manager界面,选择“集群 > 服务 > Doris”,查看“Leader所在的主机”获取。
- PORT:值为Doris FE服务的HTTPS端口,默认为29991,可登录FusionInsight Manager,选择“集群 > 服务 > Doris > 配置”,搜索“https_port”获取。
- QUERY_PORT:值为Doris的MySQL协议查询连接端口,默认为29982,可登录FusionInsight Manager,选择“集群 > 服务 > Doris > 配置”,搜索“query_port”获取。
- DATABASE:存放导入数据的表所在的数据库。
- TABLE:存放导入数据的Doris表名称。
public class DorisStreamLoader {
// FE IP Address
private final static String HOST = "192.168.13.178";
// 集群已启用Kerberos认证(安全模式)中FE Port为“https_port”参数值;集群未启用Kerberos认证(普通模式)FE Port为“http_port”参数值。
private final static int PORT = 29991;
private final static int JDBC_PORT = 29982;
// db name
private final static String DATABASE = "test_2";
// table name
private final static String TABLE = "doris_test_sink";
//JDBC_DRIVER仅适用于MRS 3.3.1-LTS版本。
private static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
private static final String DB_URL_PATTERN = "jdbc:mysql://%s:%d?rewriteBatchedStatements=true";
private static final String USER = System.getenv("DORIS_MY_USER");
private static final String PASSWD = System.getenv("DORIS_MY_PASSWORD");
// 集群已启用Kerberos认证(安全模式)使用https开头, 集群未启用Kerberos认证(普通模式)使用http开头
private final static String loadUrl = String.format("https://%s:%s/api/%s/%s/_stream_load",
HOST, PORT, DATABASE, TABLE);
//java 调用 Curl的方法
public static String execCurl(String[] cmds) {
ProcessBuilder process = new ProcessBuilder(cmds);
Process p;
try {
p = process.start();
BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
StringBuilder builder = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
System.out.println(line);
builder.append(line);
builder.append(System.getProperty("line.separator"));
}
return builder.toString();
} catch (Exception e) {
System.out.print("error");
}
return null;
}
public static void initTable(){
String createDatabaseSql = "create database if not exists "+DATABASE;
String createTableSql = "create table if not exists " + DATABASE + "." + TABLE + " (\n" +
" `id` int NULL COMMENT \"\",\n" +
" `number` int NULL COMMENT \"\",\n" +
" `price` DECIMAL(12,2) NULL COMMENT \"\",\n" +
" `skuname` varchar(40) NULL COMMENT \"\",\n" +
" `skudesc` varchar(200) NULL COMMENT \"\"\n" +
" ) ENGINE=OLAP\n" +
" DUPLICATE KEY(`id`)\n" +
" COMMENT \"商品信息表\"\n" +
" DISTRIBUTED BY HASH(`id`) BUCKETS 1\n" +
" PROPERTIES (\n" +
" \"replication_num\" = \"3\",\n" +
" \"in_memory\" = \"false\",\n" +
" \"storage_format\" = \"V2\"\n" +
" );";
try (Connection connection = createConnection()) {
// 创建数据库
System.out.println("Start create database.");
execDDL(connection, createDatabaseSql);
System.out.println("Database created successfully.");
// 创建表
System.out.println("Start create table.");
execDDL(connection, createTableSql);
System.out.println("Table created successfully.");
} catch (Exception e) {
System.out.println("Execute doris operation failed.");
}
}
private static Connection createConnection() throws Exception {
Connection connection = null;
try {
Class.forName(JDBC_DRIVER);
String dbUrl = String.format(DB_URL_PATTERN, HOST, JDBC_PORT);
connection = DriverManager.getConnection(dbUrl, USER, PASSWD);
} catch (Exception e) {
System.out.println("Init doris connection failed.");
throw new Exception(e);
}
return connection;
}
public static void execDDL(Connection connection, String sql) throws Exception {
try (PreparedStatement statement = connection.prepareStatement(sql)) {
statement.execute();
} catch (Exception e) {
System.out.println("Execute sql {} failed.");
throw new Exception(e);
}
}
//接口调用
public static String getHttpPost(String csvPath) {
String[] cmdList = {"curl", "-k", "--location-trusted", "-u" + USER + ":" + PASSWD, "-H", "expect:100-continue", "-H", "column_separator:,", "-T",
csvPath,
loadUrl};
//命令的空格在jva数组里单个的,必须分开写,不能有空格,
String responseMsg = execCurl(cmdList);
System.out.println("curl" + responseMsg);
return responseMsg;
}
public static void main(String[] args) throws IOException {
initTable();
String path = DorisStreamLoader.class.getClassLoader().getResource("test.csv").getPath();
path = URLDecoder.decode(path, "UTF-8");
File file = new File(path);
String filePath = file.getAbsolutePath();
// 在linux场景需要预先将resource目录下test.csv文件上传到linux后台,然后在getHttpPost中替换对应的文件路径。
getHttpPost(filePath);
}
}
父主题: 开发Doris应用