Diese Seite ist in Ihrer lokalen Sprache noch nicht verfügbar. Wir arbeiten daran, weitere Sprachversionen hinzuzufügen. Vielen Dank für Ihre Unterstützung.

On this page

Show all

Scala Sample Code

Updated on 2022-11-18 GMT+08:00

The following is the main logic code for demonstration.

For details about the complete code, see the following:

  • com.huawei.bigdata.flink.examples.UserSource.
  • com.huawei.bigdata.flink.examples.TestPipeline_NettySink.
  • com.huawei.bigdata.flink.examples.TestPipeline_NettySource1.
  • com.huawei.bigdata.flink.examples.TestPipeline_NettySource2.
  1. Code for the message to be sent:
package com.huawei.bigdata.flink.examples
 
/**
  * Message record carried through the demo pipeline.
  *
  * @param index   sequence number of the record
  * @param content text payload of the record
  */
case class Inforamtion(index: Int, content: String) {

  /**
    * No-argument constructor producing an "empty" record (index 0, empty content).
    * NOTE(review): presumably present so that frameworks can instantiate the class
    * reflectively -- confirm before removing.
    */
  def this() = this(index = 0, content = "")
}
  1. The publisher customizes source operators to generate data.
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 
/**
  * User-defined parallel source for the publisher job.
  *
  * While running, it emits a burst of 10000 [[Inforamtion]] records
  * (`index` 0..9999, `content` "hello-<index>"), then sleeps for one second and repeats.
  * The loop ends once [[cancel]] has been called.
  */
class UserSource extends RichParallelSourceFunction[Inforamtion] with Serializable {

  // cancel() is called from a different thread than the one executing run(), so the
  // flag must be volatile for the emitting loop to reliably observe the update.
  @volatile var isRunning: Boolean = true

  /** No additional initialization; delegates to the parent implementation. */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
  }

  /**
    * Generates 10000 records, then pauses for one second, until the source is cancelled.
    *
    * @param sourceContext context used to emit the generated records
    */
  override def run(sourceContext: SourceContext[Inforamtion]): Unit = {
    while (isRunning) {
      // The guard stops emitting as soon as cancel() is observed instead of
      // finishing the whole burst first.
      for (i <- 0 until 10000 if isRunning) {
        sourceContext.collect(Inforamtion(i, s"hello-$i"))
      }
      // Skip the pause when already cancelled so the task can finish promptly.
      if (isRunning) {
        Thread.sleep(1000)
      }
    }
  }

  /** No resources to release; delegates to the parent implementation. */
  override def close(): Unit = super.close()

  /** Requests termination of the emitting loop in [[run]]. */
  override def cancel(): Unit = {
    isRunning = false
  }
}
  1. Code for the publisher:
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.sink.NettySink
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
 
/**
  * Publisher job: generates records with [[UserSource]], converts each record's content
  * to bytes and sends them through a NettySink registered under topic "TOPIC-2".
  */
object TestPipeline_NettySink {

  /**
    * Builds and runs the publisher pipeline.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    import java.nio.charset.StandardCharsets

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism of the job to 2.
    env.setParallelism(2)
    // Use ZooKeeper as the registration server.
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler

    env
      // Add the user-defined source operator to generate data.
      .addSource(new UserSource)
      // Key by the record index with a typed selector instead of the positional keyBy(0).
      .keyBy(_.index)
      // Transform the to-be-sent data into a byte array. The charset is explicit so the
      // wire format does not depend on the platform default encoding of the publisher host.
      .map(_.content.getBytes(StandardCharsets.UTF_8))
      // Add the NettySink operator to send data.
      // NOTE(review): the trailing 2 is presumably the number of subscriber jobs -- confirm
      // against the NettySink documentation.
      .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2))

    env.execute()
  }
}
  1. Code for the first subscriber:
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
 
import scala.util.Random
 
 
/**
  * First subscriber job: receives byte-array messages of topic "TOPIC-2" through a
  * NettySource, decodes them to strings and prints a random sample.
  */
object TestPipeline_NettySource1 {

  /**
    * Builds and runs the first subscriber pipeline.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    import java.nio.charset.StandardCharsets

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism of the job to 2.
    env.setParallelism(2)
    // Use ZooKeeper as the registration server.
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler

    env
      // Add a NettySource operator to receive messages from the publisher.
      .addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
      // Transform the received byte array into a string, tagged with subscriber id 1.
      // The charset is explicit so decoding does not depend on the platform default.
      .map(bytes => (1, new String(bytes, StandardCharsets.UTF_8)))
      // Keep roughly one record out of 50000 so the printed output stays readable.
      .filter(_ => Random.nextInt(50000) == 10)
      .print()

    env.execute()
  }
}
  1. Code for the second subscriber.
package com.huawei.bigdata.flink.examples
 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
 
import scala.util.Random
 
 
/**
  * Second subscriber job: receives byte-array messages of topic "TOPIC-2" through a
  * NettySource, decodes them to strings and prints a random sample.
  */
object TestPipeline_NettySource2 {

  /**
    * Builds and runs the second subscriber pipeline.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    import java.nio.charset.StandardCharsets

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism of the job to 2.
    env.setParallelism(2)
    // Use ZooKeeper as the registration server.
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler

    env
      // Add a NettySource operator to receive data from the publisher.
      .addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
      // Transform the received byte array into a string, tagged with subscriber id 2.
      // The charset is explicit so decoding does not depend on the platform default.
      .map(bytes => (2, new String(bytes, StandardCharsets.UTF_8)))
      // Keep roughly one record out of 50000 so the printed output stays readable.
      .filter(_ => Random.nextInt(50000) == 10)
      .print()

    env.execute()
  }
}
Feedback

Feedback

Feedback

0/500

Selected Content

Submit selected content with the feedback