更新时间:2024-08-03 GMT+08:00

Flink Job Pipeline样例程序(Scala)

下面列出的主要逻辑代码作为演示。

完整代码请参阅:

  • com.huawei.bigdata.flink.examples.UserSource。
  • com.huawei.bigdata.flink.examples.TestPipeline_NettySink。
  • com.huawei.bigdata.flink.examples.TestPipeline_NettySource1。
  • com.huawei.bigdata.flink.examples.TestPipeline_NettySource2。
  1. 发送消息
package com.huawei.bigdata.flink.examples
 
case class Inforamtion(index: Int, content: String) {
 
  def this() = this(0, "")
}
  1. 发布者job自定义source算子产生数据
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 
class UserSource extends RichParallelSourceFunction[Inforamtion] with Serializable{
 
  var isRunning = true
 
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
   
  }
 
// 每秒钟产生10000条数据
  override def run(sourceContext: SourceContext[Inforamtion]) = {
 
    while (isRunning) {
      for (i <- 0 until 10000) {
        sourceContext.collect(Inforamtion(i, "hello-" + i));
 
      }
      Thread.sleep(1000)
    }
  }
 
  override def close(): Unit = super.close()
 
  override def cancel() = {
    isRunning = false
  }
}
  1. 发布者代码
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.sink.NettySink
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
 
object TestPipeline_NettySink {
 
  def main(args: Array[String]): Unit = {
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置job的并发度为2    
env.setParallelism(2)
//设置Zookeeper为注册服务器
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
//添加用户自定义算子产生数据    
env.addSource(new UserSource)
      .keyBy(0).map(x=>x.content.getBytes)//将发送数据转化成字节数组
      .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2))//添加NettySink算子发送数据
 
    env.execute()
  }
}
  1. 第一个订阅者
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
 
import scala.util.Random
 
 
object TestPipeline_NettySource1 {
 
  def main(args: Array[String]): Unit = {
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
// 设置Job的并发度为2  
  env.setParallelism(2)
//设置Zookeeper作为注册服务器
val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
//添加NettySource算子,接收来自发布者的数据
    env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
      .map(x => (1, new String(x)))//将接收到的字节流转化成字符串
      .filter(x => {
        Random.nextInt(50000) == 10
      })
      .print
 
    env.execute()
  }
}
  1. 第二个订阅者
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
 
import scala.util.Random
 
 
object TestPipeline_NettySource2 {
 
  def main(args: Array[String]): Unit = {
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置job的并发度为2   
 env.setParallelism(2)
//创建Zookeeper作为注册服务器
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
//添加NettySource算子,接收数据    
env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
      .map(x=>(2, new String(x)))//将接收到的字节数组转化成字符串
      .filter(x=>{
        Random.nextInt(50000) == 10
      })
      .print()
 
    env.execute()
  }
}