更新时间:2024-08-27 GMT+08:00
上传实时数据
RES通过DIS SDK上传实时数据,用户实时日数据并做近线处理。当前仅支持Java语言的SDK,示例请参见《数据接入服务SDK参考》。
前提条件
下载SDK之后,需要修改pom文件中的依赖,对SDK进行升级,最新版本可升级至1.3.12。
上传实时数据至RES
- 初始化DIS客户端,使用代码初始化DIS SDK客户端实例,代码样例如下。具体方式请参见初始化DIS客户端。
1 2 3 4 5 6 7 8
// 创建DIS客户端实例 DIS dic = DISClientBuilder.standard() .withEndpoint("YOUR_ENDPOINT") .withAk("YOUR_AK") .withSk("YOUR_SK") .withProjectId("YOUR_PROJECT_ID") .withRegion("YOUR_REGION") .build();
其中,各参数说明如下:
- “YOUR_AK”、“YOUR_SK”即访问密钥,获取方式请参见获取访问密钥。
- “YOUR_PROJECT_ID”为项目ID、“YOUR_REGION”为区域ID,获取方式请参见获取项目名称、项目ID、区域ID。
- 获取需要上传通道的ID(streamId)。
- 单击近线数据源的“详情”
图1 获取通道ID
- 单击近线数据源的“详情”
- 上传实时数据,示例代码如下,其中,“streamId”的配置值要与步骤2中“通道ID”的值一致。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
// 配置通道ID String streamId = "xxxx"; // 配置上传的数据 PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamId(streamId); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); String path = ActionDataProducer.class.getClassLoader().getResource("action.json").getPath(); BufferedReader in = new BufferedReader(new FileReader(path)); String record = in.readLine(); int putCnt = 0; while (record != null && !record.isEmpty()) { putCnt++; System.out.println("Put the " + putCnt + " record: " + record); try { putRecordsRequestEntryList.clear(); PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap((record).getBytes())); // PartitionKey为随机值可使数据均匀分布到所有分区中 putRecordsRequestEntry.setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000))); putRecordsRequestEntryList.add(putRecordsRequestEntry); putRecordsRequest.setRecords(putRecordsRequestEntryList); dic.putRecords(putRecordsRequest); } catch (DISClientException e) { LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]", e.getMessage(), e); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } record = in.readLine(); } in.close();
父主题: 数据源管理