Uploading Streaming Data
Sample Code
Use the initialized client instance to upload your streaming data to DIS through a DIS stream.
The code for uploading streaming data in batches is as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 | //Initialize the DIS SDK client instance. For details about endpoints, AKs, SKs, regions, and project IDs, see the following: DIS dic = DISClientBuilder.standard() .withEndpoint("xxxx") // Hard-coded or plaintext AK and SK are risky. For security purposes, encrypt your AK and SK and store them in the configuration file or environment variables. // In this example, the AK and SK stored in the environment variables are used for identity authentication. Before running this example, configure environment variables HUAWEICLOUD_SDK_AK and HUAWEICLOUD_SDK_SK in the local environment. .withAk(System.getenv("HUAWEICLOUD_SDK_AK")) .withSk(System.getenv("HUAWEICLOUD_SDK_SK")) .withProjectId("xxxx") .withRegion("xxxx") .build(); //Configure the stream name. String streamName = "xxxx"; //Configure the data to be uploaded. String message = "hello world."; PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); //Upload 10 records. for (int i = 0; i < 10; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap((message + i).getBytes())); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = null; try { putRecordsResult = dic.putRecords(putRecordsRequest); } catch (DISClientException e) { LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]", e.getMessage(), e); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } //Check the upload records one by one to determine whether the upload is successful. if (putRecordsResult != null) { LOGGER.info("Put {} [{} successful / {} failed] records.", putRecordsResult.getRecords().size(), putRecordsResult.getRecords().size() - putRecordsResult.getFailedRecordCount().get(), putRecordsResult.getFailedRecordCount()); for (int j = 0; j < putRecordsResult.getRecords().size(); j++) { PutRecordsResultEntry putRecordsRequestEntry = putRecordsResult.getRecords().get(j); if (!StringUtils.isNullOrEmpty(putRecordsRequestEntry.getErrorCode())) { // Upload failed LOGGER.error("[{}] put failed, errorCode [{}], errorMessage [{}]", new String(putRecordsRequestEntryList.get(j).getData().array()), putRecordsRequestEntry.getErrorCode(), putRecordsRequestEntry.getErrorMessage()); } else { // Uploaded LOGGER.info("[{}] put success, partitionId [{}], sequenceNumber [{}]", new String(putRecordsRequestEntryList.get(j).getData().array()), putRecordsRequestEntry.getPartitionId(), putRecordsRequestEntry.getSequenceNumber()); } } } |
The code for uploading streaming data one by one is as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | //Create a DIS client instance. DIS dic = DISUtil.getInstance(); //Configure the stream name. String streamName = DISUtil.getStreamName(); //Configure the data to be uploaded. String message = "Hello world"; PutRecordRequest putRecordsRequest = new PutRecordRequest(); putRecordsRequest.setStreamName(streamName); putRecordsRequest.setData(ByteBuffer.wrap((message).getBytes())); try { PutRecordResult putRecordsResult = dic.putRecord(putRecordsRequest); // Uploaded LOGGER.info("[{}] put success, partitionId [{}], sequenceNumber [{}]", message, putRecordsResult.getPartitionId(), putRecordsResult.getSequenceNumber()); } catch (DISClientException e) { LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]", e.getMessage(), e); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } |
Running the Program
Right-click the program and choose from the shortcut menu. If the program is run successfully, you can view the information similar to the following on the console:
1 2 3 4 5 6 | 15:19:29.298 [main] INFO com.bigdata.dis.sdk.demo.ProducerDemo - ========== BEGIN PUT ============ 15:19:30.992 [main] INFO com.bigdata.dis.sdk.demo.ProducerDemo - Put 3 records[3 successful / 0 failed]. 15:19:30.992 [main] INFO com.bigdata.dis.sdk.demo.ProducerDemo - [hello world.] put success, partitionId [shardId-0000000000], sequenceNumber [1] 15:19:30.992 [main] INFO com.bigdata.dis.sdk.demo.ProducerDemo - [hello world.] put success, partitionId [shardId-0000000000], sequenceNumber [2] 15:19:30.992 [main] INFO com.bigdata.dis.sdk.demo.ProducerDemo - [hello world.] put success, partitionId [shardId-0000000000], sequenceNumber [3] 15:19:30.992 [main] INFO com.bigdata.dis.sdk.demo.ProducerDemo - ========== END PUT ============ |
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot
