Updated on 2026-01-27 GMT+08:00

Uploading Streaming Data

Sample Code

Use the initialized client instance to upload your streaming data to DIS through a DIS stream.

The code for uploading streaming data in batches is as follows:

// Initialize the DIS SDK client instance. Replace each "xxxx" placeholder with your own value.
// For details about endpoints, AKs, SKs, regions, and project IDs, see the DIS SDK client initialization instructions.
DIS dic = DISClientBuilder.standard()
    .withEndpoint("xxxx")
    // Hard-coded or plaintext AK and SK are risky. For security purposes, encrypt your AK and SK and store them in the configuration file or environment variables.
    // In this example, the AK and SK stored in the environment variables are used for identity authentication. Before running this example, configure environment variables HUAWEICLOUD_SDK_AK and HUAWEICLOUD_SDK_SK in the local environment.
    .withAk(System.getenv("HUAWEICLOUD_SDK_AK"))
    .withSk(System.getenv("HUAWEICLOUD_SDK_SK"))
    .withProjectId("xxxx")
    .withRegion("xxxx")
    .build();

// Configure the stream name.
String streamName = "xxxx";
// Configure the data to be uploaded.
String message = "hello world.";

// Build one batch request that carries all records for the stream.
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(streamName);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
// Upload 10 records. The loop index is appended so that each payload is distinguishable.
for (int i = 0; i < 10; i++) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    // Encode explicitly as UTF-8 so that the uploaded bytes do not depend on the platform default charset.
    putRecordsRequestEntry.setData(
        ByteBuffer.wrap((message + i).getBytes(java.nio.charset.StandardCharsets.UTF_8)));
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}
putRecordsRequest.setRecords(putRecordsRequestEntryList);

// The result stays null if the request throws, which skips the per-record check below.
PutRecordsResult putRecordsResult = null;
try {
    putRecordsResult = dic.putRecords(putRecordsRequest);
} catch (DISClientException e) {
    LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]",
        e.getMessage(),
        e);
} catch (Exception e) {
    LOGGER.error(e.getMessage(), e);
}

// Check the upload records one by one to determine whether the upload is successful.
if (putRecordsResult != null) {
    // Read both counters once so that the logged totals are consistent with each other.
    int totalCount = putRecordsResult.getRecords().size();
    int failedCount = putRecordsResult.getFailedRecordCount().get();
    LOGGER.info("Put {} [{} successful / {} failed] records.",
        totalCount,
        totalCount - failedCount,
        failedCount);

    // Result entry j is paired with request entry j to recover the payload that was sent.
    for (int j = 0; j < totalCount; j++) {
        PutRecordsResultEntry resultEntry = putRecordsResult.getRecords().get(j);
        // Decode with the same charset that was used for encoding.
        String payload = new String(
            putRecordsRequestEntryList.get(j).getData().array(),
            java.nio.charset.StandardCharsets.UTF_8);
        if (!StringUtils.isNullOrEmpty(resultEntry.getErrorCode())) {
            // Upload failed: a non-empty error code marks this record as rejected.
            LOGGER.error("[{}] put failed, errorCode [{}], errorMessage [{}]",
                payload,
                resultEntry.getErrorCode(),
                resultEntry.getErrorMessage());
        } else {
            // Uploaded
            LOGGER.info("[{}] put success, partitionId [{}], sequenceNumber [{}]",
                payload,
                resultEntry.getPartitionId(),
                resultEntry.getSequenceNumber());
        }
    }
}

The code for uploading streaming data one by one is as follows:

// Create a DIS client instance.
DIS dic = DISUtil.getInstance();

// Configure the stream name.
String streamName = DISUtil.getStreamName();

// Configure the data to be uploaded.
String message = "Hello world";

// Build a request that carries exactly one record.
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(streamName);
// Encode explicitly as UTF-8 so that the uploaded bytes do not depend on the platform default charset.
putRecordRequest.setData(ByteBuffer.wrap(message.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
try {
    PutRecordResult putRecordResult = dic.putRecord(putRecordRequest);
    // Uploaded
    LOGGER.info("[{}] put success, partitionId [{}], sequenceNumber [{}]",
        message, putRecordResult.getPartitionId(), putRecordResult.getSequenceNumber());
} catch (DISClientException e) {
    LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]",
        e.getMessage(), e);
} catch (Exception e) {
    LOGGER.error(e.getMessage(), e);
}

Running the Program

Right-click the program and choose Run As > 1 Java Application from the shortcut menu. If the program runs successfully, information similar to the following is displayed on the console (the exact record count, payloads, and sequence numbers depend on the program you run):

15:19:29.298 [main] INFO  com.bigdata.dis.sdk.demo.ProducerDemo - ========== BEGIN PUT ============
15:19:30.992 [main] INFO  com.bigdata.dis.sdk.demo.ProducerDemo - Put 3 records[3 successful / 0 failed].
15:19:30.992 [main] INFO  com.bigdata.dis.sdk.demo.ProducerDemo - [hello world.] put success, partitionId [shardId-0000000000], sequenceNumber [1]
15:19:30.992 [main] INFO  com.bigdata.dis.sdk.demo.ProducerDemo - [hello world.] put success, partitionId [shardId-0000000000], sequenceNumber [2]
15:19:30.992 [main] INFO  com.bigdata.dis.sdk.demo.ProducerDemo - [hello world.] put success, partitionId [shardId-0000000000], sequenceNumber [3]
15:19:30.992 [main] INFO  com.bigdata.dis.sdk.demo.ProducerDemo - ========== END PUT ============