Updated on 2023-06-21 GMT+08:00

Downloading Streaming Data

Context

To download streaming data, you need to determine the position where the data is obtained from the partition, that is, to obtain the cursor. After the start position is determined, the data is obtained cyclically.

There are five cursor types available:

  • AT_SEQUENCE_NUMBER
  • AFTER_SEQUENCE_NUMBER
  • TRIM_HORIZON
  • LATEST
  • AT_TIMESTAMP

To better understand the cursor type, you need to understand the following basic concepts:

  • A sequence number (SN) is the unique identifier of each record. DIS automatically allocates an SN when a data producer calls the PutRecords operation to add data to the DIS stream. SN of the same partition key usually changes with time. A longer interval between PutRecords requests results in a larger sequence number.
  • SN of each partition increases from 0. Each data record corresponds to a unique SN. As a lifecycle ends, the SN expires. For example, after a data record is uploaded to a new partition and its SN starts from 0. After 100 data records are uploaded, the SN of the last data record is 99. When the lifecycle ends, SNs 0 to 99 become unavailable.
  • The SN range of a partition can be obtained by calling the describeStream API for querying stream details. sequenceNumberRange indicates the data validity range. The first value is the SN of the earliest data, the last value is the SN of the next uploaded data, and the SN of the latest data is one less than the last value.

    For example, [100, 200] indicates that a total of 200 data records have been uploaded to the partition, data records 0 to 99 have expired, the earliest valid data record is 100, the latest data record is 199, and the SN of the next data record to be uploaded is 200.

Scenario Description

The following table describes the application scenarios of the five cursor types:

Table 1 Scenario description

Cursor Type

Description

Application Scenario

Remarks

AT_SEQUENCE_NUMBER

Data is read from the position denoted by a specific SN. The SN is defined by starting-sequence-number in the demo. This is the default cursor type.

This type of cursor is applicable to the scenario where the start SN has been specified.

It is closely related to sequenceNumber and sequenceNumberRange[A and B] of the partition data.

The specified SN must meet the following condition:

A<=sequenceNumber<=B

AFTER_SEQUENCE_NUMBER

Data is read from the position right after the position denoted by a specific SN. The SN is defined by starting-sequence-number.

This type of cursor is applicable to the scenario where the last consumption position is saved. For example, each consumption record is saved to a file or checkpoint. If the program is restarted, data will be restored from the position right after the save position. Comparatively, AT_SEQUENCE_NUMBER will restore from the save position, leading to a data duplicate.

It is closely related to sequenceNumber and sequenceNumberRange[A and B] of the partition data.

The specified SN must meet the following condition:

(A-1)<=sequenceNumber<=(B-1)

TRIM_HORIZON

Data is read from the earliest data record in the partition.

For example, if sequenceNumberRange [100, 200] is applied, data consumption starts from record 100.

This type of cursor is applicable to scenarios where the consumption position is unknown and all valid data in the partition will be consumed.

None.

LATEST

Data is read from the position just after the most recent record in the partition. That is, the system does not read the existing data in the partition but starts from the data to be uploaded.

For example, if sequenceNumberRange [100, 200] is applied, data consumption starts from record 200. If no data is uploaded, the obtained data is empty. If the data is uploaded, record 200, 201, 202... will be obtained.

This type of cursor is applicable to the scenario where the consumption position is unknown, so the existing data in the partition is discarded and consumption position starts from the newly uploaded data.

None.

AT_TIMESTAMP

Data is read from the position denoted by a specific timestamp. A 13-bit timestamp is required when the cursor is obtained.

For example, if 1541742263206 is specified, data uploaded from 2018-11-09 13:44:23 is read.

This type of cursor is applicable to the scenario where the consumption position is unknown, but the user wants to consume data from a specific time or from the end time of the last consumption.

  • If the upload time of the earliest data record is C, timestamp ≥ C.
  • If the timestamp is greater than the timestamp of the latest data or the future time, the system reads from the data right after the latest data.

Sample Code

Use the initialized client instance to obtain data through the DIS stream.

  • When the cursor type is set to AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER, the sample code is as follows:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
//Initialize the DIS SDK client instance.
DIS dic = DISClientBuilder.standard()
            .withEndpoint("xxxx")
            .withAk("xxxx")
            .withSk("xxxx")
            .withProjectId("xxxx")
            .withRegion("xxxx")
            .build();
//Configure the stream name.
String streamName = "streamName";    
// Configure the ID of the partition for data download.
String partitionId = "shardId-0000000000";    
//Configure the SN for data download.
String startingSequenceNumber = "0";    
//Configure the data download mode.
//AT_SEQUENCE_NUMBER: Data is read from the position denoted by a specific SN. The SN is defined by starting-sequence-number in the demo.
// AFTER_SEQUENCE_NUMBER: Data is read from the position right after the position denoted by a specific SN. The SN is defined by starting-sequence-number.
String cursorType = PartitionCursorTypeEnum.AT_SEQUENCE_NUMBER.name();
 
