Java Example Code

Function

Collects, from log files, the information of female netizens who spent more than two hours online shopping on the weekend. (The class name and file name must match those in your actual code; the following is only an example.)

Example Code

The following code segment is only an example. For details, see com.huawei.bigdata.spark.examples.FemaleInfoCollection.
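The parsing logic below assumes that each line of the input log file holds three comma-separated fields: name, gender, and stay time in minutes. A hypothetical input line might look like this:

LiuYang,female,20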

import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FemaleInfoCollection {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
                .builder()
                .appName("CollectFemaleInfo")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();

        // Parse each log line into a FemaleInfo bean.
        JavaRDD<FemaleInfo> femaleInfoJavaRDD = spark.read().textFile(args[0]).javaRDD().map(
                new Function<String, FemaleInfo>() {
                    @Override
                    public FemaleInfo call(String line) throws Exception {
                        String[] parts = line.split(",");
                        FemaleInfo femaleInfo = new FemaleInfo();
                        femaleInfo.setName(parts[0]);
                        femaleInfo.setGender(parts[1]);
                        femaleInfo.setStayTime(Integer.parseInt(parts[2].trim()));
                        return femaleInfo;
                    }
                });

        // Convert the RDD to a DataFrame and register it as a temporary view.
        Dataset<Row> schemaFemaleInfo = spark.createDataFrame(femaleInfoJavaRDD, FemaleInfo.class);
        schemaFemaleInfo.createOrReplaceTempView("FemaleInfoTable");

        // Run the SQL query: keep women whose total stay time exceeds 120 minutes.
        Dataset<Row> femaleTimeInfo = spark.sql("select * from " +
                "(select name, sum(stayTime) as totalStayTime from FemaleInfoTable " +
                "where gender = 'female' group by name) " +
                "tmp where totalStayTime > 120");

        // Collect the columns of each row in the result.
        List<String> result = femaleTimeInfo.javaRDD().map(new Function<Row, String>() {
            @Override
            public String call(Row row) {
                return row.getString(0) + "," + row.getLong(1);
            }
        }).collect();
        System.out.println(result);
        spark.stop();
    }
}
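The call to createDataFrame(femaleInfoJavaRDD, FemaleInfo.class) derives the table schema from the getters of the FemaleInfo JavaBean. The actual class ships with the sample project; a minimal sketch, assuming a plain serializable bean with name, gender, and stayTime properties, could look like this:

import java.io.Serializable;

// Hypothetical sketch of the FemaleInfo JavaBean; see the sample project for the real class.
public class FemaleInfo implements Serializable {
    private String name;
    private String gender;
    private Integer stayTime;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getGender() { return gender; }
    public void setGender(String gender) { this.gender = gender; }
    public Integer getStayTime() { return stayTime; }
    public void setStayTime(Integer stayTime) { this.stayTime = stayTime; }
}

Note that Spark orders bean-derived columns alphabetically (gender, name, stayTime), which is why the query selects columns by name rather than by position.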

In the preceding example, the data processing logic is implemented with SQL statements. The same logic can also be implemented by invoking the Spark SQL API from Scala, Java, or Python code. For details, see http://spark.apache.org/docs/3.1.1/sql-programming-guide.html#running-sql-queries-programmatically.
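As an illustration of that programmatic alternative, the aggregation above can be expressed through the Dataset API instead of an SQL string. The following sketch assumes the schemaFemaleInfo Dataset from the preceding example:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.sum;

// Same query as the SQL string, expressed with Dataset operators.
Dataset<Row> femaleTimeInfo = schemaFemaleInfo
        .filter(col("gender").equalTo("female"))
        .groupBy(col("name"))
        .agg(sum(col("stayTime")).alias("totalStayTime"))
        .filter(col("totalStayTime").gt(120));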