Flink Job Pipeline样例程序(Scala)
- 发送消息
下面代码片段仅为演示，完整代码参见FlinkPipelineScalaExample样例工程下的com.huawei.bigdata.flink.examples.Information（注意：代码中声明的样例类名拼写为Inforamtion，后续代码均按此名称引用）:
package com.huawei.bigdata.flink.examples

/**
 * Record type carried through the pipeline example.
 *
 * The class name keeps its original spelling (`Inforamtion`) because other code
 * in this package (e.g. `UserSource`) refers to it by that name.
 *
 * @param index   numeric index of the record
 * @param content textual payload of the record
 */
case class Inforamtion(index: Int, content: String) {

  /**
   * No-argument constructor yielding `Inforamtion(0, "")`.
   * NOTE(review): presumably kept for frameworks that need a default constructor — confirm.
   */
  def this() = this(index = 0, content = "")
}
- 发布者job自定义source算子产生数据
下面代码片段仅为演示,完整代码参见FlinkPipelineScalaExample样例工程下的com.huawei.bigdata.flink.examples.UserSource:
package com.huawei.bigdata.flink.examples

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

/**
 * Parallel source that keeps emitting batches of [[Inforamtion]] records until cancelled.
 *
 * Each batch contains 10000 records with `index` 0..9999 and `content` "hello-<index>".
 * After each complete batch the source sleeps for one second.
 */
class UserSource extends RichParallelSourceFunction[Inforamtion] with Serializable {

  /**
   * Run flag read by `run()` and cleared by `cancel()`.
   * Flink calls `cancel()` from a different thread than the one executing `run()`.
   * Without `@volatile` the running loop is not guaranteed to ever observe the change.
   */
  @volatile var isRunning: Boolean = true

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
  }

  /** Emits roughly 10000 records per second per parallel instance until cancelled. */
  override def run(sourceContext: SourceContext[Inforamtion]): Unit = {
    while (isRunning) {
      // Re-check the flag per record so cancellation does not wait for the whole batch.
      (0 until 10000).iterator
        .takeWhile(_ => isRunning)
        .foreach(i => sourceContext.collect(Inforamtion(i, "hello-" + i)))
      if (isRunning) Thread.sleep(1000)
    }
  }

  override def close(): Unit = super.close()

  /** Signals `run()` to stop emitting and return. */
  override def cancel(): Unit = {
    isRunning = false
  }
}
- 发布者代码
下面代码片段仅为演示,完整代码参见FlinkPipelineScalaExample样例工程下的com.huawei.bigdata.flink.examples.TestPipeline_NettySink:
package com.huawei.bigdata.flink.examples

import java.nio.charset.StandardCharsets

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.sink.NettySink
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._

/**
 * Publisher job.
 *
 * Generates [[Inforamtion]] records with [[UserSource]] and publishes each record's content,
 * as bytes, to topic "TOPIC-2" through a `NettySink`. The sink uses a
 * `ZookeeperRegisterServerHandler` as its registration server.
 */
object TestPipeline_NettySink {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Run the job with a parallelism of 2.
    env.setParallelism(2)

    // Use ZooKeeper as the registration server.
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler

    // Custom source that generates the data.
    env.addSource(new UserSource)
      // Key by the record index; a key selector replaces the deprecated positional keyBy(0).
      .keyBy(_.index)
      // Encode the payload explicitly as UTF-8 rather than the platform-default charset,
      // so the bytes do not depend on the JVM's default encoding.
      .map(_.content.getBytes(StandardCharsets.UTF_8))
      // Publish the bytes via Netty.
      // NOTE(review): meaning of the final argument (2) is not visible here — confirm.
      .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2))

    env.execute()
  }
}
- 第一个订阅者
下面代码片段仅为演示,完整代码参见FlinkPipelineScalaExample样例工程下的com.huawei.bigdata.flink.examples.TestPipeline_NettySource1:
package com.huawei.bigdata.flink.examples

import java.nio.charset.StandardCharsets

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._

import scala.util.Random

/**
 * First subscriber job.
 *
 * Receives the byte payloads published on topic "TOPIC-2" through a `NettySource` and
 * decodes them to strings. Each string is paired with the constant 1 (presumably this
 * subscriber's id). A random sample of about 1 in 50000 records is printed.
 */
object TestPipeline_NettySource1 {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Run the job with a parallelism of 2.
    env.setParallelism(2)

    // Use ZooKeeper as the registration server.
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler

    // NettySource receives the data sent by the publisher.
    env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
      // Decode explicitly as UTF-8 instead of the platform-default charset.
      .map(bytes => (1, new String(bytes, StandardCharsets.UTF_8)))
      // Keep only a random ~1/50000 sample so the printed output stays small.
      .filter(_ => Random.nextInt(50000) == 10)
      .print()

    env.execute()
  }
}
- 第二个订阅者
下面代码片段仅为演示,完整代码参见FlinkPipelineScalaExample样例工程下的com.huawei.bigdata.flink.examples.TestPipeline_NettySource2:
package com.huawei.bigdata.flink.examples

import java.nio.charset.StandardCharsets

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._

import scala.util.Random

/**
 * Second subscriber job.
 *
 * Receives the byte payloads published on topic "TOPIC-2" through its own `NettySource`
 * ("NettySource-2") and decodes them to strings. Each string is paired with the constant 2
 * (presumably this subscriber's id). A random sample of about 1 in 50000 records is printed.
 */
object TestPipeline_NettySource2 {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Run the job with a parallelism of 2.
    env.setParallelism(2)

    // Create the ZooKeeper registration server handler.
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler

    // NettySource receives the published data.
    env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
      // Decode the received byte array explicitly as UTF-8 instead of the platform default.
      .map(bytes => (2, new String(bytes, StandardCharsets.UTF_8)))
      // Keep only a random ~1/50000 sample so the printed output stays small.
      .filter(_ => Random.nextInt(50000) == 10)
      .print()

    env.execute()
  }
}