Scala Sample Code
- Code for sending messages:
For the complete code, see com.huawei.bigdata.flink.examples.Information in the FlinkPipelineScalaExample project.
package com.huawei.bigdata.flink.examples case class Inforamtion(index: Int, content: String) { def this() = this(0, "") }
- The publisher customizes source operators to generate data.
For the complete code, see com.huawei.bigdata.flink.examples.UserSource in the FlinkPipelineScalaExample project.
package com.huawei.bigdata.flink.examples import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext class UserSource extends RichParallelSourceFunction[Inforamtion] with Serializable{ var isRunning = true override def open(parameters: Configuration): Unit = { super.open(parameters) } // Generate 10000 pieces of data each second. override def run(sourceContext: SourceContext[Inforamtion]) = { while (isRunning) { for (i <- 0 until 10000) { sourceContext.collect(Inforamtion(i, "hello-" + i)); } Thread.sleep(1000) } } override def close(): Unit = super.close() override def cancel() = { isRunning = false } }
- Code for the publisher:
For the complete code, see com.huawei.bigdata.flink.examples.TestPipeline_NettySink in the FlinkPipelineScalaExample project.
package com.huawei.bigdata.flink.examples import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.netty.sink.NettySink import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler import org.apache.flink.streaming.api.scala._ object TestPipeline_NettySink { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // Set the concurrency of job to 2. env.setParallelism(2) //Set Zookeeper as the registration server val zkRegisterServerHandler = new ZookeeperRegisterServerHandler //Add a user-defined operator to generate data. env.addSource(new UserSource) .keyBy(0).map(x=>x.content.getBytes)//Transform the to-be-sent data into a byte array. .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2))//Add NettySink operator to send data. env.execute() } }
- Code for the first subscriber.
For the complete code, see com.huawei.bigdata.flink.examples.TestPipeline_NettySource1 in the FlinkPipelineScalaExample project.
package com.huawei.bigdata.flink.examples import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.netty.source.NettySource import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler import org.apache.flink.streaming.api.scala._ import scala.util.Random object TestPipeline_NettySource1 { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // Set the concurrency of job to 2. env.setParallelism(2) //Set ZooKeeper as the registration server val zkRegisterServerHandler = new ZookeeperRegisterServerHandler //Add a NettySource operator to receive messages from the publisher. env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler)) .map(x => (1, new String(x)))//Add a NettySource operator to receive messages from the publisher. .filter(x => { Random.nextInt(50000) == 10 }) .print env.execute() } }
- Code for the second subscriber.
For the complete code, see com.huawei.bigdata.flink.examples.TestPipeline_NettySource2 in the FlinkPipelineScalaExample project.
package com.huawei.bigdata.flink.examples import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.netty.source.NettySource import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler import org.apache.flink.streaming.api.scala._ import scala.util.Random object TestPipeline_NettySource2 { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //Set the concurrency of job to 2. env.setParallelism(2) //Set the concurrency of job to 2. val zkRegisterServerHandler = new ZookeeperRegisterServerHandler //Add NettySource operator and receive data. env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler)) .map(x=>(2, new String(x)))//Transform the received byte array into character strings. .filter(x=>{ Random.nextInt(50000) == 10 }) .print() env.execute() } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.