Java Example Code
Function
Collects, from the weekend log files, information about female netizens who spend more than two hours on online shopping.
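Each record in the input log is expected to hold a name, a gender, and an online dwell time in minutes, separated by commas; this format is implied by the parsing logic in the example code below. The two records that follow are purely illustrative (hypothetical data, not part of the sample project): the first one would be kept by the query, the second one filtered out.
LiuSi,female,140
ZhangSan,male,30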
Example Code
The following code segment is only an example. For details, see com.huawei.bigdata.spark.examples.FemaleInfoCollection.
public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession
            .builder()
            .appName("CollectFemaleInfo")
            .config("spark.some.config.option", "some-value")
            .getOrCreate();

    // Read the text file and map each comma-separated record to a FemaleInfo bean.
    JavaRDD<FemaleInfo> femaleInfoJavaRDD = spark.read().textFile(args[0]).javaRDD().map(
            new Function<String, FemaleInfo>() {
                @Override
                public FemaleInfo call(String line) throws Exception {
                    String[] parts = line.split(",");
                    FemaleInfo femaleInfo = new FemaleInfo();
                    femaleInfo.setName(parts[0]);
                    femaleInfo.setGender(parts[1]);
                    femaleInfo.setStayTime(Integer.parseInt(parts[2].trim()));
                    return femaleInfo;
                }
            });

    // Create a DataFrame from the JavaRDD and register it as a temporary view.
    Dataset<Row> schemaFemaleInfo = spark.createDataFrame(femaleInfoJavaRDD, FemaleInfo.class);
    schemaFemaleInfo.createOrReplaceTempView("FemaleInfoTable");

    // Run the SQL query: total stay time per female user, keeping totals above 120 minutes.
    Dataset<Row> femaleTimeInfo = spark.sql("select * from " +
            "(select name, sum(stayTime) as totalStayTime from FemaleInfoTable " +
            "where gender = 'female' group by name) " +
            "tmp where totalStayTime > 120");

    // Collect the result rows and format each as "name,totalStayTime".
    List<String> result = femaleTimeInfo.javaRDD().map(new Function<Row, String>() {
        @Override
        public String call(Row row) {
            return row.getString(0) + "," + row.getLong(1);
        }
    }).collect();
    System.out.println(result);
    spark.stop();
}
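The createDataFrame(femaleInfoJavaRDD, FemaleInfo.class) call infers the DataFrame schema by reflecting on the JavaBean's getter methods, so FemaleInfo must be a public, serializable bean with a getter and setter for each field. The actual class is provided in the sample project; the following is only a minimal sketch of what it is assumed to look like.
// Minimal sketch of the FemaleInfo bean assumed by the example above;
// see the sample project for the actual class definition.
public class FemaleInfo implements java.io.Serializable {
    private String name;
    private String gender;
    private Integer stayTime;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getGender() { return gender; }
    public void setGender(String gender) { this.gender = gender; }

    public Integer getStayTime() { return stayTime; }
    public void setStayTime(Integer stayTime) { this.stayTime = stayTime; }
}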
In the preceding code example, the data processing logic is implemented with SQL statements. The same logic can also be implemented by invoking the Spark SQL API from Scala, Java, or Python code. For details, see http://spark.apache.org/docs/3.1.1/sql-programming-guide.html#running-sql-queries-programmatically
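For illustration, the same aggregation can be expressed with the DataFrame API instead of an SQL string. The following is only a sketch; it assumes the schemaFemaleInfo DataFrame created in the example above and statically imports the col and sum helpers from org.apache.spark.sql.functions.
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.sum;

// Sketch: the same query expressed with the DataFrame API, assuming the
// schemaFemaleInfo DataFrame from the example above is available.
Dataset<Row> femaleTimeInfo = schemaFemaleInfo
        .filter(col("gender").equalTo("female"))
        .groupBy(col("name"))
        .agg(sum(col("stayTime")).alias("totalStayTime"))
        .filter(col("totalStayTime").gt(120));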