更新时间:2024-06-27 GMT+08:00
分享

Flink Job Pipeline样例程序(Scala)

  1. 发送消息

    下面代码片段仅为演示,完整代码参见FlinkPipelineScalaExample样例工程下的com.huawei.bigdata.flink.examples.Information:

    package com.huawei.bigdata.flink.examples
     
    case class Inforamtion(index: Int, content: String) {
     
      def this() = this(0, "")
    }
  1. 发布者job自定义source算子产生数据

    下面代码片段仅为演示,完整代码参见FlinkPipelineScalaExample样例工程下的com.huawei.bigdata.flink.examples.UserSource:

    package com.huawei.bigdata.flink.examples
     
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
     
    class UserSource extends RichParallelSourceFunction[Inforamtion] with Serializable{
     
      var isRunning = true
     
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
       
      }
     
    // 每秒钟产生10000条数据
      override def run(sourceContext: SourceContext[Inforamtion]) = {
     
        while (isRunning) {
          for (i <- 0 until 10000) {
            sourceContext.collect(Inforamtion(i, "hello-" + i));
     
          }
          Thread.sleep(1000)
        }
      }
     
      override def close(): Unit = super.close()
     
      override def cancel() = {
        isRunning = false
      }
    }
  1. 发布者代码

    下面代码片段仅为演示,完整代码参见FlinkPipelineScalaExample样例工程下的com.huawei.bigdata.flink.examples.TestPipeline_NettySink:

    package com.huawei.bigdata.flink.examples
     
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.netty.sink.NettySink
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
    import org.apache.flink.streaming.api.scala._
     
    object TestPipeline_NettySink {
     
      def main(args: Array[String]): Unit = {
     
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 设置job的并发度为2    
    env.setParallelism(2)
    //设置Zookeeper为注册服务器
        val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
    //添加用户自定义算子产生数据    
    env.addSource(new UserSource)
          .keyBy(0).map(x=>x.content.getBytes)//将发送数据转化成字节数组
          .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2))//添加NettySink算子发送数据
     
        env.execute()
      }
    }
  1. 第一个订阅者

    下面代码片段仅为演示,完整代码参见FlinkPipelineScalaExample样例工程下的com.huawei.bigdata.flink.examples.TestPipeline_NettySource1:

    package com.huawei.bigdata.flink.examples
     
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.netty.source.NettySource
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
    import org.apache.flink.streaming.api.scala._
     
    import scala.util.Random
     
     
    object TestPipeline_NettySource1 {
     
      def main(args: Array[String]): Unit = {
     
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 设置Job的并发度为2  
      env.setParallelism(2)
    //设置Zookeeper作为注册服务器
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
    //添加NettySource算子,接收来自发布者的数据
        env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
          .map(x => (1, new String(x)))//将接收到的字节流转化成字符串
          .filter(x => {
            Random.nextInt(50000) == 10
          })
          .print
     
        env.execute()
      }
    }
  1. 第二个订阅者

    下面代码片段仅为演示,完整代码参见FlinkPipelineScalaExample样例工程下的com.huawei.bigdata.flink.examples.TestPipeline_NettySource2:

    package com.huawei.bigdata.flink.examples
     
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.netty.source.NettySource
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
    import org.apache.flink.streaming.api.scala._
     
    import scala.util.Random
     
     
    object TestPipeline_NettySource2 {
     
      def main(args: Array[String]): Unit = {
     
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    //设置job的并发度为2   
     env.setParallelism(2)
    //创建Zookeeper作为注册服务器
        val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
    //添加NettySource算子,接收数据    
    env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
          .map(x=>(2, new String(x)))//将接收到的字节数组转化成字符串
          .filter(x=>{
            Random.nextInt(50000) == 10
          })
          .print()
     
        env.execute()
      }
    }

相关文档