Scala Sample Code
- Code for the message to be sent:
For the complete code, see com.huawei.bigdata.flink.examples.Information in the FlinkPipelineScalaExample project.
    package com.huawei.bigdata.flink.examples

    case class Inforamtion(index: Int, content: String) {

      def this() = this(0, "")
    }
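The message class above declares an auxiliary no-argument constructor next to the primary one. The following minimal sketch, which runs without Flink, shows how the two constructors behave. The reason for the no-argument constructor is not stated in the sample; a plausible assumption is that it lets tooling create an empty instance reflectively. The `InforamtionDemo` object is a hypothetical name used only for illustration.

```scala
// Same shape as the sample's message class (identifier spelling kept as in the sample).
case class Inforamtion(index: Int, content: String) {
  // Auxiliary no-argument constructor that delegates to the primary constructor.
  def this() = this(0, "")
}

// Hypothetical demo object, not part of the sample project.
object InforamtionDemo {
  def main(args: Array[String]): Unit = {
    val empty = new Inforamtion()        // uses the auxiliary constructor
    val msg = Inforamtion(7, "hello-7")  // uses the generated companion apply

    println(empty) // prints Inforamtion(0,)
    println(msg)   // prints Inforamtion(7,hello-7)

    // Case classes compare by value, so an unchanged copy is equal to the original.
    println(msg == msg.copy()) // prints true
  }
}
```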
- Code for the user-defined source operator that the publisher uses to generate data:
For the complete code, see com.huawei.bigdata.flink.examples.UserSource in the FlinkPipelineScalaExample project.
    package com.huawei.bigdata.flink.examples

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

    class UserSource extends RichParallelSourceFunction[Inforamtion] with Serializable {

      // cancel() is called from a different thread than run(), so the flag is volatile.
      @volatile var isRunning = true

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
      }

      // Generate 10000 pieces of data each second.
      override def run(sourceContext: SourceContext[Inforamtion]): Unit = {
        while (isRunning) {
          for (i <- 0 until 10000) {
            sourceContext.collect(Inforamtion(i, "hello-" + i))
          }
          Thread.sleep(1000)
        }
      }

      override def close(): Unit = super.close()

      // Stop the generation loop at the next check of the flag.
      override def cancel(): Unit = {
        isRunning = false
      }
    }
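The source operator above emits a batch of records, sleeps, and repeats until `cancel()` clears the running flag. The sketch below imitates that run/cancel pattern in plain Scala, without Flink, so that it can be tried in isolation. `LoopingSource`, `LoopingSourceDemo`, the `collect` callback, the batch size of 3 and the 10 ms pause are all hypothetical choices for illustration, and the flag is marked `@volatile` here on the assumption that `cancel()` is called from a different thread than `run()`.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for the sample's message class.
case class Inforamtion(index: Int, content: String)

// Hypothetical, Flink-free imitation of the run/cancel pattern in UserSource.
class LoopingSource(batchSize: Int, pauseMillis: Long) {
  // Written by cancel() on one thread and read by run() on another.
  @volatile private var isRunning = true

  // Emit one batch, pause, and repeat until cancel() has been called.
  def run(collect: Inforamtion => Unit): Unit = {
    while (isRunning) {
      for (i <- 0 until batchSize) collect(Inforamtion(i, "hello-" + i))
      Thread.sleep(pauseMillis)
    }
  }

  def cancel(): Unit = { isRunning = false }
}

object LoopingSourceDemo {
  // Run the source on a worker thread, cancel it after `millis`,
  // and return everything that was emitted.
  def runFor(millis: Long): List[Inforamtion] = {
    val out = new ConcurrentLinkedQueue[Inforamtion]()
    val source = new LoopingSource(batchSize = 3, pauseMillis = 10L)
    val worker = new Thread(new Runnable {
      override def run(): Unit = source.run(r => out.add(r))
    })
    worker.start()
    Thread.sleep(millis)
    source.cancel()
    worker.join()
    out.toArray(new Array[Inforamtion](0)).toList
  }

  def main(args: Array[String]): Unit = {
    val records = runFor(100L)
    println(records.take(3))        // the first emitted batch
    println(records.size % 3 == 0)  // prints true: the flag is only checked between batches
  }
}
```

Because the flag is checked only at the top of the loop, a batch that has started is always emitted in full before the loop exits.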
- Code for the publisher:
For the complete code, see com.huawei.bigdata.flink.examples.TestPipeline_NettySink in the FlinkPipelineScalaExample project.
    package com.huawei.bigdata.flink.examples

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.netty.sink.NettySink
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
    import org.apache.flink.streaming.api.scala._

    object TestPipeline_NettySink {

      def main(args: Array[String]): Unit = {

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Set the job parallelism to 2.
        env.setParallelism(2)
        // Use ZooKeeper as the registration server.
        val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
        // Add the user-defined source operator to generate data.
        env.addSource(new UserSource)
          .keyBy(0)
          // Convert the data to be sent into a byte array.
          .map(x => x.content.getBytes)
          // Add the NettySink operator to send data.
          .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2))

        env.execute()
      }
    }
- Code for the first subscriber:
For the complete code, see com.huawei.bigdata.flink.examples.TestPipeline_NettySource1 in the FlinkPipelineScalaExample project.
    package com.huawei.bigdata.flink.examples

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.netty.source.NettySource
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
    import org.apache.flink.streaming.api.scala._

    import scala.util.Random

    object TestPipeline_NettySource1 {

      def main(args: Array[String]): Unit = {

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Set the job parallelism to 2.
        env.setParallelism(2)
        // Use ZooKeeper as the registration server.
        val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
        // Add a NettySource operator to receive messages from the publisher.
        env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
          // Convert the received byte array into a string.
          .map(x => (1, new String(x)))
          // Keep a random sample of the records for printing.
          .filter(x => {
            Random.nextInt(50000) == 10
          })
          .print()

        env.execute()
      }
    }
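The publisher sends only the `content` field, converted with `getBytes`, and the subscriber rebuilds the text with `new String(x)`. Both calls use the platform default charset, so the round trip is lossless only when both jobs run with the same default. The sketch below shows the same conversion with an explicit charset. `ByteRoundTripDemo`, `encode` and `decode` are hypothetical names, and the choice of UTF-8 is an assumption rather than something the sample specifies.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper object, not part of the sample project.
object ByteRoundTripDemo {
  // Publisher side: turn the message content into bytes with an explicit charset.
  def encode(content: String): Array[Byte] =
    content.getBytes(StandardCharsets.UTF_8)

  // Subscriber side: rebuild the string with the same charset.
  def decode(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    val original = "hello-42"
    val bytes = encode(original)
    val restored = decode(bytes)

    println(bytes.length)         // prints 8: one byte per ASCII character
    println(restored == original) // prints true
  }
}
```

Since the `index` field is never converted to bytes, a subscriber that needs it would have to receive it as part of the encoded payload.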
- Code for the second subscriber:
For the complete code, see com.huawei.bigdata.flink.examples.TestPipeline_NettySource2 in the FlinkPipelineScalaExample project.
    package com.huawei.bigdata.flink.examples

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.netty.source.NettySource
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
    import org.apache.flink.streaming.api.scala._

    import scala.util.Random

    object TestPipeline_NettySource2 {

      def main(args: Array[String]): Unit = {

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Set the job parallelism to 2.
        env.setParallelism(2)
        // Use ZooKeeper as the registration server.
        val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
        // Add a NettySource operator to receive messages from the publisher.
        env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
          // Convert the received byte array into a string.
          .map(x => (2, new String(x)))
          // Keep a random sample of the records for printing.
          .filter(x => {
            Random.nextInt(50000) == 10
          })
          .print()

        env.execute()
      }
    }
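Both subscribers filter with `Random.nextInt(50000) == 10`, which keeps each record with probability 1/50000, so only a small random sample of the stream is printed. The sketch below applies the same predicate to a plain iterator to make the sampling rate visible. `SamplingFilterDemo` and `sample` are hypothetical names, and the fixed seed is an assumption added for repeatability; the sample code uses the global `Random`.

```scala
import scala.util.Random

// Hypothetical demo object, not part of the sample project.
object SamplingFilterDemo {
  // Same predicate shape as the subscribers' filter: keep a record when a
  // random draw from [0, bound) equals one fixed value.
  def sample[A](records: Iterator[A], bound: Int, rng: Random): Iterator[A] =
    records.filter(_ => rng.nextInt(bound) == 10)

  def main(args: Array[String]): Unit = {
    val rng = new Random(42L) // fixed seed, an assumption made for repeatable runs
    val total = 5000000
    val kept = sample(Iterator.range(0, total), 50000, rng).size
    println(s"kept $kept of $total records, expected about ${total / 50000}")
  }
}
```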