更新时间:2024-10-25 GMT+08:00

上传流式数据

样例代码

使用初始化DIS客户端后的客户端实例将用户的流式数据通过DIS通道上传至DIS服务。

批量上传流式数据的主体代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
//初始化DIS客户端实例,其中,“endpoint”,“ak”,“sk”,“region”,“projectId”信息请参见。
 DIS dic = DISClientBuilder.standard()
            .withEndpoint("xxxx")
            // 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全;
            // 本示例以ak和sk保存在环境变量中来实现身份验证为例,运行本示例前请先在本地环境中设置环境变量HUAWEICLOUD_SDK_AK和HUAWEICLOUD_SDK_SK。
            .withAk(System.getenv("HUAWEICLOUD_SDK_AK"))
            .withSk(System.getenv("HUAWEICLOUD_SDK_SK"))
            .withProjectId("xxxx")
            .withRegion("xxxx")
            .build();

// 配置通道名称
String streamName = "xxxx";
// 配置上传的数据
String message = "hello world.";

        PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
        putRecordsRequest.setStreamName(streamName);
        List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
        // 上传10条
        for (int i = 0; i < 10; i++)
        {
            PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
            putRecordsRequestEntry.setData(ByteBuffer.wrap((message + i).getBytes()));
            putRecordsRequestEntryList.add(putRecordsRequestEntry);
        }
        putRecordsRequest.setRecords(putRecordsRequestEntryList);

        PutRecordsResult putRecordsResult = null;
        try
        {
            putRecordsResult = dic.putRecords(putRecordsRequest);
        }
        catch (DISClientException e)
        {
            LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]",
                e.getMessage(),
                e);
        }
        catch (Exception e)
        {
            LOGGER.error(e.getMessage(), e);
        }
// 用户业务需要对上传结果逐条进行判断,确认是否上传成功
        if (putRecordsResult != null)
        {
            LOGGER.info("Put {} [{} successful / {} failed] records.",
                putRecordsResult.getRecords().size(),
                putRecordsResult.getRecords().size() - putRecordsResult.getFailedRecordCount().get(),
                putRecordsResult.getFailedRecordCount());

            for (int j = 0; j < putRecordsResult.getRecords().size(); j++)
            {
                PutRecordsResultEntry putRecordsRequestEntry = putRecordsResult.getRecords().get(j);
                if (!StringUtils.isNullOrEmpty(putRecordsRequestEntry.getErrorCode()))
                {
                    // 上传失败
                    LOGGER.error("[{}] put failed, errorCode [{}], errorMessage [{}]",
                        new String(putRecordsRequestEntryList.get(j).getData().array()),
                        putRecordsRequestEntry.getErrorCode(),
                        putRecordsRequestEntry.getErrorMessage());
                }
                else
                {
                    // 上传成功
                    LOGGER.info("[{}] put success, partitionId [{}], sequenceNumber [{}]",
                        new String(putRecordsRequestEntryList.get(j).getData().array()),
                        putRecordsRequestEntry.getPartitionId(),
                        putRecordsRequestEntry.getSequenceNumber());
                }
            }
        }

逐条上传流式数据的主体代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 创建DIS客户端实例
DIS dic = DISUtil.getInstance();

// 配置流名称
String streamName = DISUtil.getStreamName();

// 配置上传的数据
String message = "Hello world";

        PutRecordRequest putRecordsRequest = new PutRecordRequest();
        putRecordsRequest.setStreamName(streamName);
        putRecordsRequest.setData(ByteBuffer.wrap((message).getBytes()));
        try
        {
            PutRecordResult putRecordsResult = dic.putRecord(putRecordsRequest);
            // 上传成功
            LOGGER.info("[{}] put success, partitionId [{}], sequenceNumber [{}]",
                message, putRecordsResult.getPartitionId(), putRecordsResult.getSequenceNumber());
        }
        catch (DISClientException e)
        {
            LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]",
                e.getMessage(), e);
        }
        catch (Exception e)
        {
            LOGGER.error(e.getMessage(), e);
        }

运行程序

右键选择Run As > 1 Java Application运行程序,若程序运行成功,可以在控制台查看到类似如下信息:

1
2
3
4
5
6
15:19:29.298 [main] INFO  com.bigdata.dis.sdk.demo.ProducerDemo - ========== BEGIN PUT ============
15:19:30.992 [main] INFO  com.bigdata.dis.sdk.demo.ProducerDemo - Put 3 records[3 successful / 0 failed].
15:19:30.992 [main] INFO  com.bigdata.dis.sdk.demo.ProducerDemo - [hello world.] put success, partitionId [shardId-0000000000], sequenceNumber [1]
15:19:30.992 [main] INFO  com.bigdata.dis.sdk.demo.ProducerDemo - [hello world.] put success, partitionId [shardId-0000000000], sequenceNumber [2]
15:19:30.992 [main] INFO  com.bigdata.dis.sdk.demo.ProducerDemo - [hello world.] put success, partitionId [shardId-0000000000], sequenceNumber [3]
15:19:30.992 [main] INFO  com.bigdata.dis.sdk.demo.ProducerDemo - ========== END PUT ============