使用DIS实时分析车辆位置
场景介绍
数据接入服务(Data Ingestion Service,简称DIS)实时采集车辆位置数据并上传到华为云的表格存储服务(CloudTable Service,简称CloudTable)中,用户可以使用CloudTable查询指定车辆在指定时间段的车辆位置。
本次实践基本流程如下所示:
在CloudTable中创建数据表
用户创建DIS通道,选择将数据转储到CloudTable中,需要创建CloudTable数据表。
采集获得数据是JSON格式,样例如下:
{"DeviceID":"4d3a27c13dc21ae056044b818a03dwen002","Mileage":"55378500","DataTime":"2017-10-23 12:19:35.000","Latitude":"34.585639","IsACCOpen":"true","Longitude":"119.193524","Velocity":0,"Direction":"null","Carnum":"WL66666","BaiDuLatitude":"34.607069","BaiDuLongitude":"119.190093","BaiDuAdress":"江苏省连云港市新浦区通灌北路78号","ReceiveTime":"2017-10-23 12:19:34.846","Altitude":"null"}
本实践中,通过使用HBase shell客户端完成建表操作。
- 准备Linux弹性云服务器。假设该弹性云服务器名称为“ecs-385d”,具体操作请参见准备弹性云服务器。
- 安装客户端并启动Shell访问CloudTable集群,具体操作请参见使用HBaseShell访问集群。
- 在HBase shell客户端执行create 'tbl1',{NAME => 'i'}命令,创建数据表。界面显示如下表示创建成功。
添加转储任务
- 使用注册账户登录DIS控制台。
- 在左侧列表栏中选择“通道管理”。
- 单击申请DIS通道中创建的通道名称,进入所选通道的管理页面,选择“转储管理”页签。
- 单击“添加转储任务”按钮,在弹出的“添加转储任务”页面配置转储相关配置项。
- 每个通道最多可创建5个转储任务。
- 源数据类型为FILE的通道,不允许添加转储任务。
- 单击“立即创建”。
表1 转储任务参数说明 参数
参数解释
配置值
转储服务类型
选择CloudTable,通道里的流式数据存储在DIS中,并实时导入表格存储服务Cloudtable集群的HBase表和OpenTSDB。
CloudTable
任务名称
用户创建转储任务时,需要指定转储任务名称,同一通道的转储任务名称不可重复。任务名称由英文字母、数字、中划线和下划线组成。长度为1~64个字符。
-
偏移量
- 最新:最大偏移量,即获取最新的数据。
- 最早:最小偏移量,即读取最早的数据。
最新
CloudTable集群
单击“选择”,在“选择CloudTable集群”窗口选择一个集群名称。
此配置项不可配置为空。仅支持选择,不可手动输入。
cloudtable-demo
CloudTable表类型
HBase和openTSDB两种。
HBase
CloudTable数据表
CloudTable数据表:单击“选择”,在“选择CloudTable数据表”窗口选择一个数据表。
此处路径仅支持选择,不可手动输入。
说明:配置此项必须已配置“CloudTable集群”并创建了HBase表。
tbl1
备份开关
用户数据转储CloudTable服务失败时,是否将转储失败的数据备份至OBS服务。
- 开启:是,转储失败的数据备份至OBS服务。
- 关闭:否,转储失败的数据不备份至OBS服务。
开关默认关闭。
说明:关闭开关,转储失败的数据会存储在DIS中,并在“生命周期”配置的时间到达时将数据清除。
关闭
Row Key
- Json属性名,取值范围为英文字母、数字、下划线和小数点,最大取值为32个字符,不可为空,不可以小数点开头,不可包含连续的小数点 且不可以小数点结尾。最多可添加64个属性。
- 数据类型,从下拉框选择。
- Bigint
- Double
- Boolean
- Timestamp
- String
- Decimal
-
Row Key 分隔符
支持“.”、“,”、“|”、“;”、“-”、“_”、和“~”七种字符取值,也可配置为NULL。
最大长度为一个字符。
-
Schema 列
- 列名,取值范围为英文字母、数字和下划线,最大取值为32个字符,不可为空。最多可添加4096个列。
- 数据类型,从下拉框选择。
- Bigint
- Double
- Boolean
- Timestamp
- String
- Decimal
- Json属性名,取值范围为英文字母、数字、下划线和小数点,最大取值为32个字符,不可为空,不可以小数点开头,不可包含连续的小数点 且不可以小数点结尾。
- 所属列族,从下拉框选择,不可为空。配置此项必须已配置“CloudTable 集群”、“CloudTable 数据表”且CloudTable表类型为HBase。
参见表2表2Schema 列填写。
表2 Schema 列填写 列名
数据类型
JSON属性名
列族
DeviceID
String
DeviceID
i
Mileage
Bigint
Mileage
i
Latitude
Decimal
Latitude
i
IsACCOpen
Boolean
IsACCOpen
i
Longitude
Decimal
Longitude
i
Velocity
Bigint
Velocity
i
Direction
String
Direction
i
BaiDuLatitude
Decimal
BaiDuLatitude
i
BaiDuLongitude
Decimal
BaiDuLongitude
i
BaiDuAdress
String
BaiDuAdress
i
ReceiveTime
Timestamp
ReceiveTime
i
Altitude
String
Altitude
i
获取认证信息
- 获取AK/SK
您可以通过如下方式获取访问密钥。
- 登录控制台,在用户名下拉列表中选择“我的凭证”。
- 进入“我的凭证”页面,选择,如图2所示。
- 单击“确定”,根据浏览器提示,保存密钥文件。密钥文件会直接保存到浏览器默认的下载文件夹中。打开名称为“credentials.csv”的文件,即可查看访问密钥(Access Key Id和Secret Access Key)。
- 每个用户仅允许新增两个访问密钥。
- 为保证访问密钥的安全,访问密钥仅在初次生成时自动下载,后续不可再次通过管理控制台界面获取。请在生成后妥善保管。
- 获取项目ID和账号ID
项目ID表示租户的资源,账号ID对应当前账号,IAM用户ID对应当前用户。用户可在对应页面下查看不同Region对应的项目ID、账号ID和用户ID。
- 注册并登录管理控制台。
- 在用户名的下拉列表中单击“我的凭证”。
- 在“API凭证”页面,查看账号名和账号ID、IAM用户名和IAM用户ID,在项目列表中查看项目ID。
- 获取endpoint
终端节点(Endpoint)即调用API的请求地址,不同服务不同区域的终端节点不同。本服务的Endpoint可从终端节点Endpoint获取。
编写发送数据到DIS的应用程序
- 准备数据样例。
- 修改样例代码。
样例工程为准备DIS应用开发环境中下载的“huaweicloud-sdk-dis-java-X.X.X.zip ”压缩包“\dis-sdk-demo\src\main\java\com\bigdata\dis\sdk\demo”路径下的“ProducerDemo.java”文件。
根据实际情况更改“AK”、“SK”和“ProjectId”的值。
private static void runProduceDemo() { // 创建DIS客户端实例 DIS dic = DISClientBuilder.standard() .withEndpoint("https://dis.cn-north-1.myhuaweicloud.com:20004") .withAk("${your_AK}") .withSk("${your_SK}") .withProjectId("${your_projectId}") .withRegion("cn-north-1") .build(); // 配置流名称 String streamName = "dis-demo"; // 配置上传的数据 PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<PutRecordsRequestEntry>(); String[] messages = { 此处填写上一步准备的数据样例 }; for (int i = 0; i < messages.length; i++) { ByteBuffer buffer = ByteBuffer.wrap(messages[i].getBytes()); PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(buffer); putRecordsRequestEntry.setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000))); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); log.info("========== BEGIN PUT ============"); PutRecordsResult putRecordsResult = null; try { putRecordsResult = dic.putRecords(putRecordsRequest); } catch (DISClientException e) { log.error("Failed to get a normal response, please check params and retry. Error message [{}]", e.getMessage(), e); } catch (ResourceAccessException e) { log.error("Failed to access endpoint. Error message [{}]", e.getMessage(), e); } catch (Exception e) { log.error(e.getMessage(), e); } if (putRecordsResult != null) { log.info("Put {} records[{} successful / {} failed].", putRecordsResult.getRecords().size(), putRecordsResult.getRecords().size() - putRecordsResult.getFailedRecordCount().get(), putRecordsResult.getFailedRecordCount()); for (int j = 0; j < putRecordsResult.getRecords().size(); j++) { PutRecordsResultEntry putRecordsRequestEntry = putRecordsResult.getRecords().get(j); if (putRecordsRequestEntry.getErrorCode() != null) { // 上传失败 log.error("[{}] put failed, errorCode [{}], errorMessage [{}]", new String(putRecordsRequestEntryList.get(j).getData().array()), putRecordsRequestEntry.getErrorCode(), putRecordsRequestEntry.getErrorMessage()); } else { // 上传成功 log.info("[{}] put success, partitionId [{}], partitionKey [{}], sequenceNumber [{}]", new String(putRecordsRequestEntryList.get(j).getData().array()), putRecordsRequestEntry.getPartitionId(), putRecordsRequestEntryList.get(j).getPartitionKey(), putRecordsRequestEntry.getSequenceNumber()); } } } log.info("========== END PUT ============"); }
启动数据上传程序
数据上传过程中可在Console控制台查看数据上传通道量信息。出现类似信息表示数据上传成功。