更新时间:2024-08-27 GMT+08:00
分享

上传实时数据

RES通过DIS SDK上传实时数据,用户实时日数据并做近线处理。当前仅支持Java语言的SDK,示例请参见《数据接入服务SDK参考》

前提条件

  • 如果需要使用近线上传实时数据的用户,可以使用DIS SDK接口上传,请您按照需求下载DIS SDK,下载完之后按照下面的说明进行SDK升级。
  • 子账户无法使用SDK上传数据,需要主账号授权子账号DIS USER权限。详细请参考权限管理

下载SDK之后,需要修改pom文件中的依赖,对SDK进行升级,最新版本可升级至1.3.12。

上传实时数据至RES

  1. 初始化DIS客户端,使用代码初始化DIS SDK客户端实例,代码样例如下。具体方式请参见初始化DIS客户端
    1
    2
    3
    4
    5
    6
    7
    8
    // 创建DIS客户端实例
    DIS dic = DISClientBuilder.standard()
    	.withEndpoint("YOUR_ENDPOINT")
    	.withAk("YOUR_AK")
    	.withSk("YOUR_SK")
    	.withProjectId("YOUR_PROJECT_ID")
    	.withRegion("YOUR_REGION")
    	.build();
    

    其中,各参数说明如下:

  2. 获取需要上传通道的ID(streamId)。
    • 单击近线数据源的“详情”
      图1 获取通道ID
  3. 上传实时数据,示例代码如下,其中,“streamId”的配置值要与步骤2“通道ID”的值一致。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    // 配置通道ID
    String streamId = "xxxx";
    // 配置上传的数据
    
    PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
    putRecordsRequest.setStreamId(streamId);
    List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
    
    String path = ActionDataProducer.class.getClassLoader().getResource("action.json").getPath();
    BufferedReader in = new BufferedReader(new FileReader(path));
    String record = in.readLine();
    int putCnt = 0;
    while (record != null && !record.isEmpty()) {    
        putCnt++;
        System.out.println("Put the " + putCnt + " record: " + record);    
        try {
            putRecordsRequestEntryList.clear();
            PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
            putRecordsRequestEntry.setData(ByteBuffer.wrap((record).getBytes()));
            // PartitionKey为随机值可使数据均匀分布到所有分区中
            putRecordsRequestEntry.setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000)));
            putRecordsRequestEntryList.add(putRecordsRequestEntry);
            putRecordsRequest.setRecords(putRecordsRequestEntryList);
    	dic.putRecords(putRecordsRequest);
        } catch (DISClientException e) {
    	LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]",
    		e.getMessage(),
    		e);
        } catch (Exception e) {
    	LOGGER.error(e.getMessage(), e);
        }
        record = in.readLine();
    }
    in.close();
    

相关文档