try 
{
//Obtain the data cursor.
      GetPartitionCursorRequest request = new GetPartitionCursorRequest();
      request.setStreamName(streamName);
      request.setPartitionId(partitionId);
      request.setCursorType(cursorType);
      request.setStartingSequenceNumber(startingSequenceNumber);
      GetPartitionCursorResult response = dic.getPartitionCursor(request);
      String cursor = response.getPartitionCursor();
      LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);
      GetRecordsRequest recordsRequest = new GetRecordsRequest();
      GetRecordsResult recordResponse = null;
      while (true)
            {
                recordsRequest.setPartitionCursor(cursor);
                recordResponse = dic.getRecords(recordsRequest);
//Obtain the next-batch of data cursor.
                cursor = recordResponse.getNextPartitionCursor();

                for (Record record : recordResponse.getRecords())
                {
                    LOGGER.info("Get record [{}], partitionKey [{}], sequenceNumber [{}].",
                        new String(record.getData().array()),
                        record.getPartitionKey(),
                        record.getSequenceNumber());
                }

                
            }
        }
    catch (DISClientException e)
        {
            LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]",
                e.getMessage(),
                e);
        }
        catch (Exception e)
        {
            LOGGER.error(e.getMessage(), e);
        }
    }
}
  • When the cursor type is set to TRIM_HORIZON or LATEST, the sample code example is as follows:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
//Initialize the DIS SDK client instance.
DIS dic = DISClientBuilder.standard()
            .withEndpoint("xxxx")
            .withAk("xxxx")
            .withSk("xxxx")
            .withProjectId("xxxx")
            .withRegion("xxxx")
            .build();
//Configure the stream name.
String streamName = "streamName";    
//Configure the partition ID.
String partitionId = "shardId-0000000000";    
//Configure the SN.
//String startingSequenceNumber = "0";    
//Configure the cursor type.
//TRIM_HORIZON: Data is read from the earliest data record in the partition.
//LATEST: Data is read just after the most recent record in the partition. This setting ensures that you always read the most recent data in the partition.
String cursorType = PartitionCursorTypeEnum.TRIM_HORIZON.name();
 
try 
{
//Obtain the data cursor.
      GetPartitionCursorRequest request = new GetPartitionCursorRequest();
      request.setStreamName(streamName);
      request.setPartitionId(partitionId);
      request.setCursorType(cursorType);
      //request.setStartingSequenceNumber(startingSequenceNumber);
      GetPartitionCursorResult response = dic.getPartitionCursor(request);
      String cursor = response.getPartitionCursor();
      LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);
      GetRecordsRequest recordsRequest = new GetRecordsRequest();
      GetRecordsResult recordResponse = null;
      while (true)
       {
                recordsRequest.setPartitionCursor(cursor);
                recordsRequest.setLimit(limit);
                recordResponse = dic.getRecords(recordsRequest);
//Obtain the next-batch of data cursor.
                cursor = recordResponse.getNextPartitionCursor();

                for (Record record : recordResponse.getRecords())
                {
                    LOGGER.info("Get record [{}], partitionKey [{}], sequenceNumber [{}].",
                        new String(record.getData().array()),
                        record.getPartitionKey(),
                        record.getSequenceNumber());
                }

                if (recordResponse.getRecords().size() == 0)
                {
                    Thread.sleep(1000);
                }
            }
        }
        catch (DISClientException e)
        {
            LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]",
                e.getMessage(),
                e);
        }
        catch (Exception e)
        {
            LOGGER.error(e.getMessage(), e);
        }
    }
}
  • When the cursor type is set to AT_TIMESTAMP, the sample code example is as follows:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
//Initialize the DIS SDK client instance. 
DIS dic = DISClientBuilder.standard()
            .withEndpoint("xxxx")
            .withAk("xxxx")
            .withSk("xxxx")
            .withProjectId("xxxx")
            .withRegion("xxxx")
            .build();
//Configure the stream name.
String streamName = "streamName";    
//Configure the partition ID.
String partitionId = "shardId-0000000000";    
//Configure the SN.
//String startingSequenceNumber = "0";  
//Configure the timestamp.
long timestamp = 1542960693804L;  
//Configure the cursor type.
//AT_TIMESTAMP: Data is read from the position denoted by a specific timestamp.
String cursorType = PartitionCursorTypeEnum.AT_TIMESTAMP.name();
 
