文档首页/
MapReduce服务 MRS/
开发指南(LTS版)/
Flink开发指南(普通模式)/
开发Flink应用/
Flink Job Pipeline样例程序/
Flink Job Pipeline样例程序(Scala)
更新时间:2025-09-22 GMT+08:00
Flink Job Pipeline样例程序(Scala)
下面列出的主要逻辑代码作为演示。
完整代码请参阅:
- com.huawei.bigdata.flink.examples.UserSource。
- com.huawei.bigdata.flink.examples.TestPipeline_NettySink。
- com.huawei.bigdata.flink.examples.TestPipeline_NettySource1。
- com.huawei.bigdata.flink.examples.TestPipeline_NettySource2。
- 定义发送的消息(Information)
package com.huawei.bigdata.flink.examples
/**
 * Message record used by the pipeline sample.
 *
 * @param index   integer field of the record
 * @param content string field of the record
 */
case class Information(index: Int, content: String) {

  /** Auxiliary no-argument constructor: delegates with index 0 and an empty content string. */
  def this() = this(index = 0, content = "")
}
- 发布者job自定义source算子产生数据
package com.huawei.bigdata.flink.examples

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

/**
 * Custom parallel source for the publisher job.
 *
 * While running, it repeatedly emits a batch of 10000 [[Information]] records
 * (index `i`, content `"hello-" + i` for `i` in 0 until 10000) and then sleeps
 * for 1000 ms before producing the next batch.
 */
class UserSource extends RichParallelSourceFunction[Information] with Serializable {

  // Written by cancel() and read by run(). Flink may call cancel() from a different
  // thread than the one executing run(), so the flag must be volatile for the
  // update to become visible to the emitting loop.
  @volatile var isRunning = true

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
  }

  /**
   * Emits records until the source is cancelled.
   *
   * @param sourceContext context used to hand records to the downstream operators
   */
  override def run(sourceContext: SourceContext[Information]): Unit = {
    while (isRunning) {
      // One batch: 10000 records, followed by a one-second pause.
      for (i <- 0 until 10000) {
        sourceContext.collect(Information(i, s"hello-$i"))
      }
      Thread.sleep(1000)
    }
  }

  override def close(): Unit = super.close()

  /** Requests the loop in run() to stop after the current batch and pause. */
  override def cancel(): Unit = {
    isRunning = false
  }
}
- 发布者代码
package com.huawei.bigdata.flink.examples
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.sink.NettySink
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
/**
 * Publisher job of the pipeline sample.
 *
 * Reads records from [[UserSource]], keys them by `index`, converts each record's
 * `content` to a byte array and hands the bytes to a `NettySink` registered under
 * the name "NettySink-1" for topic "TOPIC-2".
 */
object TestPipeline_NettySink {

  import java.nio.charset.StandardCharsets

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Run the job with a parallelism of 2.
    env.setParallelism(2)
    // Use ZooKeeper as the registration server.
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler

    env
      // Custom source operator that generates the data.
      .addSource(new UserSource)
      // Key by the first case-class field through a key selector; this replaces the
      // positional keyBy(0), which newer Flink releases deprecate.
      .keyBy(_.index)
      // Convert the payload to bytes with an explicit charset instead of the JVM default.
      .map(info => info.content.getBytes(StandardCharsets.UTF_8))
      // NettySink operator that sends the data; constructor arguments are unchanged.
      .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2))

    env.execute()
  }
}
- 第一个订阅者
package com.huawei.bigdata.flink.examples
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
import scala.util.Random
/**
 * First subscriber job of the pipeline sample.
 *
 * Receives byte arrays through a `NettySource` named "NettySource-1" on topic
 * "TOPIC-2", decodes each one to a string paired with the tag `1`, randomly keeps
 * about one record in 50000, and prints the kept records.
 */
object TestPipeline_NettySource1 {

  import java.nio.charset.StandardCharsets

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Run the job with a parallelism of 2.
    env.setParallelism(2)
    // Use ZooKeeper as the registration server.
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler

    env
      // NettySource operator that receives the data sent by the publisher.
      .addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
      // Decode the received bytes with an explicit charset instead of the JVM default.
      .map(bytes => (1, new String(bytes, StandardCharsets.UTF_8)))
      // Random sampling: the record itself is not inspected.
      .filter(_ => Random.nextInt(50000) == 10)
      .print()

    env.execute()
  }
}
- 第二个订阅者
package com.huawei.bigdata.flink.examples
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
import scala.util.Random
/**
 * Second subscriber job of the pipeline sample.
 *
 * Receives byte arrays through a `NettySource` named "NettySource-2" on topic
 * "TOPIC-2", decodes each one to a string paired with the tag `2`, randomly keeps
 * about one record in 50000, and prints the kept records.
 */
object TestPipeline_NettySource2 {

  import java.nio.charset.StandardCharsets

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Run the job with a parallelism of 2.
    env.setParallelism(2)
    // Use ZooKeeper as the registration server.
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler

    env
      // NettySource operator that receives the data.
      .addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
      // Decode the received bytes with an explicit charset instead of the JVM default.
      .map(bytes => (2, new String(bytes, StandardCharsets.UTF_8)))
      // Random sampling: the record itself is not inspected.
      .filter(_ => Random.nextInt(50000) == 10)
      .print()

    env.execute()
  }
}