Flink Join Sample Program (Java)
Function
In the Flink application, this code invokes the flink-connector-kafka module's API to generate and consume data.
Sample Code
If the user needs to use FusionInsight Kafka interconnected with the security mode before the development, obtain the kafka-clients-*.jar JAR file from the Kafka client directory.
The following lists Producer, Consumer, and the main logic code used by Flink Stream SQL Join.
For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafkaand com.huawei.bigdata.flink.examples.SqlJoinWithSocket.
- A piece of user information is generated in Kafka every second. The user information includes the name, age, and gender.
//Producer code public class WriteIntoKafka { public static void main(String[] args) throws Exception { //Print the command reference for flink run. System.out.println("use command as: "); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005"); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka"); System.out.println("******************************************************************************************"); System.out.println("<topic> is the kafka topic name"); System.out.println("<bootstrap.servers> is the ip:port list of brokers"); System.out.println("******************************************************************************************"); //Construct the execution environment. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //Set the concurrency. env.setParallelism(1); //Parse the running parameters. ParameterTool paraTool = ParameterTool.fromArgs(args); //Construct a flow diagram and write the data generated from self-defined sources to Kafka. DataStream<String> messageStream = env.addSource(new SimpleStringGenerator()); FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(paraTool.get("topic"), new SimpleStringSchema(), paraTool.getProperties()); producer.setWriteTimestampToKafka(true); messageStream.addSink(producer); //Invoke execute to trigger the execution. env.execute(); } //Customize the sources and generate a message every second. public static class SimpleStringGenerator implements SourceFunction<String> { static final String[] NAME = {"Carry", "Alen", "Mike", "Ian", "John", "Kobe", "James"}; static final String[] SEX = {"MALE", "FEMALE"}; static final int COUNT = NAME.length; boolean running = true; Random rand = new Random(47); @Override //Use rand to randomly generate a combination of the name, gender, and age. public void run(SourceContext<String> ctx) throws Exception { while (running) { int i = rand.nextInt(COUNT); int age = rand.nextInt(70); String sexy = SEX[rand.nextInt(2)]; ctx.collect(NAME[i] + "," + age + "," + sexy); thread.sleep(1000); } } @Override public void cancel() { running = false; } } }
- Generate Table1 and Table2, use Join to jointly query Table1 and Table2, and print the output result.
public class SqlJoinWithSocket { public static void main(String[] args) throws Exception{ final String hostname; final int port; System.out.println("use command as: "); System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" + " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21005 --hostname xxx.xxx.xxx.xxx --port xxx"); System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" + " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka" + "--hostname xxx.xxx.xxx.xxx --port xxx"); System.out.println("******************************************************************************************"); System.out.println("<topic> is the kafka topic name"); System.out.println("<bootstrap.servers> is the ip:port list of brokers"); System.out.println("******************************************************************************************"); try { final ParameterTool params = ParameterTool.fromArgs(args); hostname = params.has("hostname") ? params.get("hostname") : "localhost"; port = params.getInt("port"); } catch (Exception e) { System.err.println("No port specified. Please run 'FlinkStreamSqlJoinExample " + "--hostname <hostname> --port <port>', where hostname (localhost by default) " + "and port is the address of the text server"); System.err.println("To start a simple text server, run 'netcat -l -p <port>' and " + "type the input text into the command line"); return; } EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings); //Perform processing based on EventTime. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); ParameterTool paraTool = ParameterTool.fromArgs(args); //Use Stream1 to read data from Kafka. DataStream<Tuple3<String, String, String>> kafkaStream = env.addSource(new FlinkKafkaConsumer<>(paraTool.get("topic"), new SimpleStringSchema(), paraTool.getProperties())).map(new MapFunction<String, Tuple3<String, String, String>>() { @Override public Tuple3<String, String, String> map(String s) throws Exception { String[] word = s.split(","); return new Tuple3<>(word[0], word[1], word[2]); } }); //Register Stream1 as Table1. tableEnv.registerDataStream("Table1", kafkaStream, "name, age, sexy, proctime.proctime"); //Use Stream2 to read data from the socket. DataStream<Tuple2<String, String>> socketStream = env.socketTextStream(hostname, port, "\n"). map(new MapFunction<String, Tuple2<String, String>>() { @Override public Tuple2<String, String> map(String s) throws Exception { String[] words = s.split("\\s"); if (words.length < 2) { return new Tuple2<>(); } return new Tuple2<>(words[0], words[1]); } }); //Register Stream2 as Table2. tableEnv.registerDataStream("Table2", socketStream, "name, job, proctime.proctime"); //Run SQL Join to perform a combined query. Table result = tableEnv.sqlQuery("SELECT t1.name, t1.age, t1.sexy, t2.job, t2.proctime as shiptime\n" + "FROM Table1 AS t1\n" + "JOIN Table2 AS t2\n" + "ON t1.name = t2.name\n" + "AND t1.proctime BETWEEN t2.proctime - INTERVAL '1' SECOND AND t2.proctime + INTERVAL '1' SECOND"); //Convert the query result into the stream and print the output. tableEnv.toAppendStream(result, Row.class).print(); env.execute(); } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot