文档首页/ 数据接入服务 DIS/ 最佳实践/ 使用DIS实时分析车辆位置
更新时间:2023-06-21 GMT+08:00

使用DIS实时分析车辆位置

场景介绍

数据接入服务(Data Ingestion Service,简称DIS)实时采集车辆位置数据并上传到华为云的表格存储服务(CloudTable Service,简称CloudTable)中,用户可以使用CloudTable查询指定车辆在指定时间段的车辆位置。

图1 业务流程图

本次实践基本流程如下所示:

  1. 申请CloudTable集群
  2. 在CloudTable中创建数据表
  3. 申请DIS通道
  4. 添加转储任务
  5. 获取认证信息
  6. 准备DIS应用开发环境
  7. 编写发送数据到DIS的应用程序
  8. 启动数据上传程序
  9. 在CloudTable中查看上传数据
  10. CloudTable查询指定车辆位置

申请CloudTable集群

创建一个CloudTable集群用于存放DIS转储的数据。

在CloudTable中创建数据表

用户创建DIS通道,选择将数据转储到CloudTable中,需要创建CloudTable数据表。

采集获得数据是JSON格式,样例如下:

{"DeviceID":"4d3a27c13dc21ae056044b818a03dwen002","Mileage":"55378500","DataTime":"2017-10-23 12:19:35.000","Latitude":"34.585639","IsACCOpen":"true","Longitude":"119.193524","Velocity":0,"Direction":"null","Carnum":"WL66666","BaiDuLatitude":"34.607069","BaiDuLongitude":"119.190093","BaiDuAdress":"江苏省连云港市新浦区通灌北路78号","ReceiveTime":"2017-10-23 12:19:34.846","Altitude":"null"}

本实践中,通过使用HBase shell客户端完成建表操作。

  1. 准备Linux弹性云服务器。假设该弹性云服务器名称为“ecs-385d”。
  2. 安装客户端并启动Shell访问CloudTable集群。
  3. 在HBase shell客户端执行create 'tbl1',{NAME => 'i'}命令,创建数据表。界面显示如下表示创建成功。

申请DIS通道

请参见开通DIS通道创建通道。

