Java Sample Code
In the Flink application, this code invokes the API of the flink-connector-kafka module to produce and consume data.
Sample Code
If the application needs to connect to FusionInsight Kafka in security mode, obtain the kafka-clients-*.jar file from the Kafka client directory before starting development.
The following lists Producer, Consumer, and the main logic code used by Flink Stream SQL Join.
For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.SqlJoinWithSocket.
- A piece of user information is generated in Kafka every second. The user information includes the name, age, and gender.
//Producer code public class WriteIntoKafka { public static void main(String[] args) throws Exception { //Print the command reference for flink run. System.out.println("use command as: "); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers"); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers --security.protocol SASL_PLAINTEXT kafka"); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei"); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers --security.protocol SASL_SSL kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei"); System.out.println("******************************************************************************************"); System.out.println("<topic> is the kafka topic name"); System.out.println("<bootstrap.servers> is the ip:port list of brokers"); System.out.println("******************************************************************************************"); //Construct the execution environment. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //Set the concurrency. env.setParallelism(1); //Parse the running parameters. ParameterTool paraTool = ParameterTool.fromArgs(args); //Construct a flow diagram and write the data generated from self-defined sources to Kafka. 
DataStream<String> messageStream = env.addSource(new SimpleStringGenerator()); FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(paraTool.get("topic"), new SimpleStringSchema(), paraTool.getProperties()); producer.setWriteTimestampToKafka(true); messageStream.addSink(producer); //Invoke execute to trigger the execution. env.execute(); } //Customize the sources and generate a message every second. public static class SimpleStringGenerator implements SourceFunction<String> { static final String[] NAME = {"Carry", "Alen", "Mike", "Ian", "John", "Kobe", "James"}; static final String[] SEX = {"MALE", "FEMALE"}; static final int COUNT = NAME.length; boolean running = true; Random rand = new Random(47); @Override //Use rand to randomly generate a combination of the name, gender, and age. public void run(SourceContext<String> ctx) throws Exception { while (running) { int i = rand.nextInt(COUNT); int age = rand.nextInt(70); String sexy = SEX[rand.nextInt(2)]; ctx.collect(NAME[i] + "," + age + "," + sexy); thread.sleep(1000); } } @Override public void cancel() { running = false; } } }
- Generate Table1 and Table2, use Join to jointly query Table1 and Table2, and print the output result.
public class SqlJoinWithSocket { public static void main(String[] args) throws Exception{ final String hostname; final int port; System.out.println("use command as: "); System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" + " /opt/test.jar --topic topic-test -bootstrap.servers --hostname --port xxx"); System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" + " /opt/test.jar --topic topic-test -bootstrap.servers --security.protocol SASL_PLAINTEXT kafka" + "--hostname --port xxx"); System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" + " /opt/test.jar --topic topic-test -bootstrap.servers --security.protocol SSL --ssl.truststore.location /home/truststore.jks " + "--ssl.truststore.password huawei --hostname --port xxx"); System.out.println("******************************************************************************************"); System.out.println("<topic> is the kafka topic name"); System.out.println("<bootstrap.servers> is the ip:port list of brokers"); System.out.println("******************************************************************************************"); try { final ParameterTool params = ParameterTool.fromArgs(args); hostname = params.has("hostname") ? params.get("hostname") : "localhost"; port = params.getInt("port"); } catch (Exception e) { System.err.println("No port specified. 
Please run 'FlinkStreamSqlJoinExample " + "--hostname <hostname> --port <port>', where hostname (localhost by default) " + "and port is the address of the text server"); System.err.println("To start a simple text server, run 'netcat -l -p <port>' and " + "type the input text into the command line"); return; } EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings); //Perform processing based on EventTime. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); ParameterTool paraTool = ParameterTool.fromArgs(args); //Use Stream1 to read data from Kafka. DataStream<Tuple3<String, String, String>> kafkaStream = env.addSource(new FlinkKafkaConsumer<>(paraTool.get("topic"), new SimpleStringSchema(), paraTool.getProperties())).map(new MapFunction<String, Tuple3<String, String, String>>() { @Override public Tuple3<String, String, String> map(String s) throws Exception { String[] word = s.split(","); return new Tuple3<>(word[0], word[1], word[2]); } }); //Register Stream1 as Table1. tableEnv.registerDataStream("Table1", kafkaStream, "name, age, sexy, proctime.proctime"); //Use Stream2 to read data from the socket. DataStream<Tuple2<String, String>> socketStream = env.socketTextStream(hostname, port, "\n"). map(new MapFunction<String, Tuple2<String, String>>() { @Override public Tuple2<String, String> map(String s) throws Exception { String[] words = s.split("\\s"); if (words.length < 2) { return new Tuple2<>(); } return new Tuple2<>(words[0], words[1]); } }); //Register Stream2 as Table2. tableEnv.registerDataStream("Table2", socketStream, "name, job, proctime.proctime"); //Run SQL Join to perform a combined query. 
Table result = tableEnv.sqlQuery("SELECT, t1.age,, t2.job, t2.proctime as shiptime\n" + "FROM Table1 AS t1\n" + "JOIN Table2 AS t2\n" + "ON =\n" + "AND t1.proctime BETWEEN t2.proctime - INTERVAL '1' SECOND AND t2.proctime + INTERVAL '1' SECOND"); //Convert the query result into the stream and print the output. tableEnv.toAppendStream(result, Row.class).print(); env.execute(); } }
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation.