Updated: 2024-08-05 GMT+08:00

Writing Data to Kudu

Overview

Call KuduClient.newSession() to create a KuduSession object, then use that session to apply insert operations to the Kudu table.

Sample Code

The following code snippet writes data:

// Create a KuduSession.
KuduSession session = client.newSession();
for (int i = 0; i < numRows; i++) {
  Insert insert = table.newInsert();
  PartialRow row = insert.getRow();
  row.addInt("key", i);
  // Make even-keyed row have a null 'value'.
  if (i % 2 == 0) {
    row.setNull("value");
  } else {
    row.addString("value", "value " + i);
  }
  session.apply(insert);
}

// Call session.close() to end the session and ensure the rows are
// flushed and errors are returned.
// You can also call session.flush() to do the same without ending the session.
// When flushing in AUTO_FLUSH_BACKGROUND mode (the mode recommended for most
// workloads), you must check the pending errors as shown below, since
// write operations are flushed to Kudu in background threads.
// Note that a new session starts in AUTO_FLUSH_SYNC mode; call
// session.setFlushMode() to change it.
session.close();
if (session.countPendingErrors() != 0) {
  System.out.println("errors inserting rows");
  org.apache.kudu.client.RowErrorsAndOverflowStatus roStatus = session.getPendingErrors();
  org.apache.kudu.client.RowError[] errs = roStatus.getRowErrors();
  int numErrs = Math.min(errs.length, 5);
  System.out.println("there were errors inserting rows to Kudu");
  System.out.println("the first few errors follow:");
  for (int i = 0; i < numErrs; i++) {
    System.out.println(errs[i]);
  }
  if (roStatus.isOverflowed()) {
    System.out.println("error buffer overflowed: some errors were discarded");
  }
  throw new RuntimeException("error inserting rows to Kudu");
}
System.out.println("Inserted " + numRows + " rows");

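In the sample code, numRows is the number of records to write. Pay particular attention to the result of each write request: after the session is flushed or closed, check its pending errors, as the sample does.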
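
The error-reporting part of the sample can be pulled into a small helper so the same summary is produced wherever a session is flushed. The following is only a sketch: the class ErrorSummary and the method summarize are hypothetical names, not part of the Kudu client, and the errors are passed as plain objects so that the block compiles without the Kudu library.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of the Kudu client API).
class ErrorSummary {

    /**
     * Builds the report lines for a batch of failed writes.
     *
     * @param errors     the collected row errors (any objects with a useful toString())
     * @param overflowed whether the error buffer discarded some errors
     * @param maxShown   maximum number of individual errors to list
     * @return the lines to print or log; empty if there is nothing to report
     */
    static List<String> summarize(Object[] errors, boolean overflowed, int maxShown) {
        List<String> lines = new ArrayList<>();
        if (errors.length == 0 && !overflowed) {
            return lines; // nothing went wrong
        }
        int shown = Math.min(errors.length, maxShown);
        lines.add("there were errors inserting rows to Kudu");
        lines.add("showing " + shown + " of " + errors.length + " collected errors:");
        for (int i = 0; i < shown; i++) {
            lines.add(String.valueOf(errors[i]));
        }
        if (overflowed) {
            lines.add("error buffer overflowed: some errors were discarded");
        }
        return lines;
    }

    public static void main(String[] args) {
        // Stand-in error objects; in real code these would be the row errors
        // taken from the session's pending errors.
        Object[] fakeErrors = {"e1", "e2", "e3", "e4", "e5", "e6", "e7"};
        for (String line : summarize(fakeErrors, true, 5)) {
            System.out.println(line);
        }
    }
}
```

In the sample above, such a helper would presumably be called as summarize(roStatus.getRowErrors(), roStatus.isOverflowed(), 5) after session.close(), with the caller deciding whether a non-empty result should raise an exception.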