Spark Core Sample Projects (Java)
Function
Collects the information of female netizens who spend more than 2 hours in online shopping on the weekend from the log files.
Sample Code
The following code segment is only an example. For details, see the com.huawei.bigdata.spark.examples.FemaleInfoCollection class.
// Create a configuration class SparkConf, and then create a SparkContext. SparkSession spark = SparkSession .builder() .appName("CollectFemaleInfo") .config("spark.some.config.option", "some-value") .getOrCreate(); // Read the source file data, and transfer each row of records to an element of the RDD. JavaRDD<String> data = spark.read() .textFile(args[0]) .javaRDD(); // Split each column of each record, and generate a Tuple. JavaRDD<Tuple3<String,String,Integer>> person = data.map(new Function<String,Tuple3<String,String,Integer>>() { private static final long serialVersionUID = -2381522520231963249L; public Tuple3<String, String, Integer> call(String s) throws Exception { // Split a row of data by commas (,). String[] tokens = s.split(","); // Integrate the three split elements to a ternary Tuple. Tuple3<String, String, Integer> person = new Tuple3<String, String, Integer>(tokens[0], tokens[1], Integer.parseInt(tokens[2])); return person; } }); // Use the filter function to filter the data information about the time that female netizens spend online. JavaRDD<Tuple3<String,String,Integer>> female = person.filter(new Function<Tuple3<String,String,Integer>, Boolean>() { private static final long serialVersionUID = -4210609503909770492L; public Boolean call(Tuple3<String, String, Integer> person) throws Exception { // Filter the records of which the gender in the second column is female. Boolean isFemale = person._2().equals("female"); return isFemale; } }); // Aggregate the total time that each female netizen spends online. JavaPairRDD<String, Integer> females = female.mapToPair(new PairFunction<Tuple3<String, String, Integer>, String, Integer>() { private static final long serialVersionUID = 8313245377656164868L; public Tuple2<String, Integer> call(Tuple3<String, String, Integer> female) throws Exception { // Extract the two columns representing the name and online time for the sum of online time by name during further operations. Tuple2<String, Integer> femaleAndTime = new Tuple2<String, Integer>(female._1(), female._3()); return femaleAndTime; } }); JavaPairRDD<String, Integer> femaleTime = females.reduceByKey(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = -3271456048413349559L; public Integer call(Integer integer, Integer integer2) throws Exception { // Sum two online time durations of the same female netizen. return (integer + integer2); } }); //Filter the information about female netizens who spend more than two hours online. JavaPairRDD<String, Integer> rightFemales = females.filter(new Function<Tuple2<String, Integer>, Boolean>() { private static final long serialVersionUID = -3178168214712105171L; public Boolean call(Tuple2<String, Integer> s) throws Exception { // Extract the total time that female netizens spend online, and determine whether the time is more than 2 hours. if(s._2() > (2 * 60)) { return true; } return false; } }); // Print the information about female netizens who meet the requirements. for(Tuple2<String, Integer> d: rightFemales.collect()) { System.out.println(d._1() + "," + d._2()); }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot