Updated: 2022-07-19 GMT+08:00

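Java Sample Code

Function Overview

From the log files, collect information about female netizens whose total online-shopping time over this weekend exceeds 2 hours.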
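
The computation described above can be sketched in plain Java, without Spark, to make the logic explicit: filter by gender, sum the stay time per name, and keep totals above the threshold. This is only an illustrative sketch. The class name FemaleStayTimeSketch, the method longStayFemales, and the sample records are hypothetical, and it assumes each log line has the form name,gender,stayTime with the stay time in minutes (so 2 hours is 120).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper class; not part of the Huawei sample project.
public class FemaleStayTimeSketch {

    // Sums the stay time per female user and keeps only totals
    // strictly greater than thresholdMinutes.
    static Map<String, Long> longStayFemales(List<String> lines, long thresholdMinutes) {
        Map<String, Long> totals = lines.stream()
                // Assumed line format: name,gender,stayTime
                .map(line -> line.split(","))
                .filter(parts -> "female".equals(parts[1]))
                .collect(Collectors.groupingBy(
                        parts -> parts[0],
                        TreeMap::new,
                        Collectors.summingLong(parts -> Long.parseLong(parts[2].trim()))));
        // Drop users whose total does not exceed the threshold.
        totals.values().removeIf(total -> total <= thresholdMinutes);
        return totals;
    }

    public static void main(String[] args) {
        // Made-up records for illustration only.
        List<String> lines = Arrays.asList(
                "UserA,female,50",
                "UserB,male,130",
                "UserA,female,80",
                "UserC,female,60",
                "UserC,female,60");
        System.out.println(longStayFemales(lines, 120)); // prints {UserA=130}
    }
}
```

UserC totals exactly 120 minutes and is excluded, because the condition is "more than" 2 hours.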

Code Sample

The following code snippet is for demonstration only. For the complete code, see com.huawei.bigdata.spark.examples.FemaleInfoCollection:

        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(jsc);

        // Read the input file and map each line to a FemaleInfo JavaBean.
        JavaRDD<FemaleInfo> femaleInfoJavaRDD = jsc.textFile(args[0]).map(
                new Function<String, FemaleInfo>() {
                    @Override
                    public FemaleInfo call(String line) throws Exception {
                        String[] parts = line.split(",");

                        FemaleInfo femaleInfo = new FemaleInfo();
                        femaleInfo.setName(parts[0]);
                        femaleInfo.setGender(parts[1]);
                        femaleInfo.setStayTime(Integer.parseInt(parts[2].trim()));
                        return femaleInfo;
                    }
                });

        // Create a DataFrame from the JavaBean RDD and register it as a temporary table.
        DataFrame schemaFemaleInfo = sqlContext.createDataFrame(femaleInfoJavaRDD,FemaleInfo.class);
        schemaFemaleInfo.registerTempTable("FemaleInfoTable");

        // Run the SQL query: sum the stay time per female user and keep totals above 120.
        DataFrame femaleTimeInfo = sqlContext.sql("select * from " +
                "(select name,sum(stayTime) as totalStayTime from FemaleInfoTable " +
                "where gender = 'female' group by name )" +
                " tmp where totalStayTime >120");

        // Collect the result to the driver and print it.
        List<String> result = femaleTimeInfo.javaRDD().map(new Function<Row, String>() {
            @Override
            public String call(Row row) {
                return row.getString(0) + "," + row.getLong(1);
            }
        }).collect();
        System.out.println(result);
        jsc.stop();
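
The snippet relies on a FemaleInfo class that is not shown here. When a DataFrame is created from an RDD of Java objects with createDataFrame(rdd, beanClass), Spark infers the schema from the bean's getters, so the class is expected to be a serializable JavaBean. The following is a minimal sketch of what such a class might look like. The field types are assumptions inferred from the setters used above, and the actual class in the sample project may differ.

```java
import java.io.Serializable;

// Assumed shape of the FemaleInfo JavaBean; the real sample class may differ.
public class FemaleInfo implements Serializable {
    private static final long serialVersionUID = 1L;

    private String name;
    private String gender;
    private Integer stayTime; // assumed to be the stay time in minutes

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getGender() {
        return gender;
    }

    public void setGender(String gender) {
        this.gender = gender;
    }

    public Integer getStayTime() {
        return stayTime;
    }

    public void setStayTime(Integer stayTime) {
        this.stayTime = stayTime;
    }
}
```

The property names name, gender, and stayTime become the column names that the SQL query refers to.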

The above is a simple example. For other Spark SQL features, see: http://spark.apache.org/docs/latest/sql-programming-guide.html#running-sql-queries-programmatically
