
Using DIS to Analyze Vehicle Locations in Real Time

Scenario Overview

Data Ingestion Service (DIS) collects vehicle location data in real time and uploads the data to CloudTable Service (CloudTable). You can use CloudTable to query locations of a vehicle in a specified period.

Figure 1 Service process diagram

The procedure is as follows:

  1. Creating a CloudTable Cluster
  2. Creating a Data Table on CloudTable
  3. Creating a DIS Stream
  4. Creating a Dump Task
  5. Obtaining Authentication Information
  6. Preparing a DIS Application Development Environment
  7. Developing an Application for Sending Data to DIS
  8. Starting the Data Upload Application
  9. Viewing the Uploaded Data on CloudTable
  10. Querying Locations of a Specified Vehicle on CloudTable

Creating a CloudTable Cluster

Create a CloudTable cluster for storing the data dumped from DIS.

Creating a Data Table on CloudTable

Create a CloudTable data table after creating a CloudTable cluster.

The collected data is in JSON format. An example is as follows:

{"DeviceID":"4d3a27c13dc21ae056044b818a03dwen002","Mileage":"55378500","DataTime":"2017-10-23 12:19:35.000","Latitude":"34.585639","IsACCOpen":"true","Longitude":"119.193524","Velocity":0,"Direction":"null","Carnum":"WL66666","BaiDuLatitude":"34.607069","BaiDuLongitude":"119.190093","BaiDuAdress":"No. 78 Tongguan North Road, Xinpu District, Lianyungang City, Jiangsu Province of China","ReceiveTime":"2017-10-23 12:19:34.846","Altitude":"null"}

In this practice, you can create data tables using the HBase shell client.

  1. Prepare a Linux Elastic Cloud Server (ECS).
  2. Install and start HBase shell to access the CloudTable cluster.
  3. On the HBase shell client, run the create 'tbl1',{NAME => 'i'} command to create a data table named tbl1 with a single column family i. After the data table is successfully created, output similar to the following is displayed:
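    (A minimal sketch, assuming an HBase 1.x-style shell; the prompt, timing, and exact wording vary with the HBase version.)

      hbase(main):001:0> create 'tbl1',{NAME => 'i'}
      0 row(s) in 1.4250 seconds

      => Hbase::Table - tbl1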

Creating a DIS Stream

Create a stream. For details, see Creating a DIS Stream.

Creating a Dump Task

  1. Log in to the DIS console.
  2. In the navigation tree, choose Stream Management.
  3. Click the stream created in Creating a DIS Stream. On the displayed page, click the Dump Management tab.
  4. Click Create Dump Task. On the Create Dump Task page, configure dump parameters.

    • A maximum of five dump tasks can be created for each stream.
    • A dump task cannot be added to a stream whose Source Data Type is FILE.

  5. Click Create Now.

    Table 1 Dump task parameters

    Dump Destination
      Description: CloudTable. The streaming data stored in DIS is imported to an HBase table or OpenTSDB of the CloudTable cluster in real time.
      Value: CloudTable

    Task Name
      Description: Name of the dump task. The task name must be unique within a stream, is 1 to 64 characters long, and can contain only letters, digits, hyphens (-), and underscores (_).
      Value: -

    Offset
      Description:
      • Latest: Maximum offset, indicating that the latest data will be extracted.
      • Earliest: Minimum offset, indicating that the earliest data will be extracted.
      Value: Latest

    CloudTable Cluster
      Description: Click Select. In the Select CloudTable Cluster dialog box, select a CloudTable cluster. This parameter cannot be left blank, and you can only select (not enter) a value in this field.
      Value: cloudtable-demo

    Table Type
      Description: Two CloudTable table types are available: HBase and OpenTSDB.
      Value: HBase

    Table
      Description: Click Select. In the Select CloudTable Table dialog box, select a CloudTable table. You can only select (not enter) a value in this field.
      NOTE: This parameter is available only after you select a CloudTable cluster and create an HBase table.
      Value: tbl1

    Backup
      Description: Specifies whether to back up data that fails to be dumped to CloudTable to OBS.
      • Enable: The data will be backed up to OBS.
      • Disable: The data will not be backed up to OBS. Backup is disabled by default.
      NOTE: If this function is disabled, data that fails to be dumped is retained in DIS and cleared when the Data Retention period expires.
      Value: Disabled

    Row Key
      Description:
      • JSON attribute name. The value contains a maximum of 32 characters and consists only of letters, digits, underscores (_), and periods (.). It cannot start or end with a period, contain consecutive periods, or be left blank. A maximum of 64 attributes can be added.
      • Data type. Possible values: Bigint, Double, Boolean, Timestamp, String, and Decimal.
      An example row key layout is sketched after Table 2.
      Value: -

    Row Key Delimiter
      Description: Delimiter used to separate row key attributes. The value can only be a period (.), comma (,), vertical bar (|), semicolon (;), hyphen (-), underscore (_), or tilde (~). It can also be set to NULL. The maximum length is one character.
      Value: -

    Schema Column
      Description:
      • Column name. The value contains a maximum of 32 characters and consists only of letters, digits, and underscores (_). It cannot be left blank. A maximum of 4,096 columns can be added.
      • Data type. Possible values: Bigint, Double, Boolean, Timestamp, String, and Decimal.
      • JSON attribute name. The value contains a maximum of 32 characters and consists only of letters, digits, underscores (_), and periods (.). It cannot start or end with a period, contain consecutive periods, or be left blank.
      • Column family. The value can only be selected from the drop-down list box and cannot be left blank. This parameter is available only after you select a CloudTable cluster and a CloudTable table, and set Table Type to HBase.
      Value: See Table 2.

    Table 2 Schema column parameters

    Column Name       Data Type    JSON Attribute Name    Column Family
    DeviceID          String       DeviceID               i
    Mileage           Bigint       Mileage                i
    Latitude          Decimal      Latitude               i
    IsACCOpen         Boolean      IsACCOpen              i
    Longitude         Decimal      Longitude              i
    Velocity          Bigint       Velocity               i
    Direction         String       Direction              i
    BaiDuLatitude     Decimal      BaiDuLatitude          i
    BaiDuLongitude    Decimal      BaiDuLongitude         i
    BaiDuAdress       String       BaiDuAdress            i
    ReceiveTime       Timestamp    ReceiveTime            i
    Altitude          String       Altitude               i
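
    As a reference for the Row Key settings, the query in Querying Locations of a Specified Vehicle on CloudTable filters on row keys such as WL66666|2017-10-23 12:22:00. This suggests a row key composed of the Carnum and DataTime attributes of the source JSON, joined by a vertical bar (|) as the Row Key Delimiter. The layout below is an assumption for illustration only; adapt it to your own key design:

      Row Key:            Carnum (String), DataTime (String)
      Row Key Delimiter:  |
      Resulting row key:  WL66666|2017-10-23 12:19:35.000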

Obtaining Authentication Information

  • Obtaining an Access Key ID/Secret Access Key (AK/SK)
    To obtain an access key, perform the following steps:
    1. Log in to the management console, move the cursor to the username in the upper right corner, and select My Credentials from the drop-down list.
    2. On the My Credentials page, choose Access Keys, and click Create Access Key. See Figure 2.
      Figure 2 Clicking Create Access Key
    3. Click OK and save the access key file as prompted. The access key file will be saved to your browser's configured download location. Open the credentials.csv file to view Access Key Id and Secret Access Key.
      • A maximum of two access keys can be created for each user.
      • To ensure access key security, the access key file can be downloaded only when the key is first generated; it cannot be obtained from the management console later. Keep it secure.
  • Obtaining a project ID and account ID
    You can obtain the project ID and account ID by performing the following steps:
    1. Register with and log in to the management console.
    2. Hover the cursor on the username in the upper right corner and select My Credentials from the drop-down list.
    3. On the API Credentials page, obtain the account name and account ID, and obtain the project ID from the project list.
  • Obtaining the endpoint

    An endpoint is the request address for calling an API. Endpoints vary depending on services and regions. You can obtain endpoints of the service from Endpoints.
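
    For example, the sample code later in this practice uses the DIS endpoint of the cn-north-1 region (an illustrative value; check the endpoint list for your own region):

      https://dis.cn-north-1.myhuaweicloud.com:20004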

Preparing a DIS Application Development Environment

For details, see Preparing a DIS Application Development Environment.

Developing an Application for Sending Data to DIS

  1. Prepare a data sample.
  2. Modify the sample code.

    The sample project is the ProducerDemo.java file in the \dis-sdk-demo\src\main\java\com\bigdata\dis\sdk\demo directory of the huaweicloud-sdk-dis-java-.zip package downloaded in Preparing a DIS Application Development Environment.

    Change the values of AK, SK, and ProjectId based on the site requirements.

    private static void runProduceDemo()
    {
        // Create a DIS client instance.
        DIS dic = DISClientBuilder.standard()
                .withEndpoint("https://dis.cn-north-1.myhuaweicloud.com:20004")
                .withAk("${your_AK}")
                .withSk("${your_SK}")
                .withProjectId("${your_projectId}")
                .withRegion("cn-north-1")
                .build();

        // Configure the stream name.
        String streamName = "dis-demo";

        // Configure the data to be uploaded.
        PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
        putRecordsRequest.setStreamName(streamName);

        List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<PutRecordsRequestEntry>();
        String[] messages = { /* insert the prepared data sample here, for example the JSON record shown earlier */ };

        for (int i = 0; i < messages.length; i++)
        {
            ByteBuffer buffer = ByteBuffer.wrap(messages[i].getBytes());
            PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
            putRecordsRequestEntry.setData(buffer);
            putRecordsRequestEntry.setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000)));
            putRecordsRequestEntryList.add(putRecordsRequestEntry);
        }
        putRecordsRequest.setRecords(putRecordsRequestEntryList);

        log.info("========== BEGIN PUT ============");

        PutRecordsResult putRecordsResult = null;
        try
        {
            putRecordsResult = dic.putRecords(putRecordsRequest);
        }
        catch (DISClientException e)
        {
            log.error("Failed to get a normal response, please check params and retry. Error message [{}]",
                e.getMessage(), e);
        }
        catch (ResourceAccessException e)
        {
            log.error("Failed to access endpoint. Error message [{}]", e.getMessage(), e);
        }
        catch (Exception e)
        {
            log.error(e.getMessage(), e);
        }

        if (putRecordsResult != null)
        {
            log.info("Put {} records[{} successful / {} failed].",
                putRecordsResult.getRecords().size(),
                putRecordsResult.getRecords().size() - putRecordsResult.getFailedRecordCount().get(),
                putRecordsResult.getFailedRecordCount());

            for (int j = 0; j < putRecordsResult.getRecords().size(); j++)
            {
                PutRecordsResultEntry putRecordsRequestEntry = putRecordsResult.getRecords().get(j);
                if (putRecordsRequestEntry.getErrorCode() != null)
                {
                    // Failed to upload the data.
                    log.error("[{}] put failed, errorCode [{}], errorMessage [{}]",
                        new String(putRecordsRequestEntryList.get(j).getData().array()),
                        putRecordsRequestEntry.getErrorCode(),
                        putRecordsRequestEntry.getErrorMessage());
                }
                else
                {
                    // Data is uploaded successfully.
                    log.info("[{}] put success, partitionId [{}], partitionKey [{}], sequenceNumber [{}]",
                        new String(putRecordsRequestEntryList.get(j).getData().array()),
                        putRecordsRequestEntry.getPartitionId(),
                        putRecordsRequestEntryList.get(j).getPartitionKey(),
                        putRecordsRequestEntry.getSequenceNumber());
                }
            }
        }
        log.info("========== END PUT ============");
    }

Starting the Data Upload Application

Right-click the producer application and choose Run As > 1 Java Application from the shortcut menu.
Figure 3 Running the application

While data is being sent to DIS, the DIS console displays the stream information. If the application prints log output similar to the following, the data has been successfully sent to DIS:
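(A minimal sketch of the success log printed by the sample code; the record content is abbreviated, and the partition ID, partition key, and sequence number are illustrative placeholders.)

    ========== BEGIN PUT ============
    Put 1 records[1 successful / 0 failed].
    [{"DeviceID":"4d3a27c13dc21ae056044b818a03dwen002", ...}] put success, partitionId [shardId-0000000000], partitionKey [305847], sequenceNumber [0]
    ========== END PUT ============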

Viewing the Uploaded Data on CloudTable

Run the scan 'tbl1' command on the HBase shell client. If information similar to the following is displayed, the data has been successfully uploaded to CloudTable:
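(A minimal sketch, assuming the row key is composed of the Carnum and DataTime attributes joined by a vertical bar (|), as in the query at the end of this practice; timestamps are shown as <ts> placeholders and only a few columns of the sample record are listed.)

    hbase(main):002:0> scan 'tbl1'
    ROW                               COLUMN+CELL
     WL66666|2017-10-23 12:19:35.000  column=i:DeviceID, timestamp=<ts>, value=4d3a27c13dc21ae056044b818a03dwen002
     WL66666|2017-10-23 12:19:35.000  column=i:Latitude, timestamp=<ts>, value=34.585639
     WL66666|2017-10-23 12:19:35.000  column=i:Longitude, timestamp=<ts>, value=119.193524
     WL66666|2017-10-23 12:19:35.000  column=i:Velocity, timestamp=<ts>, value=0
     ...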

Querying Locations of a Specified Vehicle on CloudTable

The following gives an example about how to query the locations of vehicle WL66666 after 2017-10-23 12:22:00.

  1. Log in to the HBase shell client.
  2. Run the scan 'tbl1',{COLUMNS => ['i:Latitude','i:Longitude'], FILTER=>"RowFilter(>=,'binary:WL66666|2017-10-23 12:22:00')"} command to query vehicle locations.

    The following query result is displayed on the HBase client:
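    (A sketch only: the row keys follow the assumed Carnum|DataTime layout, and <ts>, <latitude>, and <longitude> are placeholders. Only rows whose keys sort at or after WL66666|2017-10-23 12:22:00 are returned, and only the i:Latitude and i:Longitude columns are listed.)

      ROW                               COLUMN+CELL
       WL66666|2017-10-23 12:22:05.000  column=i:Latitude, timestamp=<ts>, value=<latitude>
       WL66666|2017-10-23 12:22:05.000  column=i:Longitude, timestamp=<ts>, value=<longitude>
       ...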