Updated on 2024-08-10 GMT+08:00

Spark Core Sample Projects (Java)

Function

Collect statistics on female netizens who spend more than two hours shopping online during weekends.

Sample Code

The following code snippet is used as an example. For the complete code, see the com.huawei.bigdata.spark.examples.FemaleInfoCollection class.

    //Obtain the SparkSession for the "CollectFemaleInfo" application; getOrCreate() reuses an existing session if there is one.
    //NOTE: "spark.some.config.option" looks like a placeholder setting - replace it with real configuration as needed.
    SparkSession spark = SparkSession.builder()
        .appName("CollectFemaleInfo")
        .config("spark.some.config.option", "some-value")
        .getOrCreate();

    //Load the file given as the first program argument; each line of the file becomes one String element of the RDD.
    JavaRDD<String> data = spark.read().textFile(args[0]).javaRDD();

    //Parse every comma-separated row into a three-element tuple (first column, second column, third column as an integer).
    JavaRDD<Tuple3<String,String,Integer>> person = data.map(new Function<String,Tuple3<String,String,Integer>>()
    {
        private static final long serialVersionUID = -2381522520231963249L;

        public Tuple3<String, String, Integer> call(String line) throws Exception
        {
            //Columns are separated by commas (,).
            String[] fields = line.split(",");

            //Read the columns in order; the third column must be a valid integer.
            String first = fields[0];
            String second = fields[1];
            Integer third = Integer.parseInt(fields[2]);

            //Bundle the three columns into one ternary Tuple.
            return new Tuple3<String, String, Integer>(first, second, third);
        }
    });

    //Keep only the records that belong to female netizens.
    JavaRDD<Tuple3<String,String,Integer>> female = person.filter(new Function<Tuple3<String,String,Integer>, Boolean>()
    {
        private static final long serialVersionUID = -4210609503909770492L;

        public Boolean call(Tuple3<String, String, Integer> row) throws Exception
        {
            //The gender is held in the second element of the tuple; a record passes when it is exactly "female".
            String gender = row._2();
            return gender.equals("female");
        }
    });

    //Reduce each female record to a (name, online time) pair so that the time can be summed by name.
    JavaPairRDD<String, Integer> females = female.mapToPair(new PairFunction<Tuple3<String, String, Integer>, String, Integer>()
    {
        private static final long serialVersionUID = 8313245377656164868L;

        public Tuple2<String, Integer> call(Tuple3<String, String, Integer> row) throws Exception
        {
            //The first element is the name (the key) and the third element is the online time (the value).
            String name = row._1();
            Integer onlineTime = row._3();
            return new Tuple2<String, Integer>(name, onlineTime);
        }
    });

    //Sum the online time of all pairs that share the same name.
    JavaPairRDD<String, Integer> femaleTime = females.reduceByKey(new Function2<Integer, Integer, Integer>()
    {
        private static final long serialVersionUID = -3271456048413349559L;

        public Integer call(Integer accumulated, Integer next) throws Exception
        {
            //Combine two partial durations that belong to the same female user.
            int total = accumulated + next;
            return total;
        }
    });

    //Keep only the female netizens whose total online time is more than 2 hours.
    //The filter must run on femaleTime (the per-name totals produced by reduceByKey) and not on
    //females (the individual records); otherwise the threshold is checked against single records
    //and the aggregated result is never used.
    JavaPairRDD<String, Integer> rightFemales = femaleTime.filter(new Function<Tuple2<String, Integer>, Boolean>()
    {
        private static final long serialVersionUID = -3178168214712105171L;

        public Boolean call(Tuple2<String, Integer> s) throws Exception
        {
            //The second element is the summed online time. The threshold 2 * 60 presumably expresses
            //2 hours in minutes (assumes the time column of the input is in minutes - confirm against the data).
            return s._2() > (2 * 60);
        }
    });

    //Collect the matching records and print each one on its own line in the form "name,time".
    for(Tuple2<String, Integer> result: rightFemales.collect())
    {
        System.out.println(result._1() + "," + result._2());
    }