try 
{
//Obtain the data cursor.
      GetPartitionCursorRequest request = new GetPartitionCursorRequest();
      request.setStreamName(streamName);
      request.setPartitionId(partitionId);
      request.setCursorType(cursorType);
      //request.setStartingSequenceNumber(startingSequenceNumber);
      request.setTimestamp(timestamp);
      GetPartitionCursorResult response = dic.getPartitionCursor(request);
      String cursor = response.getPartitionCursor();
      LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);

            GetRecordsRequest recordsRequest = new GetRecordsRequest();
            GetRecordsResult recordResponse = null;
         while (true)
            {
                recordsRequest.setPartitionCursor(cursor);
                recordsRequest.setLimit(limit);
                recordResponse = dic.getRecords(recordsRequest);
//Obtain the next-batch of data cursor.
                cursor = recordResponse.getNextPartitionCursor();

                for (Record record : recordResponse.getRecords())
                {
                    LOGGER.info("Get record [{}], partitionKey [{}], sequenceNumber [{}].",
                        new String(record.getData().array()),
                        record.getPartitionKey(),
                        record.getSequenceNumber());
                }

                if (recordResponse.getRecords().size() == 0)
                {
                    Thread.sleep(1000);
                }
            }
        }
        catch (DISClientException e)
        {
            LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]",
                e.getMessage(),
                e);
        }
        catch (Exception e)
        {
            LOGGER.error(e.getMessage(), e);
        }
    }
}

Parameters

Table 2 Parameter description

Parameter

Type

Description

partitionId

String

Partition ID.

NOTE:

Define the return information fields on the console, such as partitionId [shardId-0000000000], based on the execution results obtained from Uploading Streaming Data.

startingSequenceNumber

String

Sequence number of an individual data record. Each data record has a sequence number that is unique within its partition. The sequence number is assigned by DIS when a data producer calls PutRecords to add data to a DIS stream. Sequence numbers for the same partition key generally increase over time; the longer the time period between write requests (PutRecords requests), the larger the sequence numbers become.

NOTE:

Define the return information fields on the console, such as sequenceNumber [1], based on the execution results obtained from Uploading Streaming Data.

cursorType

String

Cursor type.

  • AT_SEQUENCE_NUMBER: Data is read from the position denoted by a specific SN. The SN is defined by starting-sequence-number in the demo. This is the default cursor type.
  • AFTER_SEQUENCE_NUMBER: Data is read from the position right after the position denoted by a specific sequence number. The SN is defined by starting-sequence-number in the demo.
  • TRIM_HORIZON: Data is read from the earliest data record in the partition.

    For example, a tenant uses a DIS stream to upload three pieces of data A1, A2, and A3. N days later, A1 has expired and A2 and A3 are still in the validity period. In this case, if the tenant sets the cursor type to TRIM_HORIZON, the system downloads data from A2.

  • LATEST: Data is read just after the most recent record in the partition. This setting ensures that you always read the most recent data in the partition.
  • AT_TIMESTAMP: Data is read from the position denoted by a specific timestamp.

Running the Program

Right-click the program and choose Run As > 1 Java Application from the shortcut menu. If the program is run successfully, you can view the information similar to the following on the console:

1
2
3
4
5
6
7
14:55:42.954 [main] INFOcom.bigdata.dis.sdk.DISConfig - get from classLoader
14:55:44.103 [main] INFOcom.bigdata.dis.sdk.util.config.ConfigurationUtils - get from classLoader
14:55:44.105 [main] INFOcom.bigdata.dis.sdk.util.config.ConfigurationUtils - propertyMapFromFile size : 2
14:55:45.235 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get stream streamName[partitionId=0] cursor success : eyJnZXRJdGVyYXRvclBhcmFtIjp7InN0cmVhbS1uYW1lIjoiZGlzLTEzbW9uZXkiLCJwYXJ0aXRpb24taWQiOiIwIiwiY3Vyc29yLXR5cGUiOiJBVF9TRVFVRU5DRV9OVU1CRVIiLCJzdGFydGluZy1zZXF1ZW5jZS1udW1iZXIiOiIxMDY4OTcyIn0sImdlbmVyYXRlVGltZXN0YW1wIjoxNTEzNjY2NjMxMTYxfQ
14:55:45.305 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [964885], sequenceNumber [0].
14:55:45.305 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [910960], sequenceNumber [1].
14:55:46.359 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [528377], sequenceNumber [2]. 
Table 3 Parameter description

Parameter

Type

Description

partition_key

String

Partition key set when data is being uploaded.

NOTE:

If partition_key is specified when data is uploaded, partition_key is returned when data is downloaded. If partition_id instead of partition_key is specified when data is uploaded, no partition_key is returned.

startingSequenceNumber

String

Sequence number of an individual data record. Each data record has a sequence number that is unique within its partition. The sequence number is assigned by DIS when a data producer calls PutRecords to add data to a DIS stream. Sequence numbers for the same partition key generally increase over time; the longer the time period between write requests (PutRecords requests), the larger the sequence numbers become.