文档首页 > > SDK参考> 使用SDK> 使用SDK(C)> 上传流式数据

上传流式数据

分享
更新时间: 2019/05/07 GMT+08:00

样例代码

上传可以设置base64的方式或protobuf方式传输。

  • 使用DISSetSerializedMode("base64");设为base64方式上传
  • 使用DISSetSerializedMode("protobuf");设为protobuf方式上传

通过DISGetSerializedMode();可以获取到用户设置的传输方式。

上传流式数据的主体代码如下:

#define MAX_RECORD_COUNT   500
#define MAX_RECORD_LEN   (1024*10)

该上传的方式为同步上传,数据量较大时存在较长的时延。为了规避较长的时延,在同步上传的方法中限制上传的条数最多不超过500条,单条的数据量不超过10k。如果超过限制条件,程序将提示

Send Record Faiure, the httprspcode is 0. ret: 5
ErrorCode:
ErrorDetail:

ret: 5表示无效的参数

下面是具体的实现方法

char*streamName = "myStream";
char *host = "XXX.XXX.XXX.XXX:XXX"; 
char *region="southchina";
char *projectId="43eec61d4b514f359c97008a4f8bfb02";
 int ret = 0;    
DISResponseInfo RspInfo = {0};   
int serrializedMode = DISGetSerializedMode();   
 DISPutRecord Record[MAX_RECORD_COUNT] = {0};
int i = 0;
int count = MAX_RECORD_COUNT;
int len = MAX_RECORD_LEN;
char *pmsg[MAX_RECORD_COUNT];
printf("===================%s Begin=======================\n", __FUNCTION__);
if (GetProtobufMode() == serrializedMode)
{
for (i = 0; i < count; i++)
{
Record[i].recordData.stringLen = rand() % len;
pmsg[i] = malloc(Record[i].recordData.stringLen);
Record[i].recordData.data = genRandomStringWithoutNull(Record[i].recordData.stringLen, pmsg[i]);
Record[i].partitionKey = "2";
}
}
else if (GetBase64Mode() == serrializedMode)
{
for (i = 0; i < count; i++)
{
Record[i].recordData.stringLen = rand() % len;
pmsg[i] = malloc(Record[i].recordData.stringLen);
Record[i].recordData.data = genRandomString(Record[i].recordData.stringLen, pmsg[i]);
Record[i].partitionKey = "2";
}
}
    printf("inputrecord:%s\n",Record[0].recordData.data);
    printf("inputrecord:%s\n",Record[1].recordData.data);
    printf("inputrecord:%s\n",Record[2].recordData.data);    
    ret = PutRecords(host, projectId, region, streamName, 3, Record, PutRecordCallBack, &RspInfo);
    if(ret != 0 || RspInfo.HttpResponseCode >= 300)
    {
        printf("Send Record Faiure, the httprspcode is %d. ret: %d\r\n", RspInfo.HttpResponseCode, ret);
        printf("HttpResponseCode: %ld\r\n", RspInfo.HttpResponseCode);
        printf("ErrorCode: %s\r\n", RspInfo.ErrorCode);
        printf("ErrorDetail: %s\r\n", RspInfo.ErrorDetail);
    }
    else
    {
        printf("send Record Success,ret:%d\n",RspInfo.HttpResponseCode);
        printf("HttpResponseCode: %ld\r\n", RspInfo.HttpResponseCode);
}
for (i = 0; i < count; i++)
{
free(pmsg[i]);
 }
printf("===================%s End=======================\n",__FUNCTION__);

表1 DISPutRecord参数说明

名称

是否必选

类型

说明

recordData

DISString

需要上传的数据。

explicitHashKey

Char *

用于明确数据需要写入分区的哈希值,此哈希值将覆盖“partitionKey”的哈希值。

partitionId

Char *

分区的唯一标识符。

partitionKey

Char *

数据将写入的分区。

说明:

如果传了partitionId参数,则优先使用partitionId参数。

如果partitionId没有传,则使用partitionKey。

pucReserved

void *

空指针,用于拓展请求体

表2 DISString参数说明

名称

是否必选

类型

说明

stringLen

long

上传的数据的长度。

data

Char *

上传的数据。

注意PutRecords方法中有一个回调函数PutRecordCallBack,用于处理响应。该函数用户可以自定义,这里给出一个例子仅供参考。

PutRecordCallBack的回调函数如下:

DISStatus PutRecordCallBack(char *errorCode, char *errorDetails, char *streamName, DISPutRecord *putRecord, char *SeqNumber, char *partitiodId)
{
    if(NULL != SeqNumber)
    {
        printf("Send Record:%s. key: %s success, the seqnum:%s, pid:%s\r\n", putRecord->recordData.data, putRecord->partitionKey, SeqNumber, partitiodId);
    }
    else
    {
        printf("Send Record:%s. key: %s fail, the errCode:%s, message:%s\r\n", putRecord->recordData.data, putRecord->partitionKey, errorCode, errorDetails);
    }
    return 0;
}

该回调函数将得到响应参数如下表。

参数名

类型

说明

errorCode

char *

错误码。

errorDetails

char *

错误消息。

streamName

char *

通道名

putRecord

DISPutRecord *

上传的数据

partitiodId

char *

分区ID。

SeqNumber

char *

序列号。序列号是每个记录的唯一标识符。序列号由DIS在数据生产者调用PutRecords操作以添加数据到DIS数据通道时DIS服务自动分配的。同一分区键的序列号通常会随时间变化增加。PutRecords请求之间的时间段越长,序列号越大。

运行结果

可以在控制台查看到类似如下信息:

Send Record:aaaaaaaaaa. key: 1 success, the seqnum:190, pid:shardId-0000000000
Send Record:0EDk3AwJuBQOdtTZbTUnw21ruxbRpNE4WMY71qIH3pujwWo93d9Puzojp0jYahVD06w2FZbL1dgaxz238So07RMYHr02D2CQW4T0DE5MF39KYILr9uR81mm2h6T5TU781kmT8xjBKWN5g9h33KN2QX0332xbnJ5ItJ3tu48YWOTzf3hKk7yk9aNp1fUXrbteIxyEOS48m4e17X. key: 1 success, the seqnum:191, pid:shardId-0000000000
Send Record:dHsTTR40lmym0l0Oxh580IIiuesLcnzYa27yyQ7e5nD40Td2z59OEFhU88HK3BxUfiNZdVpSaswNixL0lv7Tye4fl0Cp16g7PT5WflWZR9D83pCWUXfLTB9BXG1dO14266FOJs9N6Th3SqFRNDe5l6Jz5Vt07uyeeiDYRwwerF9y0RG41Vga. key: 2 success, the seqnum:192, pid:shardId-0000000000
send Record Success,ret:200
HttpResponseCode: 200
分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

跳转到云社区