添加转储任务

  1. 使用注册帐户登录DIS控制台。
  2. 在左侧列表栏中选择“通道管理”。
  3. 单击申请DIS通道中创建的通道名称,进入所选通道的管理页面,选择“转储管理”页签。
  4. 单击“添加转储任务”按钮,在弹出的“添加转储任务”页面配置转储相关配置项。

    • 每个通道最多可创建5个转储任务。
    • 源数据类型为FILE的通道,不允许添加转储任务。

  5. 单击“立即创建”

    表1 转储任务参数说明

    参数

    参数解释

    配置值

    转储服务类型

    选择CloudTable,通道里的流式数据存储在DIS中,并实时导入表格存储服务Cloudtable集群的HBase表和OpenTSDB。

    CloudTable

    任务名称

    用户创建转储任务时,需要指定转储任务名称,同一通道的转储任务名称不可重复。任务名称由英文字母、数字、中划线和下划线组成。长度为1~64个字符。

    -

    偏移量

    • 最新:最大偏移量,即获取最新的数据。
    • 最早:最小偏移量,即读取最早的数据。

    最新

    CloudTable集群

    单击“选择”,在“选择CloudTable集群”窗口选择一个集群名称。

    此配置项不可配置为空。仅支持选择,不可手动输入。

    cloudtable-demo

    CloudTable表类型

    HBase和openTSDB两种。

    HBase

    CloudTable数据表

    CloudTable数据表:单击“选择”,在“选择CloudTable数据表”窗口选择一个数据表。

    此处路径仅支持选择,不可手动输入。

    说明:

    配置此项必须已配置“CloudTable集群”并创建了HBase表。

    tbl1

    备份开关

    用户数据转储CloudTable服务失败时,是否将转储失败的数据备份至OBS服务。

    • 开启:是,转储失败的数据备份至OBS服务。
    • 关闭:否,转储失败的数据不备份至OBS服务。

    开关默认关闭。

    说明:

    关闭开关,转储失败的数据会存储在DIS中,并在“生命周期”配置的时间到达时将数据清除。

    关闭

    Row Key

    • Json属性名,取值范围为英文字母、数字、下划线和小数点,最大取值为32个字符,不可为空,不可以小数点开头,不可包含连续的小数点 且不可以小数点结尾。最多可添加64个属性。
    • 数据类型,从下拉框选择。
      • Bigint
      • Double
      • Boolean
      • Timestamp
      • String
      • Decimal

    -

    Row Key 分隔符

    支持“.”“,”“|”“;”“-”“_”、和“~”七种字符取值,也可配置为NULL。

    最大长度为一个字符。

    -

    Schema 列

    • 列名,取值范围为英文字母、数字和下划线,最大取值为32个字符,不可为空。最多可添加4096个列。
    • 数据类型,从下拉框选择。
      • Bigint
      • Double
      • Boolean
      • Timestamp
      • String
      • Decimal
    • Json属性名,取值范围为英文字母、数字、下划线和小数点,最大取值为32个字符,不可为空,不可以小数点开头,不可包含连续的小数点 且不可以小数点结尾。
    • 所属列族,从下拉框选择,不可为空。配置此项必须已配置“CloudTable 集群”、“CloudTable 数据表”且CloudTable表类型为HBase。

    参见表2表2Schema 列填写。

    表2 Schema 列填写

    列名

    数据类型

    JSON属性名

    列族

    DeviceID

    String

    DeviceID

    i

    Mileage

    Bigint

    Mileage

    i

    Latitude

    Decimal

    Latitude

    i

    IsACCOpen

    Boolean

    IsACCOpen

    i

    Longitude

    Decimal

    Longitude

    i

    Velocity

    Bigint

    Velocity

    i

    Direction

    String

    Direction

    i

    BaiDuLatitude

    Decimal

    BaiDuLatitude

    i

    BaiDuLongitude

    Decimal

    BaiDuLongitude

    i

    BaiDuAdress

    String

    BaiDuAdress

    i

    ReceiveTime

    Timestamp

    ReceiveTime

    i

    Altitude

    String

    Altitude

    i

获取认证信息

  • 获取AK/SK
    您可以通过如下方式获取访问密钥。
    1. 登录控制台,在用户名下拉列表中选择“我的凭证”。
    2. 进入“我的证”页面,选择访问密钥 > 新增访问密钥,如图2所示。
      图2 单击新增访问密钥
    3. 单击“确定”,根据浏览器提示,保存密钥文件。密钥文件会直接保存到浏览器默认的下载文件夹中。打开名称为“credentials.csv”的文件,即可查看访问密钥(Access Key Id和Secret Access Key)。
      • 每个用户仅允许新增两个访问密钥。
      • 为保证访问密钥的安全,访问密钥仅在初次生成时自动下载,后续不可再次通过管理控制台界面获取。请在生成后妥善保管。
  • 获取项目ID和帐号ID
    项目ID表示租户的资源,帐号ID对应当前帐号。用户可在对应页面下查看不同Region对应的项目ID和帐号ID。
    1. 注册并登录管理控制台。
    2. 在用户名的下拉列表中单击“我的凭证”
    3. “API凭证”页面,查看帐号名和帐号ID,在项目列表中查看项目ID。
  • 获取endpoint

    终端节点(Endpoint)即调用API的请求地址,不同服务不同区域的终端节点不同。本服务的Endpoint可从终端节点Endpoint获取。

准备DIS应用开发环境

具体操作请参见准备DIS应用开发环境

