更新时间:2024-08-05 GMT+08:00
写Kudu数据
功能简介
通过KuduClient.newSession()方法生成一个KuduSession对象,然后再把插入记录动作执行到Kudu表里。
代码样例
如下是写数据的代码片段:
// Create a KuduSession.
KuduSession session = client.newSession();
for (int i = 0; i < numRows; i++) {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
row.addInt("key", i);
// Make even-keyed row have a null 'value'.
if (i % 2 == 0) {
row.setNull("value");
} else {
row.addString("value", "value " + i);
}
session.apply(insert);
}
// Call session.close() to end the session and ensure the rows are
// flushed and errors are returned.
// You can also call session.flush() to do the same without ending the session.
// When flushing in AUTO_FLUSH_BACKGROUND mode (the default mode recommended)
// for most workloads, you must check the pending errors as shown below, since
// write operations are flushed to Kudu in background threads.
session.close();
if (session.countPendingErrors() != 0) {
System.out.println("errors inserting rows");
org.apache.kudu.client.RowErrorsAndOverflowStatus roStatus = session.getPendingErrors();
org.apache.kudu.client.RowError[] errs = roStatus.getRowErrors();
int numErrs = Math.min(errs.length, 5);
System.out.println("there were errors inserting rows to Kudu");
System.out.println("the first few errors follow:");
for (int i = 0; i < numErrs; i++) {
System.out.println(errs[i]);
}
if (roStatus.isOverflowed()) {
System.out.println("error buffer overflowed: some errors were discarded");
}
throw new RuntimeException("error inserting rows to Kudu");
}
System.out.println("Inserted " + numRows + " rows");
示例代码中,numRows是要写入的记录条数;需要特别注意写入请求的响应结果。
父主题: 开发Kudu应用