Updated on 2022-09-14 GMT+08:00

Java Sample Code

Function

Collects information from log files about female netizens who spend more than 2 hours (120 minutes) shopping online over the weekend.
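The rule above can be sketched in plain Java, without Spark, to check expected results on a small sample. This is only an illustration under assumptions: the class name FemaleInfoRuleSketch, the helper collect, and the sample records are hypothetical. The record layout name,gender,stayTime is taken from the parsing logic in the sample code, and the stay time is assumed to be in minutes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Spark-free sketch of the collection rule.
public class FemaleInfoRuleSketch {

    // Sums stay time per female user and keeps only totals above the threshold.
    static Map<String, Long> collect(List<String> lines, long thresholdMinutes) {
        Map<String, Long> totals = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split(",");
            // Skip malformed records and records of non-female users.
            if (parts.length != 3 || !"female".equals(parts[1].trim())) {
                continue;
            }
            totals.merge(parts[0].trim(), Long.parseLong(parts[2].trim()), Long::sum);
        }
        // Keep only users whose total is strictly greater than the threshold.
        totals.values().removeIf(total -> total <= thresholdMinutes);
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical sample records in the assumed name,gender,stayTime layout.
        List<String> sample = Arrays.asList(
                "Alice,female,50",
                "Bob,male,200",
                "Alice,female,80",
                "Carol,female,60",
                "Carol,female,60");
        System.out.println(collect(sample, 120)); // prints {Alice=130}
    }
}
```

Carol totals exactly 120 minutes and is therefore excluded, which matches the strict comparison (totalStayTime > 120) used in the SQL query of the sample code.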

Sample Code

The following code segment is only an example. For details, see com.huawei.bigdata.spark.examples.FemaleInfoCollection.

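// Imports used by this segment (the enclosing class and the FemaleInfo bean are omitted).
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
                .builder()
                .appName("CollectFemaleInfo")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();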

        // Read the input file (args[0]) and map each line, in the form name,gender,stayTime, to a FemaleInfo bean.
        JavaRDD<FemaleInfo> femaleInfoJavaRDD = spark.read().textFile(args[0]).javaRDD().map(
                new Function<String, FemaleInfo>() {
                    @Override
                    public FemaleInfo call(String line) throws Exception {
                        String[] parts = line.split(",");
                        FemaleInfo femaleInfo = new FemaleInfo();
                        femaleInfo.setName(parts[0]);
                        femaleInfo.setGender(parts[1]);
                        femaleInfo.setStayTime(Integer.parseInt(parts[2].trim()));
                        return femaleInfo;
                    }
                });

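        // Create a DataFrame from the RDD and register it as a temporary view.
        // createOrReplaceTempView replaces registerTempTable, which has been deprecated since Spark 2.0.
        Dataset<Row> schemaFemaleInfo = spark.createDataFrame(femaleInfoJavaRDD, FemaleInfo.class);
        schemaFemaleInfo.createOrReplaceTempView("FemaleInfoTable");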

        // Run the SQL query: sum the stay time per female user and keep totals greater than 120.
        Dataset<Row> femaleTimeInfo = spark.sql("select * from " +
                "(select name, sum(stayTime) as totalStayTime from FemaleInfoTable " +
                "where gender = 'female' group by name)" +
                " tmp where totalStayTime > 120");

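        // Format each result row as "name,totalStayTime" and collect the rows to the driver.
        List<String> result = femaleTimeInfo.javaRDD().map(new Function<Row, String>() {
            @Override
            public String call(Row row) {
                // sum() over an integer column yields a long (bigint) value.
                return row.getString(0) + "," + row.getLong(1);
            }
        }).collect();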
        System.out.println(result);
        spark.stop();
    }

In the preceding example, the data processing logic is implemented with SQL statements. It can also be implemented by calling the Spark SQL API in Scala, Java, or Python code. For details, see http://spark.apache.org/docs/3.1.1/sql-programming-guide.html#running-sql-queries-programmatically
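As a rough illustration of the API-based alternative, the same query might be expressed with Dataset operators instead of a SQL string. This is a sketch rather than part of the shipped sample: the class FemaleInfoDataFrameSketch and the method longStayFemales are hypothetical names, and the input is assumed to have the name, gender, and stayTime columns derived from the FemaleInfo bean.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.sum;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hypothetical helper showing the SQL query rewritten with the DataFrame API.
public class FemaleInfoDataFrameSketch {

    // Returns (name, totalStayTime) for female users whose total stay time exceeds 120.
    static Dataset<Row> longStayFemales(Dataset<Row> femaleInfo) {
        return femaleInfo
                // where gender = 'female'
                .filter(col("gender").equalTo("female"))
                // group by name, sum(stayTime) as totalStayTime
                .groupBy("name")
                .agg(sum("stayTime").as("totalStayTime"))
                // where totalStayTime > 120
                .filter(col("totalStayTime").gt(120));
    }
}
```

In the sample above, a call such as Dataset<Row> femaleTimeInfo = FemaleInfoDataFrameSketch.longStayFemales(schemaFemaleInfo); could stand in for the spark.sql(...) call. No temporary view is needed in that case, because the operators work on the DataFrame directly.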