更新时间:2024-10-25 GMT+08:00
上传流式数据
样例代码
使用初始化DIS客户端后的客户端实例将用户的流式数据通过DIS通道上传至DIS服务。
其中,“streamName”的配置值要与开通DIS通道中“通道名称”的值一致,“endpoint”,“ak”,“sk”,“region”,“projectId”信息请参见获取认证信息。
批量上传流式数据的主体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
//初始化DIS客户端实例,其中,“endpoint”,“ak”,“sk”,“region”,“projectId”信息请参见。 DIS dic = DISClientBuilder.standard() .withEndpoint("xxxx") // 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全; // 本示例以ak和sk保存在环境变量中来实现身份验证为例,运行本示例前请先在本地环境中设置环境变量HUAWEICLOUD_SDK_AK和HUAWEICLOUD_SDK_SK。 .withAk(System.getenv("HUAWEICLOUD_SDK_AK")) .withSk(System.getenv("HUAWEICLOUD_SDK_SK")) .withProjectId("xxxx") .withRegion("xxxx") .build(); // 配置通道名称 String streamName = "xxxx"; // 配置上传的数据 String message = "hello world."; PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); // 上传10条 for (int i = 0; i < 10; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap((message + i).getBytes())); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = null; try { putRecordsResult = dic.putRecords(putRecordsRequest); } catch (DISClientException e) { LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]", e.getMessage(), e); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } // 用户业务需要对上传结果逐条进行判断,确认是否上传成功 if (putRecordsResult != null) { LOGGER.info("Put {} [{} successful / {} failed] records.", putRecordsResult.getRecords().size(), putRecordsResult.getRecords().size() - putRecordsResult.getFailedRecordCount().get(), putRecordsResult.getFailedRecordCount()); for (int j = 0; j < putRecordsResult.getRecords().size(); j++) { PutRecordsResultEntry putRecordsRequestEntry = putRecordsResult.getRecords().get(j); if (!StringUtils.isNullOrEmpty(putRecordsRequestEntry.getErrorCode())) { // 上传失败 LOGGER.error("[{}] put failed, errorCode [{}], errorMessage [{}]", new String(putRecordsRequestEntryList.get(j).getData().array()), putRecordsRequestEntry.getErrorCode(), putRecordsRequestEntry.getErrorMessage()); } else { // 上传成功 LOGGER.info("[{}] put success, partitionId [{}], sequenceNumber [{}]", new String(putRecordsRequestEntryList.get(j).getData().array()), putRecordsRequestEntry.getPartitionId(), putRecordsRequestEntry.getSequenceNumber()); } } } |
逐条上传流式数据的主体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
// 创建DIS客户端实例 DIS dic = DISUtil.getInstance(); // 配置流名称 String streamName = DISUtil.getStreamName(); // 配置上传的数据 String message = "Hello world"; PutRecordRequest putRecordsRequest = new PutRecordRequest(); putRecordsRequest.setStreamName(streamName); putRecordsRequest.setData(ByteBuffer.wrap((message).getBytes())); try { PutRecordResult putRecordsResult = dic.putRecord(putRecordsRequest); // 上传成功 LOGGER.info("[{}] put success, partitionId [{}], sequenceNumber [{}]", message, putRecordsResult.getPartitionId(), putRecordsResult.getSequenceNumber()); } catch (DISClientException e) { LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]", e.getMessage(), e); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } |
运行程序
右键选择
运行程序,若程序运行成功,可以在控制台查看到类似如下信息:
1 2 3 4 5 6 |
15:19:29.298 [main] INFO com.bigdata.dis.sdk.demo.ProducerDemo - ========== BEGIN PUT ============ 15:19:30.992 [main] INFO com.bigdata.dis.sdk.demo.ProducerDemo - Put 3 records[3 successful / 0 failed]. 15:19:30.992 [main] INFO com.bigdata.dis.sdk.demo.ProducerDemo - [hello world.] put success, partitionId [shardId-0000000000], sequenceNumber [1] 15:19:30.992 [main] INFO com.bigdata.dis.sdk.demo.ProducerDemo - [hello world.] put success, partitionId [shardId-0000000000], sequenceNumber [2] 15:19:30.992 [main] INFO com.bigdata.dis.sdk.demo.ProducerDemo - [hello world.] put success, partitionId [shardId-0000000000], sequenceNumber [3] 15:19:30.992 [main] INFO com.bigdata.dis.sdk.demo.ProducerDemo - ========== END PUT ============ |
父主题: 使用SDK(Java)