Downloading Streaming Data

Updated on 2024-10-25 GMT+08:00

Context

To download streaming data, first determine the position in the partition from which reading starts, that is, obtain a cursor. After the start position is determined, data is read in a loop, and each response returns the cursor to use for the next read.

There are five cursor types available:

  • AT_SEQUENCE_NUMBER
  • AFTER_SEQUENCE_NUMBER
  • TRIM_HORIZON
  • LATEST
  • AT_TIMESTAMP

To better understand the cursor types, you need to understand the following basic concepts:

  • A sequence number (SN) is the unique identifier of each record. DIS automatically allocates an SN when a data producer calls the PutRecords operation to add data to the DIS stream. SNs for the same partition key generally increase over time. A longer interval between PutRecords requests results in a larger SN.
  • The SNs of each partition start from 0 and increase. Each data record corresponds to a unique SN. When the lifecycle of a data record ends, its SN expires. For example, the first data record uploaded to a new partition has SN 0. After 100 data records are uploaded, the SN of the last data record is 99. When the lifecycle of these records ends, SNs 0 to 99 become unavailable.
  • The SN range of a partition can be obtained by calling the describeStream API for querying stream details. sequenceNumberRange indicates the data validity range. The first value is the SN of the earliest data, the last value is the SN of the next uploaded data, and the SN of the latest data is one less than the last value.

    For example, [100, 200] indicates that a total of 200 data records have been uploaded to the partition, data records 0 to 99 have expired, the earliest valid data record is 100, the latest data record is 199, and the SN of the next data record to be uploaded is 200.
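
The arithmetic behind sequenceNumberRange can be sketched as follows. This is a minimal illustration only: SnRange is a hypothetical helper and is not part of the DIS SDK. It assumes you have already read the two values of sequenceNumberRange from the stream details.

```java
/** Hypothetical helper (not part of the DIS SDK) that interprets a sequenceNumberRange pair. */
final class SnRange {
    final long earliest; // first value: SN of the earliest valid record
    final long next;     // last value: SN that the next uploaded record will receive

    SnRange(long earliest, long next) {
        if (earliest < 0 || next < earliest) {
            throw new IllegalArgumentException("invalid range [" + earliest + ", " + next + "]");
        }
        this.earliest = earliest;
        this.next = next;
    }

    /** SN of the latest record; meaningful only when the partition holds valid data. */
    long latest() {
        return next - 1;
    }

    /** Number of records uploaded so far, because SNs start from 0. */
    long totalUploaded() {
        return next;
    }

    /** Number of expired records (SNs 0 to earliest - 1). */
    long expired() {
        return earliest;
    }

    /** Number of records that are still valid. */
    long valid() {
        return next - earliest;
    }

    public static void main(String[] args) {
        SnRange range = new SnRange(100, 200);
        // Prints: uploaded=200 expired=100 earliest=100 latest=199 next=200
        System.out.println("uploaded=" + range.totalUploaded()
                + " expired=" + range.expired()
                + " earliest=" + range.earliest
                + " latest=" + range.latest()
                + " next=" + range.next);
    }
}
```

For the range [100, 200], the helper reproduces the numbers in the example above: 200 records uploaded, 100 expired, and valid SNs from 100 to 199.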

Scenario Description

The following table describes the application scenarios of the five cursor types:

Table 1 Scenario description

Cursor type: AT_SEQUENCE_NUMBER

  • Description: Data is read from the position denoted by a specific SN. The SN is defined by starting-sequence-number in the demo. This is the default cursor type.
  • Application scenario: The start SN is already known.
  • Remarks: This cursor type is closely related to sequenceNumber and sequenceNumberRange [A, B] of the partition data. The specified SN must meet the following condition: A <= sequenceNumber <= B

Cursor type: AFTER_SEQUENCE_NUMBER

  • Description: Data is read from the position right after the position denoted by a specific SN. The SN is defined by starting-sequence-number.
  • Application scenario: The last consumption position has been saved, for example, each consumed record is recorded in a file or checkpoint. If the program is restarted, consumption resumes from the position right after the saved position. By comparison, AT_SEQUENCE_NUMBER resumes from the saved position itself, so the record at that position is consumed twice.
  • Remarks: This cursor type is closely related to sequenceNumber and sequenceNumberRange [A, B] of the partition data. The specified SN must meet the following condition: (A-1) <= sequenceNumber <= (B-1)

Cursor type: TRIM_HORIZON

  • Description: Data is read from the earliest data record in the partition. For example, if sequenceNumberRange is [100, 200], data consumption starts from record 100.
  • Application scenario: The consumption position is unknown and all valid data in the partition is to be consumed.
  • Remarks: None.

Cursor type: LATEST

  • Description: Data is read from the position just after the most recent record in the partition. That is, the system does not read the existing data in the partition but starts from data uploaded later. For example, if sequenceNumberRange is [100, 200], data consumption starts from record 200. If no new data is uploaded, the result is empty. If new data is uploaded, records 200, 201, 202, and so on are obtained.
  • Application scenario: The consumption position is unknown, the existing data in the partition is to be skipped, and consumption starts from newly uploaded data.
  • Remarks: None.

Cursor type: AT_TIMESTAMP

  • Description: Data is read from the position denoted by a specific timestamp. A 13-digit timestamp (in milliseconds) is required when the cursor is obtained. For example, if 1541742263206 is specified, data uploaded from 2018-11-09 13:44:23 (GMT+08:00) is read.
  • Application scenario: The consumption position is unknown, but the user wants to consume data from a specific time or from the end time of the last consumption.
  • Remarks: The timestamp must be greater than or equal to C, where C is the upload time of the earliest data record. If the timestamp is later than that of the latest data record or is in the future, the system reads from the position right after the latest data record.
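
The start positions and SN conditions in Table 1 can be expressed as a small model. The sketch below is illustrative only: CursorStart and firstSn are hypothetical names, not DIS SDK APIs, and the real service resolves the cursor on the server side. AT_TIMESTAMP is left out because its start position depends on record upload times rather than on the SN range alone.

```java
/** Simplified model of Table 1; illustrative only, not DIS SDK code. */
final class CursorStart {
    enum Type { AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER, TRIM_HORIZON, LATEST }

    /**
     * Returns the SN of the first record that would be read.
     * a and b are the two values of sequenceNumberRange [A, B].
     * sn is the starting SN and is ignored for TRIM_HORIZON and LATEST.
     */
    static long firstSn(Type type, long a, long b, long sn) {
        switch (type) {
            case AT_SEQUENCE_NUMBER:
                // Condition from Table 1: A <= sequenceNumber <= B
                if (sn < a || sn > b) {
                    throw new IllegalArgumentException("SN " + sn + " is outside [" + a + ", " + b + "]");
                }
                return sn;
            case AFTER_SEQUENCE_NUMBER:
                // Condition from Table 1: (A-1) <= sequenceNumber <= (B-1)
                if (sn < a - 1 || sn > b - 1) {
                    throw new IllegalArgumentException(
                            "SN " + sn + " is outside [" + (a - 1) + ", " + (b - 1) + "]");
                }
                return sn + 1;
            case TRIM_HORIZON:
                return a; // earliest valid record
            case LATEST:
                return b; // SN of the next record to be uploaded
            default:
                throw new AssertionError(type);
        }
    }

    public static void main(String[] args) {
        long a = 100;
        long b = 200;
        System.out.println(firstSn(Type.AT_SEQUENCE_NUMBER, a, b, 150));    // 150
        System.out.println(firstSn(Type.AFTER_SEQUENCE_NUMBER, a, b, 150)); // 151
        System.out.println(firstSn(Type.TRIM_HORIZON, a, b, 0));            // 100
        System.out.println(firstSn(Type.LATEST, a, b, 0));                  // 200
    }
}
```

Note that AFTER_SEQUENCE_NUMBER accepts A-1 (which starts at the earliest record A) and B-1 (which starts at the next record to be uploaded), whereas AT_SEQUENCE_NUMBER accepts A through B directly.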

Sample Code

Use the initialized client instance to obtain data from the DIS stream.

  • When the cursor type is set to AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER, the sample code is as follows:
// Initialize the DIS SDK client instance.
DIS dic = DISClientBuilder.standard()
        .withEndpoint("xxxx")
        .withAk("xxxx")
        .withSk("xxxx")
        .withProjectId("xxxx")
        .withRegion("xxxx")
        .build();
// Configure the stream name.
String streamName = "streamName";
// Configure the ID of the partition for data download.
String partitionId = "shardId-0000000000";
// Configure the SN from which data download starts.
String startingSequenceNumber = "0";
// Configure the cursor type.
// AT_SEQUENCE_NUMBER: Data is read from the position denoted by a specific SN. The SN is defined by starting-sequence-number in the demo.
// AFTER_SEQUENCE_NUMBER: Data is read from the position right after the position denoted by a specific SN. The SN is defined by starting-sequence-number.
String cursorType = PartitionCursorTypeEnum.AT_SEQUENCE_NUMBER.name();

try
{
    // Obtain the data cursor.
    GetPartitionCursorRequest request = new GetPartitionCursorRequest();
    request.setStreamName(streamName);
    request.setPartitionId(partitionId);
    request.setCursorType(cursorType);
    request.setStartingSequenceNumber(startingSequenceNumber);
    GetPartitionCursorResult response = dic.getPartitionCursor(request);
    String cursor = response.getPartitionCursor();
    LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);

    GetRecordsRequest recordsRequest = new GetRecordsRequest();
    GetRecordsResult recordResponse = null;
    while (true)
    {
        recordsRequest.setPartitionCursor(cursor);
        recordResponse = dic.getRecords(recordsRequest);
        // Obtain the cursor for the next batch of data.
        cursor = recordResponse.getNextPartitionCursor();

        for (Record record : recordResponse.getRecords())
        {
            LOGGER.info("Get record [{}], partitionKey [{}], sequenceNumber [{}].",
                new String(record.getData().array()),
                record.getPartitionKey(),
                record.getSequenceNumber());
        }

        // Wait before polling again if no data was returned, as in the other samples.
        if (recordResponse.getRecords().size() == 0)
        {
            Thread.sleep(1000);
        }
    }
}
catch (DISClientException e)
{
    LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]",
        e.getMessage(),
        e);
}
catch (Exception e)
{
    LOGGER.error(e.getMessage(), e);
}
  • When the cursor type is set to TRIM_HORIZON or LATEST, the sample code is as follows:
// Initialize the DIS SDK client instance.
DIS dic = DISClientBuilder.standard()
        .withEndpoint("xxxx")
        .withAk("xxxx")
        .withSk("xxxx")
        .withProjectId("xxxx")
        .withRegion("xxxx")
        .build();
// Configure the stream name.
String streamName = "streamName";
// Configure the partition ID.
String partitionId = "shardId-0000000000";
// The SN is not required for TRIM_HORIZON or LATEST.
//String startingSequenceNumber = "0";
// Configure the maximum number of records returned by each request (example value).
int limit = 100;
// Configure the cursor type.
// TRIM_HORIZON: Data is read from the earliest data record in the partition.
// LATEST: Data is read just after the most recent record in the partition. This setting ensures that you always read the most recent data in the partition.
String cursorType = PartitionCursorTypeEnum.TRIM_HORIZON.name();

try
{
    // Obtain the data cursor.
    GetPartitionCursorRequest request = new GetPartitionCursorRequest();
    request.setStreamName(streamName);
    request.setPartitionId(partitionId);
    request.setCursorType(cursorType);
    //request.setStartingSequenceNumber(startingSequenceNumber);
    GetPartitionCursorResult response = dic.getPartitionCursor(request);
    String cursor = response.getPartitionCursor();
    LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);

    GetRecordsRequest recordsRequest = new GetRecordsRequest();
    GetRecordsResult recordResponse = null;
    while (true)
    {
        recordsRequest.setPartitionCursor(cursor);
        recordsRequest.setLimit(limit);
        recordResponse = dic.getRecords(recordsRequest);
        // Obtain the cursor for the next batch of data.
        cursor = recordResponse.getNextPartitionCursor();

        for (Record record : recordResponse.getRecords())
        {
            LOGGER.info("Get record [{}], partitionKey [{}], sequenceNumber [{}].",
                new String(record.getData().array()),
                record.getPartitionKey(),
                record.getSequenceNumber());
        }

        // Wait before polling again if no data was returned.
        if (recordResponse.getRecords().size() == 0)
        {
            Thread.sleep(1000);
        }
    }
}
catch (DISClientException e)
{
    LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]",
        e.getMessage(),
        e);
}
catch (Exception e)
{
    LOGGER.error(e.getMessage(), e);
}
  • When the cursor type is set to AT_TIMESTAMP, the sample code is as follows:
// Initialize the DIS SDK client instance.
DIS dic = DISClientBuilder.standard()
        .withEndpoint("xxxx")
        .withAk("xxxx")
        .withSk("xxxx")
        .withProjectId("xxxx")
        .withRegion("xxxx")
        .build();
// Configure the stream name.
String streamName = "streamName";
// Configure the partition ID.
String partitionId = "shardId-0000000000";
// The SN is not required for AT_TIMESTAMP.
//String startingSequenceNumber = "0";
// Configure the timestamp (13 digits, in milliseconds).
long timestamp = 1542960693804L;
// Configure the maximum number of records returned by each request (example value).
int limit = 100;
// Configure the cursor type.
// AT_TIMESTAMP: Data is read from the position denoted by a specific timestamp.
String cursorType = PartitionCursorTypeEnum.AT_TIMESTAMP.name();

try
{
    // Obtain the data cursor.
    GetPartitionCursorRequest request = new GetPartitionCursorRequest();
    request.setStreamName(streamName);
    request.setPartitionId(partitionId);
    request.setCursorType(cursorType);
    //request.setStartingSequenceNumber(startingSequenceNumber);
    request.setTimestamp(timestamp);
    GetPartitionCursorResult response = dic.getPartitionCursor(request);
    String cursor = response.getPartitionCursor();
    LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);

    GetRecordsRequest recordsRequest = new GetRecordsRequest();
    GetRecordsResult recordResponse = null;
    while (true)
    {
        recordsRequest.setPartitionCursor(cursor);
        recordsRequest.setLimit(limit);
        recordResponse = dic.getRecords(recordsRequest);
        // Obtain the cursor for the next batch of data.
        cursor = recordResponse.getNextPartitionCursor();

        for (Record record : recordResponse.getRecords())
        {
            LOGGER.info("Get record [{}], partitionKey [{}], sequenceNumber [{}].",
                new String(record.getData().array()),
                record.getPartitionKey(),
                record.getSequenceNumber());
        }

        // Wait before polling again if no data was returned.
        if (recordResponse.getRecords().size() == 0)
        {
            Thread.sleep(1000);
        }
    }
}
catch (DISClientException e)
{
    LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]",
        e.getMessage(),
        e);
}
catch (Exception e)
{
    LOGGER.error(e.getMessage(), e);
}
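
The samples above do not persist the consumption position. The following sketch shows one way the checkpoint scenario described for AFTER_SEQUENCE_NUMBER could work: the SN of the last processed record is saved to a file, and a restarted consumer resumes from the record right after it. Everything here is an assumption for illustration: CheckpointDemo and its methods are hypothetical, and an in-memory list (where the record at index i has SN i) stands in for a DIS partition so that the example runs without the SDK.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative checkpointing sketch; the in-memory list stands in for a DIS partition. */
final class CheckpointDemo {
    /** Saves the SN of the last record that was fully processed. */
    static void save(Path file, long lastSn) {
        try {
            Files.write(file, Long.toString(lastSn).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Loads the saved SN, or returns -1 if no checkpoint exists yet. */
    static long load(Path file) {
        try {
            if (!Files.exists(file)) {
                return -1;
            }
            return Long.parseLong(new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns a temporary path at which no checkpoint file exists yet. */
    static Path tempCheckpoint() {
        try {
            Path file = Files.createTempFile("dis-checkpoint", ".txt");
            Files.delete(file);
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Processes up to maxRecords records that follow the checkpoint and returns their SNs.
     * Resuming at (saved SN + 1) mirrors AFTER_SEQUENCE_NUMBER; with no checkpoint, the
     * sketch starts at SN 0, the earliest record of this in-memory partition.
     */
    static List<Long> consume(List<String> partition, Path checkpoint, int maxRecords) {
        long sn = load(checkpoint) + 1;
        List<Long> processed = new ArrayList<>();
        while (sn < partition.size() && processed.size() < maxRecords) {
            String record = partition.get((int) sn);
            System.out.println("Processed record [" + record + "], sequenceNumber [" + sn + "].");
            processed.add(sn);
            save(checkpoint, sn); // record progress only after the record is processed
            sn++;
        }
        return processed;
    }

    public static void main(String[] args) {
        Path checkpoint = tempCheckpoint();
        List<String> partition = Arrays.asList("r0", "r1", "r2", "r3", "r4");
        System.out.println(consume(partition, checkpoint, 3));  // [0, 1, 2]
        // Simulated restart: consumption resumes right after the saved SN.
        System.out.println(consume(partition, checkpoint, 10)); // [3, 4]
    }
}
```

Saving the checkpoint after each record keeps the sketch simple. A real consumer might save it once per batch instead, at the cost of reprocessing part of a batch after a crash.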

Parameters

Table 2 Parameter description

Parameter: partitionId

  • Type: String
  • Description: Partition ID.
  • NOTE: Set this parameter based on the information returned on the console when you run the program in Uploading Streaming Data, for example, partitionId [shardId-0000000000].

Parameter: startingSequenceNumber

  • Type: String
  • Description: Sequence number of an individual data record. Each data record has a sequence number that is unique within its partition. The sequence number is assigned by DIS when a data producer calls PutRecords to add data to a DIS stream. Sequence numbers for the same partition key generally increase over time; the longer the time period between write requests (PutRecords requests), the larger the sequence numbers become.
  • NOTE: Set this parameter based on the information returned on the console when you run the program in Uploading Streaming Data, for example, sequenceNumber [1].

Parameter: cursorType

  • Type: String
  • Description: Cursor type.

      • AT_SEQUENCE_NUMBER: Data is read from the position denoted by a specific SN. The SN is defined by starting-sequence-number in the demo. This is the default cursor type.
      • AFTER_SEQUENCE_NUMBER: Data is read from the position right after the position denoted by a specific SN. The SN is defined by starting-sequence-number in the demo.
      • TRIM_HORIZON: Data is read from the earliest data record in the partition.

        For example, a tenant uses a DIS stream to upload three pieces of data A1, A2, and A3. N days later, A1 has expired and A2 and A3 are still in the validity period. In this case, if the tenant sets the cursor type to TRIM_HORIZON, the system downloads data from A2.

      • LATEST: Data is read just after the most recent record in the partition. This setting ensures that you always read the most recent data in the partition.
      • AT_TIMESTAMP: Data is read from the position denoted by a specific timestamp.
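
The value used with AT_TIMESTAMP is a 13-digit timestamp, that is, milliseconds since the Unix epoch. The following sketch, which uses only the standard java.time API, converts between such a value and a wall-clock time. TimestampDemo is a hypothetical helper, and the GMT+08:00 offset is an assumption taken from the time zone shown on this page.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Converts between 13-digit epoch-millisecond timestamps and wall-clock time in GMT+08:00. */
final class TimestampDemo {
    private static final ZoneOffset GMT8 = ZoneOffset.ofHours(8);
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Converts a wall-clock time such as "2018-11-09 13:44:23" (GMT+08:00) to epoch milliseconds. */
    static long toEpochMillis(String localTime) {
        return LocalDateTime.parse(localTime, FORMAT).toInstant(GMT8).toEpochMilli();
    }

    /** Converts epoch milliseconds to a wall-clock time in GMT+08:00 (milliseconds are dropped). */
    static String toLocalTime(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), GMT8).format(FORMAT);
    }

    public static void main(String[] args) {
        System.out.println(toLocalTime(1541742263206L));          // 2018-11-09 13:44:23
        System.out.println(toEpochMillis("2018-11-09 13:44:23")); // 1541742263000
    }
}
```

The result of toEpochMillis is a long that could be passed to request.setTimestamp in the AT_TIMESTAMP sample.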

Running the Program

Right-click the program and choose Run As > 1 Java Application from the shortcut menu. If the program runs successfully, information similar to the following is displayed on the console:

14:55:42.954 [main] INFO com.bigdata.dis.sdk.DISConfig - get from classLoader
14:55:44.103 [main] INFO com.bigdata.dis.sdk.util.config.ConfigurationUtils - get from classLoader
14:55:44.105 [main] INFO com.bigdata.dis.sdk.util.config.ConfigurationUtils - propertyMapFromFile size : 2
14:55:45.235 [main] INFO com.bigdata.dis.sdk.demo.ConsumerDemo - Get stream streamName[partitionId=0] cursor success : eyJnZXRJdGVyYXRvclBhcmFtIjp7InN0cmVhbS1uYW1lIjoiZGlzLTEzbW9uZXkiLCJwYXJ0aXRpb24taWQiOiIwIiwiY3Vyc29yLXR5cGUiOiJBVF9TRVFVRU5DRV9OVU1CRVIiLCJzdGFydGluZy1zZXF1ZW5jZS1udW1iZXIiOiIxMDY4OTcyIn0sImdlbmVyYXRlVGltZXN0YW1wIjoxNTEzNjY2NjMxMTYxfQ
14:55:45.305 [main] INFO com.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [964885], sequenceNumber [0].
14:55:45.305 [main] INFO com.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [910960], sequenceNumber [1].
14:55:46.359 [main] INFO com.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [528377], sequenceNumber [2].

Table 3 Parameter description

Parameter: partition_key

  • Type: String
  • Description: Partition key set when data is being uploaded.
  • NOTE: If partition_key is specified when data is uploaded, partition_key is returned when data is downloaded. If partition_id instead of partition_key is specified when data is uploaded, no partition_key is returned.

Parameter: startingSequenceNumber

  • Type: String
  • Description: Sequence number of an individual data record. Each data record has a sequence number that is unique within its partition. The sequence number is assigned by DIS when a data producer calls PutRecords to add data to a DIS stream. Sequence numbers for the same partition key generally increase over time; the longer the time period between write requests (PutRecords requests), the larger the sequence numbers become.
