更新时间:2024-05-11 GMT+08:00

CarbonData快速入门

本章节介绍创建CarbonData table、加载数据,以及查询数据的快速入门流程。该快速入门提供基于Spark Beeline客户端的操作。如果使用Spark shell,需将查询命令写在spark.sql()的括号中。

本操作以从CSV文件加载数据到CarbonData Table为例

表1 CarbonData快速入门

操作

说明

准备CSV文件

准备加载到CarbonData Table的CSV文件。

连接到CarbonData

在对CarbonData进行任何一种操作之前,首先需要连接到CarbonData。

创建CarbonData Table

连接到CarbonData之后,需要创建CarbonData table用于加载数据和执行查询操作。

加载数据到CarbonData Table

创建CarbonData table之后,可以从CSV文件加载数据到所创建的table中。

在CarbonData中查询数据

创建CarbonData table并加载数据之后,可以执行所需的查询操作,例如filters,groupby等。

准备CSV文件

  1. 在本地准备CSV文件,文件名为:test.csv,样例如下:
    13418592122,1001,MAC地址,2017-10-23 15:32:30,2017-10-24 15:32:30,62.50,74.56
    13418592123,1002,MAC地址,2017-10-23 16:32:30,2017-10-24 16:32:30,17.80,76.28
    13418592124,1003,MAC地址,2017-10-23 17:32:30,2017-10-24 17:32:30,20.40,92.94
    13418592125,1004,MAC地址,2017-10-23 18:32:30,2017-10-24 18:32:30,73.84,8.58
    13418592126,1005,MAC地址,2017-10-23 19:32:30,2017-10-24 19:32:30,80.50,88.02
    13418592127,1006,MAC地址,2017-10-23 20:32:30,2017-10-24 20:32:30,65.77,71.24
    13418592128,1007,MAC地址,2017-10-23 21:32:30,2017-10-24 21:32:30,75.21,76.04
    13418592129,1008,MAC地址,2017-10-23 22:32:30,2017-10-24 22:32:30,63.30,94.40
    13418592130,1009,MAC地址,2017-10-23 23:32:30,2017-10-24 23:32:30,95.51,50.17
    13418592131,1010,MAC地址,2017-10-24 00:32:30,2017-10-25 00:32:30,39.62,99.13
  2. 使用WinSCP工具将CSV文件导入客户端节点,例如“/opt”目录下。
  3. 登录FusionInsight Manager页面,选择“系统 > 权限 > 用户”,添加人机用户sparkuser,用户组(hadoop、hive),主组(hadoop)。
  4. 进入客户端目录,加载环境变量并认证用户:

    cd /客户端安装目录

    source ./bigdata_env

    source ./Spark2x/component_env

    kinit sparkuser

  5. 上传CSV中的文件到HDFS的“/data”目录:

    hdfs dfs -put /opt/test.csv /data/

连接到CarbonData

  • 使用Spark SQL或Spark shell连接到Spark并执行Spark SQL命令。
  • 开启JDBCServer并使用JDBC客户端(例如,Spark Beeline)连接。

    执行如下命令:

    cd ./Spark2x/spark/bin

    ./spark-beeline

创建CarbonData Table

在Spark Beeline被连接到JDBCServer之后,需要创建一个CarbonData table用于加载数据和执行查询操作。下面是创建一个简单的表的命令。

create table x1 (imei string, deviceInformationId int, mac string, productdate timestamp, updatetime timestamp, gamePointId double, contractNumber double) STORED AS carbondata TBLPROPERTIES ('SORT_COLUMNS'='imei,mac');

命令执行结果如下:

+---------+
| Result  |
+---------+
+---------+
No rows selected (1.093 seconds)

加载数据到CarbonData Table

创建CarbonData table之后,可以从CSV文件加载数据到所创建的表中。

用所要求的参数运行以下命令从CSV文件加载数据。该表的列名需要与CSV文件的列名匹配。

LOAD DATA inpath 'hdfs://hacluster/data/test.csv' into table x1 options('DELIMITER'=',', 'QUOTECHAR'='"','FILEHEADER'='imei, deviceinformationid,mac, productdate,updatetime, gamepointid,contractnumber');

其中,“test.csv”准备CSV文件的CSV文件,“x1”为示例的表名。

CSV样例内容如下:

13418592122,1001,MAC地址,2017-10-23 15:32:30,2017-10-24 15:32:30,62.50,74.56
13418592123,1002,MAC地址,2017-10-23 16:32:30,2017-10-24 16:32:30,17.80,76.28
13418592124,1003,MAC地址,2017-10-23 17:32:30,2017-10-24 17:32:30,20.40,92.94
13418592125,1004,MAC地址,2017-10-23 18:32:30,2017-10-24 18:32:30,73.84,8.58
13418592126,1005,MAC地址,2017-10-23 19:32:30,2017-10-24 19:32:30,80.50,88.02
13418592127,1006,MAC地址,2017-10-23 20:32:30,2017-10-24 20:32:30,65.77,71.24
13418592128,1007,MAC地址,2017-10-23 21:32:30,2017-10-24 21:32:30,75.21,76.04
13418592129,1008,MAC地址,2017-10-23 22:32:30,2017-10-24 22:32:30,63.30,94.40
13418592130,1009,MAC地址,2017-10-23 23:32:30,2017-10-24 23:32:30,95.51,50.17
13418592131,1010,MAC地址,2017-10-24 00:32:30,2017-10-25 00:32:30,39.62,99.13

命令执行结果如下:

+------------+
|Segment ID  |
+------------+
|0           |
+------------+
No rows selected (3.039 seconds)

在CarbonData中查询数据

创建CarbonData table并加载数据之后,可以执行所需的数据查询操作。以下为一些查询操作举例。

  • 获取记录数

    为了获取在CarbonData table中的记录数,可以运行以下命令。

    select count(*) from x1;

  • 使用Groupby查询

    为了获取不重复的deviceinformationid记录数,可以运行以下命令。

    select deviceinformationid,count (distinct deviceinformationid) from x1 group by deviceinformationid;

  • 用Filter查询

    为了获取特定deviceinformationid的记录,可以运行以下命令。

    select * from x1 where deviceinformationid='1010';

在Spark-shell上使用CarbonData

用户若需要在Spark-shell上使用CarbonData,需通过如下方式创建CarbonData Table,加载数据到CarbonData Table和在CarbonData中查询数据的操作。

spark.sql("CREATE TABLE x2(imei string, deviceInformationId int, mac string, productdate timestamp, updatetime timestamp, gamePointId double, contractNumber double) STORED AS carbondata")
spark.sql("LOAD DATA inpath 'hdfs://hacluster/data/x1_without_header.csv' into table x2 options('DELIMITER'=',', 'QUOTECHAR'='\"','FILEHEADER'='imei, deviceinformationid,mac, productdate,updatetime, gamepointid,contractnumber')")
spark.sql("SELECT * FROM x2").show()