Uploading Streaming Data

Sample Code

Data can be uploaded in base64 or protobuf mode.

  • To set the upload mode to base64, use DISSetSerializedMode("base64").
  • To set the upload mode to protobuf, use DISSetSerializedMode("protobuf").

To obtain the upload mode set by the user, use DISGetSerializedMode().
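The choice of mode affects payload size. As a rough illustration, and assuming that base64 mode transmits the record data as standard padded base64 text (this section does not describe the wire format, so treat that as an assumption), the encoded size can be estimated as below. The helper name base64_encoded_len is hypothetical and not part of the SDK.

```c
#include <stddef.h>

/* Hypothetical helper, not part of the DIS SDK.
 * Returns the length of standard padded base64 text for rawLen input bytes:
 * every group of up to 3 input bytes becomes 4 output characters. */
size_t base64_encoded_len(size_t rawLen)
{
    return ((rawLen + 2) / 3) * 4;
}
```

Under that assumption, a 10,240-byte payload would grow to 13,656 characters, roughly one third larger than the raw data.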

The main code for uploading streaming data is as follows:

#define MAX_RECORD_COUNT   500
#define MAX_RECORD_LEN   (1024*10)

Uploads are synchronous, so a large data volume causes a long delay. To limit the delay, a single call can upload a maximum of 500 records, and the data length of a single record cannot exceed 10,000. If either limit is exceeded, the program displays the following message:

Send Record Faiure, the httprspcode is 0. ret: 5
ErrorCode:
ErrorDetail:

ret: 5 indicates an invalid parameter.
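Checking a batch locally before the call can avoid the invalid-parameter error. The following is a minimal sketch, not SDK code: check_batch and next_batch_size are hypothetical helpers, and the sketch uses MAX_RECORD_LEN as the per-record bound. Because the text above states a limit of 10,000, using the stricter value may be the safer choice.

```c
#include <stddef.h>

#define MAX_RECORD_COUNT 500
#define MAX_RECORD_LEN   (1024 * 10)

/* Hypothetical pre-flight check, not part of the DIS SDK.
 * lens[i] is the payload length of record i.
 * Returns 0 if the batch respects both limits, -1 otherwise. */
int check_batch(const long *lens, int count)
{
    int i;

    if (lens == NULL || count <= 0 || count > MAX_RECORD_COUNT)
        return -1;
    for (i = 0; i < count; i++) {
        if (lens[i] < 0 || lens[i] > MAX_RECORD_LEN)
            return -1;
    }
    return 0;
}

/* Number of records to send in the next call when 'remaining'
 * records are still waiting to be uploaded. */
int next_batch_size(int remaining)
{
    if (remaining <= 0)
        return 0;
    return remaining > MAX_RECORD_COUNT ? MAX_RECORD_COUNT : remaining;
}
```

With these helpers, 1,234 pending records would be sent as three calls of 500, 500, and 234 records.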

The following example shows one way to implement the upload:

/* Requires <stdio.h> and <stdlib.h> in addition to the DIS SDK header. */
char *streamName = "myStream";
char *host = "XXX.XXX.XXX.XXX:XXX";
char *region = "southchina";
char *projectId = "43eec61d4b514f359c97008a4f8bfb02";
int ret = 0;
DISResponseInfo RspInfo = {0};
int serializedMode = DISGetSerializedMode();
DISPutRecord Record[MAX_RECORD_COUNT] = {0};
int i = 0;
int count = MAX_RECORD_COUNT;
int len = MAX_RECORD_LEN;
char *pmsg[MAX_RECORD_COUNT] = {0};   /* zeroed so that free() is safe for unused slots */

printf("===================%s Begin=======================\n", __FUNCTION__);

/* Generate random payloads; the generator depends on the upload mode. */
if (GetProtobufMode() == serializedMode)
{
    for (i = 0; i < count; i++)
    {
        Record[i].recordData.stringLen = rand() % len;
        pmsg[i] = malloc(Record[i].recordData.stringLen);
        Record[i].recordData.data = genRandomStringWithoutNull(Record[i].recordData.stringLen, pmsg[i]);
        Record[i].partitionKey = "2";
    }
}
else if (GetBase64Mode() == serializedMode)
{
    for (i = 0; i < count; i++)
    {
        Record[i].recordData.stringLen = rand() % len;
        pmsg[i] = malloc(Record[i].recordData.stringLen);
        Record[i].recordData.data = genRandomString(Record[i].recordData.stringLen, pmsg[i]);
        Record[i].partitionKey = "2";
    }
}

/* Print with an explicit length because the data may not be null-terminated. */
printf("inputrecord:%.*s\n", (int)Record[0].recordData.stringLen, Record[0].recordData.data);
printf("inputrecord:%.*s\n", (int)Record[1].recordData.stringLen, Record[1].recordData.data);
printf("inputrecord:%.*s\n", (int)Record[2].recordData.stringLen, Record[2].recordData.data);

/* Upload the first three records; PutRecordCallBack is invoked with the results. */
ret = PutRecords(host, projectId, region, streamName, 3, Record, PutRecordCallBack, &RspInfo);
if (ret != 0 || RspInfo.HttpResponseCode >= 300)
{
    printf("Send Record Faiure, the httprspcode is %ld. ret: %d\r\n", (long)RspInfo.HttpResponseCode, ret);
    printf("HttpResponseCode: %ld\r\n", (long)RspInfo.HttpResponseCode);
    printf("ErrorCode: %s\r\n", RspInfo.ErrorCode);
    printf("ErrorDetail: %s\r\n", RspInfo.ErrorDetail);
}
else
{
    printf("send Record Success,ret:%ld\n", (long)RspInfo.HttpResponseCode);
    printf("HttpResponseCode: %ld\r\n", (long)RspInfo.HttpResponseCode);
}

/* Release the payload buffers. */
for (i = 0; i < count; i++)
{
    free(pmsg[i]);
}
printf("===================%s End=======================\n", __FUNCTION__);
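The helpers genRandomString and genRandomStringWithoutNull are called in the sample but are not shown in this section. As a minimal sketch of what a helper of this kind might look like, the hypothetical fill_random_text below writes random alphanumeric characters into a caller-supplied buffer and terminates it. Its name, signature, and character set are assumptions and do not describe the SDK's implementation.

```c
#include <stdlib.h>

/* Hypothetical stand-in for a helper such as genRandomString; the real
 * helper is not shown in this document and may behave differently.
 * Fills buf (capacity bufLen bytes) with bufLen - 1 random alphanumeric
 * characters followed by '\0'.
 * Returns buf, or NULL if nothing can be written. */
char *fill_random_text(long bufLen, char *buf)
{
    static const char charset[] =
        "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
    const long charsetLen = (long)(sizeof(charset) - 1);
    long i;

    if (buf == NULL || bufLen <= 0)
        return NULL;
    for (i = 0; i < bufLen - 1; i++)
        buf[i] = charset[rand() % charsetLen];
    buf[bufLen - 1] = '\0';
    return buf;
}
```

A helper written this way needs a buffer with room for the terminator, so a caller that wants n visible characters would allocate n + 1 bytes and check the result of malloc before use.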

Table 1 DISPutRecord parameter description

| Parameter | Mandatory | Type | Description |
|---|---|---|---|
| recordData | Yes | DISString | Data to be uploaded. |
| explicitHashKey | No | char * | Hash value used to select the partition to which the data is written. It overrides the hash value of partitionKey. |
| partitionId | No | char * | Unique identifier of the partition. |
| partitionKey | No | char * | Partition to which the data is written. NOTE: If partitionId is specified, it takes precedence. If partitionId is not specified, partitionKey is used. |
| pucReserved | No | void * | Empty pointer, reserved for extending the request body. |
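The precedence among the routing fields in Table 1 can be illustrated with a small sketch. The struct below is a local mirror of the three fields for illustration only, and pick_route is a hypothetical helper. The table states that partitionId takes precedence over partitionKey and that explicitHashKey overrides the hash of partitionKey; the relative order of partitionId and explicitHashKey is an assumption here, and the actual routing is decided by the service.

```c
#include <stddef.h>

/* Local mirror of the routing fields from Table 1, for illustration only;
 * the real DISPutRecord type comes from the SDK. */
typedef struct {
    const char *explicitHashKey;
    const char *partitionId;
    const char *partitionKey;
} SketchRouting;

typedef enum {
    ROUTE_BY_PARTITION_ID,
    ROUTE_BY_EXPLICIT_HASH,
    ROUTE_BY_PARTITION_KEY,
    ROUTE_UNSPECIFIED
} SketchRoute;

/* A field counts as set when it is a non-empty string. */
static int is_set(const char *s)
{
    return s != NULL && s[0] != '\0';
}

/* Assumed precedence: partitionId, then explicitHashKey, then partitionKey. */
SketchRoute pick_route(const SketchRouting *r)
{
    if (r == NULL)
        return ROUTE_UNSPECIFIED;
    if (is_set(r->partitionId))
        return ROUTE_BY_PARTITION_ID;
    if (is_set(r->explicitHashKey))
        return ROUTE_BY_EXPLICIT_HASH;
    if (is_set(r->partitionKey))
        return ROUTE_BY_PARTITION_KEY;
    return ROUTE_UNSPECIFIED;
}
```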

Table 2 DISString parameter description

| Parameter | Mandatory | Type | Description |
|---|---|---|---|
| stringLen | Yes | long | Length of the data to be uploaded. |
| data | Yes | char * | Data to be uploaded. |

The PutRecords method takes a callback function, PutRecordCallBack, which processes the response. You can customize this function; the example below is provided for reference only.

The following shows how to use the callback function:

DISStatus PutRecordCallBack(char *errorCode, char *errorDetails, char *streamName, DISPutRecord *putRecord, char *SeqNumber, char *partitiodId)
{
    /* A non-NULL sequence number is treated as a successful upload. */
    if (NULL != SeqNumber)
    {
        /* Print with an explicit length because the data may not be null-terminated. */
        printf("Send Record:%.*s. key: %s success, the seqnum:%s, pid:%s\r\n", (int)putRecord->recordData.stringLen, putRecord->recordData.data, putRecord->partitionKey, SeqNumber, partitiodId);
    }
    else
    {
        printf("Send Record:%.*s. key: %s fail, the errCode:%s, message:%s\r\n", (int)putRecord->recordData.stringLen, putRecord->recordData.data, putRecord->partitionKey, errorCode, errorDetails);
    }
    return 0;
}
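Because the callback can be customized, it can also collect statistics instead of only printing. The following is a minimal sketch that counts successes and failures, assuming the callback is invoked once per record, as the console output suggests. SketchStatus and SketchPutRecord are stand-in types so the sketch compiles without the SDK headers, and CountingCallBack and the counters are hypothetical names. The callback signature carries no user-data pointer, so the sketch keeps its state in file-scope variables.

```c
#include <stdio.h>

/* Stand-in types so the sketch compiles without the SDK headers.
 * In real code, use DISStatus and DISPutRecord from the SDK instead. */
typedef int SketchStatus;
typedef struct {
    const char *partitionKey;
} SketchPutRecord;

/* File-scope counters updated by the callback. */
static int g_okCount = 0;
static int g_failCount = 0;

/* Same parameter order as PutRecordCallBack in the sample above. */
SketchStatus CountingCallBack(char *errorCode, char *errorDetails, char *streamName,
                              SketchPutRecord *putRecord, char *SeqNumber, char *partitiodId)
{
    (void)streamName;
    (void)partitiodId;

    if (SeqNumber != NULL) {
        /* A sequence number was assigned, so count the record as uploaded. */
        g_okCount++;
    } else {
        g_failCount++;
        fprintf(stderr, "key %s failed: %s (%s)\n",
                (putRecord != NULL && putRecord->partitionKey != NULL) ? putRecord->partitionKey : "?",
                errorCode != NULL ? errorCode : "?",
                errorDetails != NULL ? errorDetails : "?");
    }
    return 0;
}
```

After the upload call returns, the caller could compare g_okCount with the number of records submitted to decide whether any record needs to be resent.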

The following table describes the response parameters:

| Parameter | Type | Description |
|---|---|---|
| errorCode | char * | Error code. |
| errorDetails | char * | Error message. |
| streamName | char * | Stream name. |
| putRecord | DISPutRecord * | Data to be uploaded. |
| partitiodId | char * | Partition ID. |
| SeqNumber | char * | Sequence number, which uniquely identifies each record. DIS automatically allocates a sequence number when the data producer calls PutRecords to add data to the DIS stream. For a given partition key, the sequence number usually increases over time: a longer interval between PutRecords requests results in a larger sequence number. |

Execution Result

Information similar to the following is displayed on the console:

Send Record:aaaaaaaaaa. key: 1 success, the seqnum:190, pid:shardId-0000000000
Send Record:0EDk3AwJuBQOdtTZbTUnw21ruxbRpNE4WMY71qIH3pujwWo93d9Puzojp0jYahVD06w2FZbL1dgaxz238So07RMYHr02D2CQW4T0DE5MF39KYILr9uR81mm2h6T5TU781kmT8xjBKWN5g9h33KN2QX0332xbnJ5ItJ3tu48YWOTzf3hKk7yk9aNp1fUXrbteIxyEOS48m4e17X. key: 1 success, the seqnum:191, pid:shardId-0000000000
Send Record:dHsTTR40lmym0l0Oxh580IIiuesLcnzYa27yyQ7e5nD40Td2z59OEFhU88HK3BxUfiNZdVpSaswNixL0lv7Tye4fl0Cp16g7PT5WflWZR9D83pCWUXfLTB9BXG1dO14266FOJs9N6Th3SqFRNDe5l6Jz5Vt07uyeeiDYRwwerF9y0RG41Vga. key: 2 success, the seqnum:192, pid:shardId-0000000000
send Record Success,ret:200
HttpResponseCode: 200