Uploading Streaming Data
Sample Code
Data can be uploaded in base64 or protobuf mode.
- To set the upload mode to base64, use DISSetSerializedMode("base64").
- To set the upload mode to protobuf, use DISSetSerializedMode("protobuf").
To obtain the upload mode set by the user, use DISGetSerializedMode().
The main codes for uploading streaming data are as follows:
#define MAX_RECORD_COUNT 500 #define MAX_RECORD_LEN (1024*10)
The upload mode is synchronous upload. A long delay occurs when the data volume is large. To avoid a long delay, a maximum of 500 records can be uploaded at a time, and the data volume in a record cannot exceed 10,000. If the limit is exceeded, the program displays the following message:
Send Record Faiure, the httprspcode is 0. ret: 5 ErrorCode: ErrorDetail:
ret: 5 indicates an invalid parameter.
The following is a specific implementation method:
char*streamName = "myStream";
char *host = "XXX.XXX.XXX.XXX:XXX";
char *region="southchina";
char *projectId="43eec61d4b514f359c97008a4f8bfb02";
int ret = 0;
DISResponseInfo RspInfo = {0};
int serrializedMode = DISGetSerializedMode();
DISPutRecord Record[MAX_RECORD_COUNT] = {0};
int i = 0;
int count = MAX_RECORD_COUNT;
int len = MAX_RECORD_LEN;
char *pmsg[MAX_RECORD_COUNT];
printf("===================%s Begin=======================\n", __FUNCTION__);
if (GetProtobufMode() == serrializedMode)
{
for (i = 0; i < count; i++)
{
Record[i].recordData.stringLen = rand() % len;
pmsg[i] = malloc(Record[i].recordData.stringLen);
Record[i].recordData.data = genRandomStringWithoutNull(Record[i].recordData.stringLen, pmsg[i]);
Record[i].partitionKey = "2";
}
}
else if (GetBase64Mode() == serrializedMode)
{
for (i = 0; i < count; i++)
{
Record[i].recordData.stringLen = rand() % len;
pmsg[i] = malloc(Record[i].recordData.stringLen);
Record[i].recordData.data = genRandomString(Record[i].recordData.stringLen, pmsg[i]);
Record[i].partitionKey = "2";
}
}
printf("inputrecord:%s\n",Record[0].recordData.data);
printf("inputrecord:%s\n",Record[1].recordData.data);
printf("inputrecord:%s\n",Record[2].recordData.data);
ret = PutRecords(host, projectId, region, streamName, 3, Record, PutRecordCallBack, &RspInfo);
if(ret != 0 || RspInfo.HttpResponseCode >= 300)
{
printf("Send Record Faiure, the httprspcode is %d. ret: %d\r\n", RspInfo.HttpResponseCode, ret);
printf("HttpResponseCode: %ld\r\n", RspInfo.HttpResponseCode);
printf("ErrorCode: %s\r\n", RspInfo.ErrorCode);
printf("ErrorDetail: %s\r\n", RspInfo.ErrorDetail);
}
else
{
printf("send Record Success,ret:%d\n",RspInfo.HttpResponseCode);
printf("HttpResponseCode: %ld\r\n", RspInfo.HttpResponseCode);
}
for (i = 0; i < count; i++)
{
free(pmsg[i]);
}
printf("===================%s End=======================\n",__FUNCTION__);
|
Parameter |
Mandatory |
Type |
Description |
|---|---|---|---|
|
recordData |
Yes |
DISString |
Data to be uploaded. |
|
explicitHashKey |
No |
Char * |
Specify the hash value of the data to be written to the partition. The hash value overwrites the hash value of partitionKey. |
|
partitionId |
No |
Char * |
Unique identifier of the partition. |
|
partitionKey |
No |
Char * |
Partition to which data is written to.
NOTE:
If partitionId is specified, it is preferentially used. If partitionId is not specified, use partitionKey. |
|
pucReserved |
No |
void * |
Empty pointer, used to extend the request body. |
|
Parameter |
Mandatory |
Type |
Description |
|---|---|---|---|
|
stringLen |
Yes |
long |
Length of the data to be uploaded. |
|
data |
Yes |
Char * |
Data to be uploaded. |
The PutRecords method has a callback function PutRecordCallBack, which is used to process responses. This function can be customized. An example is provided for reference only.
The following shows how to use the callback function:
DISStatus PutRecordCallBack(char *errorCode, char *errorDetails, char *streamName, DISPutRecord *putRecord, char *SeqNumber, char *partitiodId)
{
if(NULL != SeqNumber)
{
printf("Send Record:%s. key: %s success, the seqnum:%s, pid:%s\r\n", putRecord->recordData.data, putRecord->partitionKey, SeqNumber, partitiodId);
}
else
{
printf("Send Record:%s. key: %s fail, the errCode:%s, message:%s\r\n", putRecord->recordData.data, putRecord->partitionKey, errorCode, errorDetails);
}
return 0;
}
The following table describes the response parameters:
|
Parameter |
Type |
Description |
|---|---|---|
|
errorCode |
char * |
Error code. |
|
errorDetails |
char * |
Error message. |
|
streamName |
char * |
Stream name. |
|
putRecord |
DISPutRecord * |
Data to be uploaded. |
|
partitiodId |
char * |
Partition ID. |
|
SeqNumber |
char * |
Sequence number. A sequence number is the unique identifier of each record. DIS automatically allocates a sequence number when the data producer calls the PutRecords operation to add data to the DIS stream. The sequence number of the same partition key usually changes with time. A longer interval between PutRecords requests results in a larger sequence number. |
Execution Result
Information similar to the following is displayed on the console:
Send Record:aaaaaaaaaa. key: 1 success, the seqnum:190, pid:shardId-0000000000 Send Record:0EDk3AwJuBQOdtTZbTUnw21ruxbRpNE4WMY71qIH3pujwWo93d9Puzojp0jYahVD06w2FZbL1dgaxz238So07RMYHr02D2CQW4T0DE5MF39KYILr9uR81mm2h6T5TU781kmT8xjBKWN5g9h33KN2QX0332xbnJ5ItJ3tu48YWOTzf3hKk7yk9aNp1fUXrbteIxyEOS48m4e17X. key: 1 success, the seqnum:191, pid:shardId-0000000000 Send Record:dHsTTR40lmym0l0Oxh580IIiuesLcnzYa27yyQ7e5nD40Td2z59OEFhU88HK3BxUfiNZdVpSaswNixL0lv7Tye4fl0Cp16g7PT5WflWZR9D83pCWUXfLTB9BXG1dO14266FOJs9N6Th3SqFRNDe5l6Jz5Vt07uyeeiDYRwwerF9y0RG41Vga. key: 2 success, the seqnum:192, pid:shardId-0000000000 send Record Success,ret:200 HttpResponseCode: 200
Last Article: Querying Stream Details
Next Article: Downloading Streaming Data
Did this article solve your problem?
Thank you for your score!Your feedback would help us improve the website.