Java Example Code
Function
Collects the information of female netizens who spend more than 2 hours in online shopping on the weekend from the log files.
Example Code
The following code segment is only an example. For details, see com.huawei.bigdata.spark.examples.FemaleInfoCollection.
public static void main(String[] args) throws Exception { SparkSession spark = SparkSession .builder() .appName("CollectFemaleInfo") .config("spark.some.config.option", "some-value") .getOrCreate(); // Convert RDD to DataFrame through the implicit conversion. JavaRDD<FemaleInfo> femaleInfoJavaRDD = spark.read().textFile(args[0]). javaRDD().map( new Function<String, FemaleInfo>() { @Override public FemaleInfo call(String line) throws Exception { String[] parts = line.split(","); FemaleInfo femaleInfo = new FemaleInfo(); femaleInfo.setName(parts[0]); femaleInfo.setGender(parts[1]); femaleInfo.setStayTime(Integer.parseInt(parts[2].trim())); return femaleInfo; } }); // Register table. Dataset<ROW> schemaFemaleInfo = spark.createDataFrame(femaleInfoJavaRDD,FemaleInfo.class); schemaFemaleInfo.registerTempTable("FemaleInfoTable"); // Run SQL query Dataset<ROW> femaleTimeInfo = spark.sql("select * from " + "(select name,sum(stayTime) as totalStayTime from FemaleInfoTable " + "where gender = 'female' group by name )" + " tmp where totalStayTime >120"); // Collect the columns of a row in the result. List<String> result = femaleTimeInfo.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return row.getString(0) + "," + row.getLong(1); } }).collect(); System.out.println(result); spark.stop(); }
In the preceding code example, data processing logic is implemented by SQL statements. It can also be implemented by invoking the SparkSQL interface in Scala/Java/Python code. For details, see http://spark.apache.org/docs/3.1.1/sql-programming-guide.html#running-sql-queries-programmatically
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.