Updated on 2023-06-21 GMT+08:00

Uploading Streaming Data

Sample Code

Use the initialized client instance to upload your streaming data to DIS through a DIS stream.

The code for uploading streaming data is as follows:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Initialize the DIS SDK client instance. Replace every "xxxx" placeholder below
// with your own endpoint, AK, SK, project ID, and region.
DIS dic = DISClientBuilder.standard()
        .withEndpoint("xxxx")
        .withAk("xxxx")
        .withSk("xxxx")
        .withProjectId("xxxx")
        .withRegion("xxxx")
        .build();

// Configure the stream name.
String streamName = "xxxx";
// Configure the data to be uploaded.
String message = "hello world.";

// Encode once with an explicit charset so the uploaded bytes do not depend on
// the platform default encoding of the machine running the sample.
byte[] payload = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);

// Build a request that carries three records with the same payload.
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(streamName);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    // Wrap the payload separately for each record: a ByteBuffer carries its own
    // position/limit, so sharing a single instance would let a read of one
    // record leave the others looking empty.
    putRecordsRequestEntry.setData(ByteBuffer.wrap(payload));
    // Random partition key in the range [0, 1000000).
    putRecordsRequestEntry.setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000)));
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}
putRecordsRequest.setRecords(putRecordsRequestEntryList);

LOGGER.info("========== BEGIN PUT ============");

// Remains null if putRecords throws; check for null before inspecting the result.
PutRecordsResult putRecordsResult = null;
try {
    putRecordsResult = dic.putRecords(putRecordsRequest);
} catch (DISClientException e) {
    // The exception is passed as the last argument so the logger also records the stack trace.
    LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]",
            e.getMessage(),
            e);
} catch (Exception e) {
    // Sample-level boundary: log any other failure instead of terminating the program.
    LOGGER.error(e.getMessage(), e);
}

Running the Program

Right-click the program and choose Run As > 1 Java Application from the shortcut menu. If the program runs successfully, information similar to the following is displayed on the console:

1
2
3
4
5
6
15:19:29.298 [main] INFO  com.bigdata.dis.sdk.demo.ProducerDemo - ========== BEGIN PUT ============
15:19:30.992 [main] INFO  com.bigdata.dis.sdk.demo.ProducerDemo - Put 3 records[3 successful / 0 failed].
15:19:30.992 [main] INFO  com.bigdata.dis.sdk.demo.ProducerDemo - [hello world.] put success, partitionId [shardId-0000000000], partitionKey [261045], sequenceNumber [1]
15:19:30.992 [main] INFO  com.bigdata.dis.sdk.demo.ProducerDemo - [hello world.] put success, partitionId [shardId-0000000000], partitionKey [958815], sequenceNumber [2]
15:19:30.992 [main] INFO  com.bigdata.dis.sdk.demo.ProducerDemo - [hello world.] put success, partitionId [shardId-0000000000], partitionKey [416421], sequenceNumber [3]
15:19:30.992 [main] INFO  com.bigdata.dis.sdk.demo.ProducerDemo - ========== END PUT ============