Spark SQL样例程序(Java)
功能简介
统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。
代码样例
下面代码片段仅为演示,具体代码参见com.huawei.bigdata.spark.examples.FemaleInfoCollection:
public static void main(String[] args) throws Exception { SparkSession spark = SparkSession .builder() .appName("CollectFemaleInfo") .config("spark.some.config.option", "some-value") .getOrCreate(); // 通过隐式转换,将RDD转换成DataFrame JavaRDD<FemaleInfo> femaleInfoJavaRDD = spark.read().textFile(args[0]). javaRDD().map( new Function<String, FemaleInfo>() { @Override public FemaleInfo call(String line) throws Exception { String[] parts = line.split(","); FemaleInfo femaleInfo = new FemaleInfo(); femaleInfo.setName(parts[0]); femaleInfo.setGender(parts[1]); femaleInfo.setStayTime(Integer.parseInt(parts[2].trim())); return femaleInfo; } }); // 注册表。 Dataset<ROW> schemaFemaleInfo = spark.createDataFrame(femaleInfoJavaRDD,FemaleInfo.class); schemaFemaleInfo.registerTempTable("FemaleInfoTable"); // 执行SQL查询 Dataset<ROW> femaleTimeInfo = spark.sql("select * from " + "(select name,sum(stayTime) as totalStayTime from FemaleInfoTable " + "where gender = 'female' group by name )" + " tmp where totalStayTime >120"); // 显示结果。 List<String> result = femaleTimeInfo.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return row.getString(0) + "," + row.getLong(1); } }).collect(); System.out.println(result); spark.stop(); }
上面是简单示例,其它SparkSQL特性请参见如下链接:http://archive.apache.org/dist/spark/docs/3.3.1/sql-programming-guide.html#running-sql-queries-programmatically。