更新时间:2024-08-03 GMT+08:00

Flink Job Pipeline样例程序(Java)

下面列出的主要逻辑代码作为演示。

完整代码请参阅:

  • com.huawei.bigdata.flink.examples.UserSource。
  • com.huawei.bigdata.flink.examples.TestPipeline_NettySink。
  • com.huawei.bigdata.flink.examples.TestPipeline_NettySource1。
  • com.huawei.bigdata.flink.examples.TestPipeline_NettySource2。
  1. 发布Job自定义Source算子产生数据
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 
import java.io.Serializable;
 
public class UserSource extends RichParallelSourceFunction<Tuple2<Integer, String>> implements Serializable {
 
    private boolean isRunning = true;
 
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
 
    }
/**
    * 数据产生函数,每秒钟产生10000条数据
   */
    public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
 
        while(isRunning) {
            for (int i = 0; i < 10000; i++) {
                ctx.collect(Tuple2.of(i, "hello-" + i));
            }
            Thread.sleep(1000);
        }
    }
 
    public void close() {
        isRunning = false;
    }
 
    public void cancel() {
        isRunning = false;
    }
}
  1. 发布者代码
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.sink.NettySink;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
 
public class TestPipeline_NettySink {
 
    public static void main(String[] args) throws Exception{
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置job的并发度为2
        env.setBufferTimeout(2);
 
// 创建Zookeeper的注册服务器handler
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
// 添加自定义Source算子
        env.addSource(new UserSource())
                .keyBy(0)
                .map(new MapFunction<Tuple2<Integer,String>, byte[]>() {
                    //将发送信息转化成字节数组
@Override
                    public byte[] map(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                        return integerStringTuple2.f1.getBytes();
                    }
                }).addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2));//通过NettySink发送出去。
 
        env.execute();
 
    }
}
  1. 第一个订阅者
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
 
public class TestPipeline_NettySource1 {
 
    public static void main(String[] args) throws Exception{
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置job的并发度为2        
env.setParallelism(2);
 
// 创建Zookeeper的注册服务器句柄
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
//添加NettySource算子,接收来自发布者的消息
        env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
                .map(new MapFunction<byte[], String>() {
                  // 将接收到的字节流转化成字符串  
    @Override
                    public String map(byte[] b) {
                        return new String(b);
                    }
                }).print();
 
        env.execute();
    }
}
  1. 第二个订阅者
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
 
public class TestPipeline_NettySource2 {
 
    public static void main(String[] args) throws Exception {
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置作业的并发度为2       
 env.setParallelism(2);
 
//创建Zookeeper的注册服务器句柄
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
//添加NettySource算子,接收来自发布者的数据
        env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
                .map(new MapFunction<byte[], String>() {
          //将接收到的字节数组转化成字符串
                    @Override
                    public String map(byte[] b) {
                        return new String(b);
                    }
                }).print();
 
        env.execute();
    }
}