文档首页/
MapReduce服务 MRS/
开发指南(LTS版)/
Flink开发指南(安全模式)/
开发Flink应用/
Flink Job Pipeline样例程序/
Flink Job Pipeline样例程序(Java)
更新时间:2024-08-03 GMT+08:00
Flink Job Pipeline样例程序(Java)
下面列出主要逻辑代码作为演示。
完整代码请参阅:
- com.huawei.bigdata.flink.examples.UserSource。
- com.huawei.bigdata.flink.examples.TestPipeline_NettySink。
- com.huawei.bigdata.flink.examples.TestPipeline_NettySource1。
- com.huawei.bigdata.flink.examples.TestPipeline_NettySource2。
- 发布者Job通过自定义Source算子产生数据
package com.huawei.bigdata.flink.examples;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.io.Serializable;
public class UserSource extends RichParallelSourceFunction<Tuple2<Integer, String>> implements Serializable {
private boolean isRunning = true;
public void open(Configuration configuration) throws Exception {
super.open(configuration);
}
/**
* 数据产生函数,每秒钟产生10000条数据
*/
public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
while(isRunning) {
for (int i = 0; i < 10000; i++) {
ctx.collect(Tuple2.of(i, "hello-" + i));
}
Thread.sleep(1000);
}
}
public void close() {
isRunning = false;
}
public void cancel() {
isRunning = false;
}
}
- 发布者代码
package com.huawei.bigdata.flink.examples;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.sink.NettySink;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
/**
 * Publisher job: reads tuples from {@code UserSource}, converts the string field of
 * each tuple to bytes and sends them out through a {@code NettySink} on topic
 * {@code "TOPIC-2"}.
 */
public class TestPipeline_NettySink {

    /**
     * Builds and executes the publisher pipeline.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the job parallelism to 2. The original called setBufferTimeout(2), which
        // configures the buffer flush timeout instead and contradicted both the
        // accompanying comment and the two subscriber jobs.
        env.setParallelism(2);

        // Create the ZooKeeper register-server handler.
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();

        // Add the custom source operator, key by the first tuple field, and
        // turn every record into a byte array before handing it to the sink.
        env.addSource(new UserSource())
                .keyBy(0)
                .map(new MapFunction<Tuple2<Integer, String>, byte[]>() {
                    // Convert the message to a byte array. The charset is fixed to UTF-8
                    // so the encoding does not depend on the JVM's platform default.
                    @Override
                    public byte[] map(Tuple2<Integer, String> record) throws Exception {
                        return record.f1.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                    }
                })
                // Send the bytes out through NettySink.
                // NOTE(review): the trailing argument 2 presumably is the number of
                // subscribers (two subscriber jobs listen on TOPIC-2) - confirm against
                // the NettySink API.
                .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2));

        env.execute();
    }
}
- 第一个订阅者
package com.huawei.bigdata.flink.examples;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
/**
 * First subscriber job: receives byte-array messages from the publisher through a
 * {@code NettySource} on topic {@code "TOPIC-2"}, converts them to strings and
 * prints them.
 */
public class TestPipeline_NettySource1 {

    /**
     * Builds and executes the first subscriber pipeline.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the job parallelism to 2.
        env.setParallelism(2);

        // Create the ZooKeeper register-server handler.
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();

        // Add the NettySource operator to receive messages from the publisher.
        env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
                .map(new MapFunction<byte[], String>() {
                    // Convert the received bytes to a string. The charset is fixed to
                    // UTF-8 so decoding does not depend on the JVM's platform default.
                    @Override
                    public String map(byte[] b) {
                        return new String(b, java.nio.charset.StandardCharsets.UTF_8);
                    }
                })
                .print();

        env.execute();
    }
}
- 第二个订阅者
package com.huawei.bigdata.flink.examples;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
/**
 * Second subscriber job: receives byte-array messages from the publisher through a
 * {@code NettySource} on topic {@code "TOPIC-2"}, converts them to strings and
 * prints them.
 */
public class TestPipeline_NettySource2 {

    /**
     * Builds and executes the second subscriber pipeline.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the job parallelism to 2.
        env.setParallelism(2);

        // Create the ZooKeeper register-server handler.
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();

        // Add the NettySource operator to receive data from the publisher.
        env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
                .map(new MapFunction<byte[], String>() {
                    // Convert the received byte array to a string. The charset is fixed to
                    // UTF-8 so decoding does not depend on the JVM's platform default.
                    @Override
                    public String map(byte[] b) {
                        return new String(b, java.nio.charset.StandardCharsets.UTF_8);
                    }
                })
                .print();

        env.execute();
    }
}