Updated on 2024-08-10 GMT+08:00
Flink Job Pipeline Sample Program (Scala)
The following code shows the main logic of the sample program.
For the complete code, see the following classes:
- com.huawei.bigdata.flink.examples.UserSource.
- com.huawei.bigdata.flink.examples.TestPipeline_NettySink.
- com.huawei.bigdata.flink.examples.TestPipeline_NettySource1.
- com.huawei.bigdata.flink.examples.TestPipeline_NettySource2.
- Code for the message to be sent:

      package com.huawei.bigdata.flink.examples

      case class Inforamtion(index: Int, content: String) {
        // Auxiliary no-argument constructor with default values.
        def this() = this(0, "")
      }
- Code for the user-defined source operator that the publisher uses to generate data:

      package com.huawei.bigdata.flink.examples

      import org.apache.flink.configuration.Configuration
      import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
      import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

      class UserSource extends RichParallelSourceFunction[Inforamtion] with Serializable {

        var isRunning = true

        override def open(parameters: Configuration): Unit = {
          super.open(parameters)
        }

        // Emit 10000 records, then sleep for 1 second, until the source is cancelled.
        override def run(sourceContext: SourceContext[Inforamtion]) = {
          while (isRunning) {
            for (i <- 0 until 10000) {
              sourceContext.collect(Inforamtion(i, "hello-" + i))
            }
            Thread.sleep(1000)
          }
        }

        override def close(): Unit = super.close()

        // Stop the loop in run().
        override def cancel() = {
          isRunning = false
        }
      }
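The loop in UserSource depends on a flag that `run` reads and `cancel` sets, possibly from a different thread. The following is a minimal sketch of that pattern in plain Scala, without Flink. `BatchEmitter`, `batch`, and the `emit` callback are hypothetical names used only for illustration, and marking the flag `@volatile` is an assumption that goes beyond the sample code: it makes a change from the cancelling thread visible to the emitting loop.

```scala
// Hypothetical stand-in for the sample's message type (same fields, same spelling).
final case class Inforamtion(index: Int, content: String)

// Hypothetical, Flink-free emitter that mirrors the loop in UserSource.run.
class BatchEmitter(batchSize: Int, pauseMillis: Long) {
  // Assumption: @volatile is added here because cancel() may run on another thread.
  @volatile private var isRunning = true

  // Build one batch the same way the sample does: (i, "hello-" + i).
  def batch(): Seq[Inforamtion] =
    (0 until batchSize).map(i => Inforamtion(i, "hello-" + i))

  // Emit whole batches until cancel() has been called, pausing after each batch.
  def run(emit: Inforamtion => Unit): Unit =
    while (isRunning) {
      batch().foreach(emit)
      Thread.sleep(pauseMillis)
    }

  // Ask the loop in run() to stop after the current batch.
  def cancel(): Unit = isRunning = false
}
```

Because the flag is checked only between batches, a call to `cancel()` takes effect after the current batch and pause have finished, not immediately.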
- Code for the publisher:

      package com.huawei.bigdata.flink.examples

      import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
      import org.apache.flink.streaming.connectors.netty.sink.NettySink
      import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
      import org.apache.flink.streaming.api.scala._

      object TestPipeline_NettySink {

        def main(args: Array[String]): Unit = {

          val env = StreamExecutionEnvironment.getExecutionEnvironment
          // Set the parallelism of the job to 2.
          env.setParallelism(2)
          // Set ZooKeeper as the registration server.
          val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
          // Add a user-defined source operator to generate data.
          env.addSource(new UserSource)
            .keyBy(0)
            .map(x => x.content.getBytes) // Transform the to-be-sent data into a byte array.
            .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2)) // Add a NettySink operator to send data.

          env.execute()
        }
      }
- Code for the first subscriber:

      package com.huawei.bigdata.flink.examples

      import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
      import org.apache.flink.streaming.connectors.netty.source.NettySource
      import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
      import org.apache.flink.streaming.api.scala._

      import scala.util.Random

      object TestPipeline_NettySource1 {

        def main(args: Array[String]): Unit = {

          val env = StreamExecutionEnvironment.getExecutionEnvironment
          // Set the parallelism of the job to 2.
          env.setParallelism(2)
          // Set ZooKeeper as the registration server.
          val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
          // Add a NettySource operator to receive messages from the publisher.
          env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
            .map(x => (1, new String(x))) // Transform the received byte array into a string.
            .filter(x => {
              Random.nextInt(50000) == 10
            })
            .print()

          env.execute()
        }
      }
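The filter in the subscriber keeps a record only when `Random.nextInt(50000) == 10`, that is, with probability 1/50000 regardless of the record's content, so only a small random sample is printed. The sketch below restates this in plain Scala. The names `SamplingSketch`, `keep`, `expectedKept`, and `simulate` are hypothetical, and any input rate passed in is an assumption for illustration rather than a measured figure.

```scala
import scala.util.Random

object SamplingSketch {
  // Same predicate as in the subscriber's filter, with the bound as a parameter.
  def keep(rng: Random, bound: Int = 50000): Boolean =
    rng.nextInt(bound) == 10

  // Expected number of records kept out of `records` inputs: records / bound.
  def expectedKept(records: Long, bound: Int = 50000): Double =
    records.toDouble / bound

  // Count how many of `records` simulated inputs pass the filter.
  def simulate(records: Int, rng: Random, bound: Int = 50000): Int =
    (0 until records).count(_ => keep(rng, bound))
}
```

For example, if a subscriber received 20000 records per second (a hypothetical rate), `SamplingSketch.expectedKept(20000)` gives 0.4, or roughly one printed line every 2.5 seconds on average.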
- Code for the second subscriber:

      package com.huawei.bigdata.flink.examples

      import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
      import org.apache.flink.streaming.connectors.netty.source.NettySource
      import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
      import org.apache.flink.streaming.api.scala._

      import scala.util.Random

      object TestPipeline_NettySource2 {

        def main(args: Array[String]): Unit = {

          val env = StreamExecutionEnvironment.getExecutionEnvironment
          // Set the parallelism of the job to 2.
          env.setParallelism(2)
          // Set ZooKeeper as the registration server.
          val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
          // Add a NettySource operator to receive messages from the publisher.
          env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
            .map(x => (2, new String(x))) // Transform the received byte array into a string.
            .filter(x => {
              Random.nextInt(50000) == 10
            })
            .print()

          env.execute()
        }
      }
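The publisher sends each message as a byte array (`x.content.getBytes`) and the subscribers rebuild the text with `new String(x)`. Both calls fall back to the JVM's default charset, so the round trip is lossless only when the publisher and subscriber jobs run with the same default. The following is a minimal sketch of a round trip with an explicit charset, assuming UTF-8 suits the payload (the sample itself does not name a charset); `PayloadCodec`, `encode`, and `decode` are hypothetical helpers.

```scala
import java.nio.charset.StandardCharsets

object PayloadCodec {
  // Publisher side: turn the message content into bytes with an explicit charset.
  def encode(content: String): Array[Byte] =
    content.getBytes(StandardCharsets.UTF_8)

  // Subscriber side: rebuild the string with the same charset.
  def decode(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.UTF_8)
}
```

In the sample, these helpers would correspond to the `map` steps of the publisher and of the two subscribers.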