编写发送数据到DIS的应用程序

  1. 准备数据样例。
  2. 修改样例代码。

    样例工程为准备DIS应用开发环境中下载的“huaweicloud-sdk-dis-java-.zip ”压缩包“\dis-sdk-demo\src\main\java\com\bigdata\dis\sdk\demo”路径下的“ProducerDemo.java”文件。

    根据实际情况更改“AK”“SK”“ProjectId”的值。

     private static void runProduceDemo()
        {
            // 创建DIS客户端实例
            DIS dic = DISClientBuilder.standard()
                    .withEndpoint("https://dis.cn-north-1.myhuaweicloud.com:20004")
                    .withAk("${your_AK}")
                    .withSk("${your_SK}")
                    .withProjectId("${your_projectId}")
                    .withRegion("cn-north-1")
                    .build();
            
            // 配置流名称
            String streamName = "dis-demo";
            
            // 配置上传的数据
            PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
            putRecordsRequest.setStreamName(streamName);
            
            List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<PutRecordsRequestEntry>();
             String[] messages = {     此处填写上一步准备的数据样例    };
            
            for (int i = 0; i < messages.length; i++)
            {
               ByteBuffer buffer = ByteBuffer.wrap(messages[i].getBytes());
                PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
                putRecordsRequestEntry.setData(buffer);
                putRecordsRequestEntry.setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000)));
                putRecordsRequestEntryList.add(putRecordsRequestEntry);
            }
            putRecordsRequest.setRecords(putRecordsRequestEntryList);
            
            log.info("========== BEGIN PUT ============");
            
            PutRecordsResult putRecordsResult = null;
            try
            {
                putRecordsResult = dic.putRecords(putRecordsRequest);
            }
            catch (DISClientException e)
            {
                log.error("Failed to get a normal response, please check params and retry. Error message [{}]",
                    e.getMessage(),
                    e);
            }
            catch (ResourceAccessException e)
            {
                log.error("Failed to access endpoint. Error message [{}]", e.getMessage(), e);
            }
            catch (Exception e)
            {
                log.error(e.getMessage(), e);
            }
            
            if (putRecordsResult != null)
            {
                log.info("Put {} records[{} successful / {} failed].",
                    putRecordsResult.getRecords().size(),
                    putRecordsResult.getRecords().size() - putRecordsResult.getFailedRecordCount().get(),
                    putRecordsResult.getFailedRecordCount());
                
                for (int j = 0; j < putRecordsResult.getRecords().size(); j++)
                {
                    PutRecordsResultEntry putRecordsRequestEntry = putRecordsResult.getRecords().get(j);
                    if (putRecordsRequestEntry.getErrorCode() != null)
                    {
                        // 上传失败
                        log.error("[{}] put failed, errorCode [{}], errorMessage [{}]",
                            new String(putRecordsRequestEntryList.get(j).getData().array()),
                            putRecordsRequestEntry.getErrorCode(),
                            putRecordsRequestEntry.getErrorMessage());
                    }
                    else
                    {
                        // 上传成功
                        log.info("[{}] put success, partitionId [{}], partitionKey [{}], sequenceNumber [{}]",
                            new String(putRecordsRequestEntryList.get(j).getData().array()),
                            putRecordsRequestEntry.getPartitionId(),
                            putRecordsRequestEntryList.get(j).getPartitionKey(),
                            putRecordsRequestEntry.getSequenceNumber());
                    }
                }
            }
            log.info("========== END PUT ============");
        }

启动数据上传程序

程序开发完成后,右键选择“Run As > 1 Java Application”运行程序,如图3所示。
图3 运行上传数据程序

数据上传过程中可在Console控制台查看数据上传通道量信息。出现类似信息表示数据上传成功。

在CloudTable中查看上传数据

在HBase shell客户端执行scan 'tbl1'命令,显示如下表示数据上传成功。

CloudTable查询指定车辆位置

以查询车辆“WL66666”在2017-10-23 12:22:00时间以后的位置为例。

  1. 登录HBase shell客户端。
  2. 执行scan 'tbl1',{COLUMNS => ['i:Latitude','i:Longitude'], FILTER=>"RowFilter(>=,'binary:WL66666|2017-10-23 12:22:00')"}命令,查询车辆位置。

    HBase客户端查询结果如下所示。