更新时间:2024-08-03 GMT+08:00

Java样例代码

功能介绍

在Spark应用中,通过使用Streaming调用kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数,或将数据写入Kafka0-10。

Streaming读取Kafka0-10代码样例

下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SecurityKafkaWordCount。样例代码下载地址请参见获取MRS应用开发样例工程

普通集群需要将样例代码中com.huawei.bigdata.spark.examples.SecurityKafkaWordCount类中第78行代码

“kafkaParams.put("security.protocol", "SASL_PLAINTEXT");”注释掉。

/**
  *从Kafka的一个或多个主题消息。
  * <checkPointDir>是Spark Streaming检查点目录。
  * <brokers>是用于自举,制作人只会使用它来获取元数据
  * <topics>是要消费的一个或多个kafka主题的列表
  * <batchTime>是Spark Streaming批次持续时间(以秒为单位)。
 */
public class SecurityKafkaWordCount
{
  public static void main(String[] args) throws Exception {
    JavaStreamingContext ssc = createContext(args);

    //启动Streaming系统。
    ssc.start();
    try {
      ssc.awaitTermination();
    } catch (InterruptedException e) {
    }
  }

  private static JavaStreamingContext createContext(String[] args) throws Exception {
    String checkPointDir = args[0];
    String brokers = args[1];
    String topics = args[2];
    String batchSize = args[3];

    //新建一个Streaming启动环境.
    SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(Long.parseLong(batchSize) * 1000));

    //配置Streaming的CheckPoint目录。
    //由于窗口概念的存在,此参数是必需的。
 ssc.checkpoint(checkPointDir);

    //获取获取kafka使用的topic列表。
    String[] topicArr = topics.split(",");
    Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArr));
    Map<String, Object> kafkaParams = new HashMap();
    kafkaParams.put("bootstrap.servers", brokers);
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("group.id", "DemoConsumer");
    kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
    kafkaParams.put("sasl.kerberos.service.name", "kafka");
    kafkaParams.put("kerberos.domain.name", "hadoop.hadoop.com");

    LocationStrategy locationStrategy = LocationStrategies.PreferConsistent();
    ConsumerStrategy consumerStrategy = ConsumerStrategies.Subscribe(topicSet, kafkaParams);

    //用brokers and topics新建direct kafka stream
    //从Kafka接收数据并生成相应的DStream。
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy);

    //获取每行中的字段属性。
    JavaDStream<String> lines = messages.map(new Function<ConsumerRecord<String, String>, String>() {
      @Override
      public String call(ConsumerRecord<String, String> tuple2) throws Exception {
        return tuple2.value();
      }
    });

    //汇总计算字数的总时间。
    JavaPairDStream<String, Integer> wordCounts = lines.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    }).updateStateByKey(
        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
          @Override
          public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
            int out = 0;
            if (state.isPresent()) {
              out += state.get();
            }
            for (Integer v : values) {
              out += v;
            }
            return Optional.of(out);
          }
        });

    //打印结果
    wordCounts.print();
    return ssc;
  }
}

Streaming Write To Kafka 0-10样例代码

下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.DstreamKafkaWriter。

  • 建议使用新的API createDirectStream代替旧的API createStream进行应用程序开发。旧的API仍然可以使用,但新的API性能和稳定性更好。
  • 该样例代码只存在于mrs-sample-project-1.6.0.zip中。
/**
 * 参数解析:
 * <checkPointDir>为checkPoint目录。
 * <topics>为Kafka中订阅的主题,多以逗号分隔。
 * <brokers>为获取元数据的Kafka地址。
 */
public class JavaDstreamKafkaWriter {

  public static void main(String[] args) throws InterruptedException {

    if (args.length != 4) {
      System.err.println("Usage: DstreamKafkaWriter <checkPointDir> <brokers> <topic>");
      System.exit(1);
    }

    String checkPointDir = args[0];
    String brokers = args[1];
    String topic = args[2];

    SparkConf sparkConf = new SparkConf().setAppName("KafkaWriter");

    //填写Kafka的properties。
    Map kafkaParams = new HashMap<String, Object>();
    kafkaParams.put("zookeeper.connect", brokers);
    kafkaParams.put("metadata.broker.list", brokers);
    kafkaParams.put("group.id", "dstreamKafkaWriterFt08");
    kafkaParams.put("auto.offset.reset", "smallest");

    // 创建Java Spark Streaming的Context。
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(500));

    //填写写入Kafka的数据。
    List<String> sentData = new ArrayList<String>();
    sentData.add("kafka_writer_test_msg_01");
    sentData.add("kafka_writer_test_msg_02");
    sentData.add("kafka_writer_test_msg_03");

    //创建Java RDD队列。
    Queue<JavaRDD<String>> sent = new LinkedList();
    sent.add(ssc.sparkContext().parallelize(sentData));

    //创建写数据的Java DStream。
    JavaDStream wStream = ssc.queueStream(sent);
    //写入Kafka。
    JavaDStreamKafkaWriterFactory.fromJavaDStream(wStream).writeToKafka(JavaConverters.mapAsScalaMapConverter(kafkaParams),
        new Function<String, ProducerRecord<String, byte[]>>() {
          public ProducerRecord<String, byte[]> call(String s) {
            return new ProducerRecord(topic, s.getBytes());
          }
        });

    ssc.start();
    ssc.awaitTermination();
  }
}