Scala Sample Code
The following is the main logic code for demonstration.
For details about the complete code, see the following:
- com.huawei.bigdata.flink.examples.UserSource.
- com.huawei.bigdata.flink.examples.TestPipeline_NettySink.
- com.huawei.bigdata.flink.examples.TestPipeline_NettySource1.
- com.huawei.bigdata.flink.examples.TestPipeline_NettySource2.
- Code for sending messages:
package com.huawei.bigdata.flink.examples
case class Inforamtion(index: Int, content: String) {
/** Zero-argument auxiliary constructor: delegates to the primary constructor with index 0 and empty content. */
def this() = this(index = 0, content = "")
}
- The publisher customizes a source operator to generate data:
package com.huawei.bigdata.flink.examples
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
class UserSource extends RichParallelSourceFunction[Inforamtion] with Serializable{
// Loop flag: read by run() on every iteration and cleared by cancel().
// Marked @volatile because Flink's SourceFunction contract has cancel() invoked from a
// different thread than the one executing run(); without it the write may never become
// visible to the emitting loop.
@volatile var isRunning: Boolean = true
/**
 * Overrides `open` and simply forwards the given configuration to the
 * superclass implementation; no additional setup is performed here.
 *
 * @param parameters configuration passed through to `super.open`
 */
override def open(parameters: Configuration): Unit = super.open(parameters)
/**
 * Emits records in batches of 10000 (index 0 to 9999, content "hello-<index>"),
 * pausing one second after each batch, until `isRunning` is cleared.
 *
 * The flag is checked before every record and before the pause, so a
 * cancellation request stops emission promptly instead of finishing the
 * current batch and sleeping first.
 *
 * @param sourceContext context used to emit the generated records
 */
override def run(sourceContext: SourceContext[Inforamtion]): Unit = {
  while (isRunning) {
    (0 until 10000).iterator
      .takeWhile(_ => isRunning) // stop mid-batch once cancelled
      .foreach(i => sourceContext.collect(Inforamtion(i, s"hello-$i")))
    // Skip the pause when cancelled so the method returns without delay.
    if (isRunning) Thread.sleep(1000)
  }
}
/** Overrides `close`; performs no cleanup beyond delegating to the superclass. */
override def close(): Unit = {
  super.close()
}
/** Requests termination of the emit loop in `run` by clearing `isRunning`. */
override def cancel(): Unit = {
  isRunning = false
}
}
- Code for the publisher:
package com.huawei.bigdata.flink.examples
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.sink.NettySink
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
object TestPipeline_NettySink {
/**
 * Entry point of the publisher job: reads records from `UserSource`, converts
 * each record's content to bytes and sends them through a `NettySink`
 * registered under the name "NettySink-1" and topic "TOPIC-2".
 *
 * @param args command-line arguments (not used)
 */
def main(args: Array[String]): Unit = {
  // Local import so this block brings its own dependency into scope.
  import java.nio.charset.StandardCharsets

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Set the parallelism of the job to 2.
  env.setParallelism(2)
  // Use ZooKeeper as the registration server.
  val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
  env
    .addSource(new UserSource) // Add the user-defined source operator that generates the data.
    .keyBy(0) // Key the stream by field 0 of the record.
    // Transform the to-be-sent data into a byte array. The charset is explicit so the
    // encoding does not depend on the platform default of the machine running the task.
    .map(x => x.content.getBytes(StandardCharsets.UTF_8))
    .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2)) // Add the NettySink operator to send data.
  env.execute()
}
}
- Code for the first subscriber:
package com.huawei.bigdata.flink.examples
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
import scala.util.Random
object TestPipeline_NettySource1 {
/**
 * Entry point of the first subscriber job: receives byte-array messages on
 * topic "TOPIC-2" through a `NettySource` named "NettySource-1", decodes them
 * to strings, and prints a small random sample.
 *
 * @param args command-line arguments (not used)
 */
def main(args: Array[String]): Unit = {
  // Local import so this block brings its own dependency into scope.
  import java.nio.charset.StandardCharsets

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Set the parallelism of the job to 2.
  env.setParallelism(2)
  // Use ZooKeeper as the registration server.
  val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
  env
    .addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler)) // Add a NettySource operator to receive messages from the publisher.
    // Transform the received byte array into a string, paired with the constant 1.
    // The charset is explicit so decoding does not depend on the platform default.
    .map(x => (1, new String(x, StandardCharsets.UTF_8)))
    // Pass a record only when a random draw from [0, 50000) equals 10,
    // i.e. roughly one record in 50000 reaches the output.
    .filter(_ => Random.nextInt(50000) == 10)
    .print()
  env.execute()
}
}
- Code for the second subscriber:
package com.huawei.bigdata.flink.examples
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
import scala.util.Random
object TestPipeline_NettySource2 {
/**
 * Entry point of the second subscriber job: receives byte-array messages on
 * topic "TOPIC-2" through a `NettySource` named "NettySource-2", decodes them
 * to strings, and prints a small random sample.
 *
 * @param args command-line arguments (not used)
 */
def main(args: Array[String]): Unit = {
  // Local import so this block brings its own dependency into scope.
  import java.nio.charset.StandardCharsets

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Set the parallelism of the job to 2.
  env.setParallelism(2)
  // Use ZooKeeper as the registration server.
  val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
  env
    .addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler)) // Add a NettySource operator to receive data.
    // Transform the received byte array into a string, paired with the constant 2.
    // The charset is explicit so decoding does not depend on the platform default.
    .map(x => (2, new String(x, StandardCharsets.UTF_8)))
    // Pass a record only when a random draw from [0, 50000) equals 10,
    // i.e. roughly one record in 50000 reaches the output.
    .filter(_ => Random.nextInt(50000) == 10)
    .print()
  env.execute()
}
}
Last Article: Java Sample Code
Next Article: Stream SQL Join Program
Did this article solve your problem?
Thank you for your score! Your feedback would help us improve the website.