Creating a FlinkServer Job to Interconnect with JDBC
Scenarios
FlinkServer can interconnect with databases through JDBC. This section uses FlinkServer and Kafka in a security-mode cluster as an example. It describes the DDL definitions of JDBC (MySQL) source tables, sink tables, and dimension tables, the WITH parameters used to create them, and sample code. It also shows how to create and run these jobs on the FlinkServer job management page.

When Flink SQL statements are displayed on the FlinkServer web UI, the value of the password field is left blank. Re-enter the password manually before submitting the job.
Mapping Between Flink SQL and JDBC Data Types
Flink SQL | MySQL | Oracle | PostgreSQL | SQL Server |
---|---|---|---|---|
BOOLEAN | BOOLEAN, TINYINT(1) | - | BOOLEAN | BIT |
TINYINT | TINYINT | - | - | TINYINT |
SMALLINT | SMALLINT, TINYINT UNSIGNED | - | SMALLINT, INT2, SMALLSERIAL, SERIAL2 | SMALLINT |
INT | INT, MEDIUMINT, SMALLINT UNSIGNED | - | INTEGER, SERIAL | INT |
BIGINT | BIGINT, INT UNSIGNED | - | BIGINT, BIGSERIAL | BIGINT |
FLOAT | FLOAT | BINARY_FLOAT | REAL, FLOAT4 | REAL |
DOUBLE | DOUBLE, DOUBLE PRECISION | BINARY_DOUBLE | FLOAT8, DOUBLE PRECISION | FLOAT |
STRING | CHAR(n), VARCHAR(n), TEXT | CHAR(n), VARCHAR(n), CLOB | CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT | CHAR(n), NCHAR(n), VARCHAR(n), NVARCHAR(n), TEXT, NTEXT |
BYTES | BINARY, VARBINARY, BLOB | RAW(s), BLOB | BYTEA | BINARY(n), VARBINARY(n) |
ARRAY | - | - | ARRAY | - |
DATE | DATE | DATE | DATE | DATE |
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] | DATE | TIME [(p)] [WITHOUT TIMEZONE] | TIME(0) |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | DATETIME, DATETIME2 |
DECIMAL(20, 0) | BIGINT UNSIGNED | - | - | - |
DECIMAL(p, s) | NUMERIC(p, s), DECIMAL(p, s) | SMALLINT, FLOAT(s), DOUBLE PRECISION, REAL, NUMBER(p, s) | NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s) |
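The MySQL column of the table above works as a lookup from a MySQL column type to the Flink SQL type to declare in the DDL. The POSIX shell function below is a hypothetical sketch of that lookup and is not part of FlinkServer. You might use it when translating a MySQL table definition into a Flink CREATE TABLE statement. It covers only the MySQL types listed in the table and returns an error for anything else. The examples in this section declare VARCHAR(32) rather than STRING, which Flink SQL also accepts.

```shell
#!/bin/sh
# Hypothetical helper (not part of FlinkServer): print the Flink SQL type that the
# mapping table lists for a MySQL column type; return 1 for types it does not cover.
# The input is upper-cased first, so "varchar(32)" and "VARCHAR(32)" are treated alike.
# TIME(p) keeps its precision, DATETIME(p) becomes TIMESTAMP(p), NUMERIC(p, s) becomes DECIMAL(p, s).
mysql_to_flink_type() {
  t=$(printf '%s' "$1" | tr '[:lower:]' '[:upper:]')
  case "$t" in
    BOOLEAN|"TINYINT(1)")              echo "BOOLEAN" ;;
    TINYINT)                           echo "TINYINT" ;;
    SMALLINT|"TINYINT UNSIGNED")       echo "SMALLINT" ;;
    INT|MEDIUMINT|"SMALLINT UNSIGNED") echo "INT" ;;
    BIGINT|"INT UNSIGNED")             echo "BIGINT" ;;
    "BIGINT UNSIGNED")                 echo "DECIMAL(20, 0)" ;;
    FLOAT)                             echo "FLOAT" ;;
    DOUBLE|"DOUBLE PRECISION")         echo "DOUBLE" ;;
    "CHAR("*")"|"VARCHAR("*")"|TEXT)   echo "STRING" ;;
    BINARY|VARBINARY|BLOB)             echo "BYTES" ;;
    DATE)                              echo "DATE" ;;
    TIME|"TIME("*")")                  echo "TIME${t#TIME}" ;;
    DATETIME|"DATETIME("*")")          echo "TIMESTAMP${t#DATETIME}" ;;
    "DECIMAL("*")")                    echo "$t" ;;
    "NUMERIC("*")")                    echo "DECIMAL${t#NUMERIC}" ;;
    *) echo "unsupported MySQL type: $1" >&2; return 1 ;;
  esac
}
```

For example, mysql_to_flink_type 'int unsigned' prints BIGINT, matching the BIGINT row of the table.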
Notes and Constraints
This section applies only to MRS 3.3.1 or later.
Prerequisites
- HDFS, YARN, Kafka, ZooKeeper, and Flink have been installed.
- The client that contains the Kafka service has been installed, for example, in the /opt/client directory.
- You have created a user (for example, flinkuser) with FlinkServer administrator permissions for accessing the Flink web UI. For details, see Creating a FlinkServer Role.
JDBC Sink Table (MySQL as an Example)
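- Create an empty table in the MySQL database to receive data, for example, customer_t1. Give it the same column names as the Flink sink table MyUserTable (c_customer_sk, c_customer_name) and compatible types. In this example, the JDBC URL points to the mysql database, so create the table in that database or change the database name in the URL.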
- Log in to FusionInsight Manager as user flinkuser and choose Cluster > Services > Flink. In the Basic Information area, click the link next to Flink Web UI to access the Flink web UI.
- Create a Flink SQL stream job by referring to Creating a Job. On the job development page, configure the job parameters as follows and start the job.
In Basic Parameter, select Enable CheckPoint, set Time Interval(ms) to 60000, and retain the default value for Mode.
CREATE TABLE MyUserTable(
  c_customer_sk INTEGER,
  c_customer_name VARCHAR(32)
) WITH(
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://IP address of the MySQL server:MySQL server port/mysql',
  'table-name' = 'customer_t1',
  'username' = 'MySQL database username',
  'password' = 'Password of the MySQL database user'
);
CREATE TABLE KafkaSource (
  c_customer_sk INTEGER,
  c_customer_name VARCHAR(32)
) WITH (
  'connector' = 'kafka',
  'topic' = 'customer_source',
  'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Kafka port',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'csv',
  -- If the cluster where FlinkServer is deployed is in non-security mode, delete the following three parameters and the comma at the end of the 'value.format' line.
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.System domain name'
);
INSERT INTO MyUserTable SELECT * FROM KafkaSource;
- The IP address and port number of the Kafka Broker instance are as follows:
- To obtain the instance IP address, log in to FusionInsight Manager, choose Cluster > Services > Kafka, click Instances, and query the instance IP address on the instance list page.
- If Kerberos authentication is enabled for the cluster (the cluster is in security mode), the Broker port number is the value of sasl.port. The default value is 21007.
- If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), the Broker port number is the value of port. The default value is 9092. If the port number is set to 9092, set allow.everyone.if.no.acl.found to true. The procedure is as follows:
Log in to FusionInsight Manager and choose Cluster > Services > Kafka. On the displayed page, click Configurations and then All Configurations. On the displayed page, search for allow.everyone.if.no.acl.found, set it to true, and click Save.
- System domain name: You can log in to FusionInsight Manager, choose System > Permission > Domain and Mutual Trust, and check the value of Local Domain.
- properties.group.id: indicates the Kafka consumer group ID. This parameter is mandatory when Kafka functions as the source.
- properties.kerberos.domain.name: Set it to hadoop.System domain name. You can log in to FusionInsight Manager and choose System > Permission > Domain and Mutual Trust to view the actual domain name of the cluster.
- On the job management page, check whether the job status is Running.
- Check the topic and write data to Kafka. For details, see Managing Messages in Kafka Topics.
Check the Kafka topic.
./kafka-topics.sh --list --bootstrap-server Service IP address of the Kafka Broker instance:Kafka port --command-config Client directory/Kafka/kafka/config/client.properties
Write data to Kafka.
sh kafka-console-producer.sh --broker-list IP address of the node where the Kafka Broker instance is deployed:Kafka port --topic Topic name --producer.config Client directory/Kafka/kafka/config/producer.properties
In this example, the topic name is customer_source.
sh kafka-console-producer.sh --broker-list IP address of the node where the Kafka Broker instance is deployed:Kafka port --topic customer_source --producer.config /opt/client/Kafka/kafka/config/producer.properties
Enter the message content.
3,zhangsan
4,wangwu
8,zhaosi
Press Enter to send the message.
- Log in to the MySQL client and run the following statement to check whether the sink table received data:
Select * from customer_t1;
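The same Broker address appears in three places: the 'properties.bootstrap.servers' value and the --bootstrap-server and --broker-list options of the Kafka client scripts. The POSIX shell function below is a hypothetical helper, not a Kafka or FlinkServer tool. It builds that address list from one or more Broker IP addresses, assuming the default ports described above have not been changed: sasl.port 21007 in security mode and port 9092 in normal mode.

```shell
#!/bin/sh
# Hypothetical helper, not a Kafka or FlinkServer tool.
# Builds "IP:port[,IP:port...]" for 'properties.bootstrap.servers',
# --bootstrap-server, or --broker-list, assuming the default Broker ports:
# sasl.port 21007 in security mode and port 9092 in normal mode.
kafka_bootstrap_servers() {
  if [ "$#" -lt 2 ]; then
    echo "usage: kafka_bootstrap_servers security|normal BROKER_IP..." >&2
    return 1
  fi
  mode=$1
  shift
  case "$mode" in
    security) port=21007 ;;
    normal) port=9092 ;;
    *) echo "unknown mode: $mode" >&2; return 1 ;;
  esac
  servers=""
  for ip in "$@"; do
    # Append "ip:port", inserting a comma only between entries.
    servers="${servers:+$servers,}$ip:$port"
  done
  printf '%s\n' "$servers"
}
```

After defining the function in the current shell, you can send the three test messages without typing them interactively. kafka-console-producer.sh sends each input line as one message. In the following command, 192.168.0.11 is a placeholder Broker IP address:
printf '3,zhangsan\n4,wangwu\n8,zhaosi\n' | sh kafka-console-producer.sh --broker-list "$(kafka_bootstrap_servers security 192.168.0.11)" --topic customer_source --producer.config /opt/client/Kafka/kafka/config/producer.properties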
JDBC Source Table (MySQL as an Example)
- Log in to FusionInsight Manager as user flinkuser and choose Cluster > Services > Flink. In the Basic Information area, click the link next to Flink Web UI to access the Flink web UI.
- Create a Flink SQL stream job by referring to Creating a Job. On the job development page, configure the job parameters as follows and start the job.
In Basic Parameter, select Enable CheckPoint, set Time Interval(ms) to 5000, and retain the default value for Mode.
CREATE TABLE MyUserTable( -- MySQL functions as a source table.
  c_customer_sk INTEGER,
  c_customer_name VARCHAR(32)
) WITH(
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://IP address of the MySQL server:MySQL server port/mysql',
  'table-name' = 'customer_t1',
  'username' = 'MySQL database username',
  'password' = 'Password of the MySQL database user'
);
CREATE TABLE KafkaSink ( -- Kafka functions as a sink table.
  c_customer_sk INTEGER,
  c_customer_name VARCHAR(32)
) WITH (
  'connector' = 'kafka',
  'topic' = 'customer_sink',
  'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Kafka port',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'csv',
  -- If the cluster where FlinkServer is deployed is in non-security mode, delete the following three parameters and the comma at the end of the 'value.format' line.
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.System domain name'
);
INSERT INTO KafkaSink SELECT * FROM MyUserTable;
- The IP address and port number of the Kafka Broker instance are as follows:
- To obtain the instance IP address, log in to FusionInsight Manager, choose Cluster > Services > Kafka, click Instances, and query the instance IP address on the instance list page.
- If Kerberos authentication is enabled for the cluster (the cluster is in security mode), the Broker port number is the value of sasl.port. The default value is 21007.
- If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), the Broker port number is the value of port. The default value is 9092. If the port number is set to 9092, set allow.everyone.if.no.acl.found to true. The procedure is as follows:
Log in to FusionInsight Manager and choose Cluster > Services > Kafka. On the displayed page, click Configurations and then All Configurations. On the displayed page, search for allow.everyone.if.no.acl.found, set it to true, and click Save.
- System domain name: You can log in to FusionInsight Manager, choose System > Permission > Domain and Mutual Trust, and check the value of Local Domain.
- properties.group.id: indicates the Kafka consumer group ID. This parameter is mandatory when Kafka functions as the source.
- properties.kerberos.domain.name: Set it to hadoop.System domain name. You can log in to FusionInsight Manager and choose System > Permission > Domain and Mutual Trust to view the actual domain name of the cluster.
- On the job management page, check whether the job status is Running.
- Obtain the Kafka Broker IP address and port by referring to Managing Messages in Kafka Topics. Then run the following command to check whether the data read from the MySQL source table has been written to the Kafka sink topic customer_sink:
sh kafka-console-consumer.sh --topic customer_sink --bootstrap-server IP address of the node where the Kafka instance is deployed:Kafka port --consumer.config /opt/client/Kafka/kafka/config/consumer.properties
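In Apache Flink, a JDBC table used as a scan source is bounded. The job reads the rows present in customer_t1 when it runs and does not continuously capture rows inserted later. One way to verify the result is to compare a MySQL export with the records consumed from customer_sink. The POSIX shell function below is a hypothetical check, not a Kafka or MySQL tool. It assumes simple values without tabs, commas, quotes, or line breaks. Under that assumption, the tab-separated output of mysql -N -B becomes identical to the CSV records written with 'value.format' = 'csv' once tabs are replaced with commas.

```shell
#!/bin/sh
# Hypothetical check (not a Kafka or MySQL tool): succeed if two row sets are equal,
# ignoring order. $1: rows exported with "mysql -N -B" (tab-separated);
# $2: records consumed from customer_sink (CSV).
# Assumes values contain no tabs, commas, quotes, or line breaks.
compare_rows() {
  # Convert the MySQL export to CSV, then sort both sides so row order does not matter.
  expected=$(tr '\t' ',' < "$1" | sort)
  actual=$(sort "$2")
  [ "$expected" = "$actual" ]
}
```

For example, export the table, save the consumer output to a file (mysql_rows.tsv and kafka_rows.csv are hypothetical file names), and compare them:
mysql -N -B -h IP address of the MySQL server -P MySQL server port -u MySQL database username -p -e 'SELECT * FROM customer_t1' mysql > mysql_rows.tsv
compare_rows mysql_rows.tsv kafka_rows.csv && echo consistent
By default, kafka-console-consumer.sh reads only messages produced after it starts. Add --from-beginning to the consumer command if the job wrote to customer_sink before the consumer was started.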
JDBC Dimension Table (MySQL as an Example)
In this example, KafkaSource is the fact table and the MySQL table customer_t2 is the dimension table. The join result is written to KafkaSink.
- Create the dimension table customer_t2 on the MySQL client. An example of the table creation statement is as follows:
CREATE TABLE customer_t2(
  c_customer_sk INTEGER PRIMARY KEY,
  c_customer_age INTEGER,
  c_customer_address VARCHAR(32)
);
INSERT INTO customer_t2 VALUES(1,18,'city a');
INSERT INTO customer_t2 VALUES(2,14,'city b');
INSERT INTO customer_t2 VALUES(3,16,'city c');
INSERT INTO customer_t2 VALUES(4,24,'city d');
INSERT INTO customer_t2 VALUES(5,32,'city e');
INSERT INTO customer_t2 VALUES(6,27,'city f');
INSERT INTO customer_t2 VALUES(7,41,'city a');
INSERT INTO customer_t2 VALUES(8,35,'city h');
INSERT INTO customer_t2 VALUES(9,16,'city j');
- Log in to FusionInsight Manager as user flinkuser and choose Cluster > Services > Flink. In the Basic Information area, click the link next to Flink Web UI to access the Flink web UI.
- Create a Flink SQL stream job by referring to Creating a Job. On the job development page, configure the job parameters as follows and start the job.
In Basic Parameter, select Enable CheckPoint, set Time Interval(ms) to 5000, and retain the default value for Mode.
CREATE TABLE KafkaSource ( -- Kafka functions as a source table.
  c_customer_sk INTEGER,
  c_customer_name VARCHAR(32),
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'customer_source',
  'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Kafka port',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'csv',
  -- If the cluster where FlinkServer is deployed is in non-security mode, delete the following three parameters and the comma at the end of the 'value.format' line.
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.System domain name'
);
CREATE TABLE KafkaSink ( -- Kafka functions as a sink table.
  c_customer_sk INTEGER,
  c_customer_name VARCHAR(32),
  c_customer_age INTEGER,
  c_customer_address VARCHAR(32)
) WITH (
  'connector' = 'kafka',
  'topic' = 'customer_sink',
  'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Kafka port',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'csv',
  -- If the cluster where FlinkServer is deployed is in non-security mode, delete the following three parameters and the comma at the end of the 'value.format' line.
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.System domain name'
);
CREATE TABLE MyUserTable ( -- MySQL functions as a dimension table.
  c_customer_sk INTEGER PRIMARY KEY NOT ENFORCED,
  c_customer_age INTEGER,
  c_customer_address VARCHAR(32)
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://IP address of the MySQL server:MySQL server port/mysql',
  'table-name' = 'customer_t2',
  'username' = 'MySQL database username',
  'password' = 'Password of the MySQL database user'
);
INSERT INTO KafkaSink
SELECT t.c_customer_sk, t.c_customer_name, d.c_customer_age, d.c_customer_address
FROM KafkaSource AS t
JOIN MyUserTable FOR SYSTEM_TIME AS OF t.proctime AS d
ON t.c_customer_sk = d.c_customer_sk;
- The IP address and port number of the Kafka Broker instance are as follows:
- To obtain the instance IP address, log in to FusionInsight Manager, choose Cluster > Services > Kafka, click Instances, and query the instance IP address on the instance list page.
- If Kerberos authentication is enabled for the cluster (the cluster is in security mode), the Broker port number is the value of sasl.port. The default value is 21007.
- If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), the Broker port number is the value of port. The default value is 9092. If the port number is set to 9092, set allow.everyone.if.no.acl.found to true. The procedure is as follows:
Log in to FusionInsight Manager and choose Cluster > Services > Kafka. On the displayed page, click Configurations and then All Configurations. On the displayed page, search for allow.everyone.if.no.acl.found, set it to true, and click Save.
- System domain name: You can log in to FusionInsight Manager, choose System > Permission > Domain and Mutual Trust, and check the value of Local Domain.
- properties.group.id: indicates the Kafka consumer group ID. This parameter is mandatory when Kafka functions as the source.
- properties.kerberos.domain.name: Set it to hadoop.System domain name. You can log in to FusionInsight Manager and choose System > Permission > Domain and Mutual Trust to view the actual domain name of the cluster.
- Run the following command to consume the sink topic customer_sink. The consumer shows whether the join results are written to the topic after you write data to the source topic in the next step. Keep this window open. For details, see Managing Messages in Kafka Topics.
sh kafka-console-consumer.sh --topic customer_sink --bootstrap-server IP address of the node where the Kafka instance is deployed:Kafka port --consumer.config /opt/client/Kafka/kafka/config/consumer.properties
- View the topic and write data to the source topic customer_source by referring to Managing Messages in Kafka Topics. After the data is written, check the result in the kafka-console-consumer window opened in the previous step.
Check the Kafka topic.
./kafka-topics.sh --list --bootstrap-server Service IP address of the Kafka Broker instance:Kafka port --command-config Client directory/Kafka/kafka/config/client.properties
Write data to Kafka.
sh kafka-console-producer.sh --broker-list IP address of the node where the Kafka Broker instance is deployed:Kafka port --topic Topic name --producer.config Client directory/Kafka/kafka/config/producer.properties
In this example, the topic name is customer_source.
sh kafka-console-producer.sh --broker-list IP address of the node where the Kafka Broker instance is deployed:Kafka port --topic customer_source --producer.config /opt/client/Kafka/kafka/config/producer.properties
Enter the message content.
3,zhangsan
5,zhaosi
1,xiaoming
2,liuyang
7,liubei
10,guanyu
20,zhaoyun
Press Enter to send the messages. The following output is displayed in the kafka-console-consumer window:
3,zhangsan,16,city c
5,zhaosi,32,city e
1,xiaoming,18,city a
2,liuyang,14,city b
7,liubei,41,city a
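The join in this job is a processing-time lookup join (JOIN ... FOR SYSTEM_TIME AS OF t.proctime). For each record read from customer_source, Flink looks up the customer_t2 row with the same c_customer_sk. Because this is an inner join, records without a matching row produce no output. Here those are 10,guanyu and 20,zhaoyun, which is why only five of the seven messages reach customer_sink. A LEFT JOIN would instead keep such records, with NULL dimension columns. The shell sketch below is an illustrative model of the inner-join result using awk on local CSV files. It is not how Flink executes the job, and it does not model lookup caching or changes to customer_t2 over time.

```shell
#!/bin/sh
# Illustrative model only (not how Flink executes the job): reproduce the
# result of the inner lookup join in the dimension-table example with awk.
# $1: dimension rows "c_customer_sk,c_customer_age,c_customer_address"
# $2: fact rows "c_customer_sk,c_customer_name" in arrival order
lookup_join() {
  awk -F, -v OFS=, '
    NR == FNR { dim[$1] = $2 OFS $3; next }  # first file: index dimension rows by key
    ($1 in dim) { print $1, $2, dim[$1] }     # second file: emit only records with a match
  ' "$1" "$2"
}

# Demo with the customer_t2 rows and the seven test messages from this section.
dir=$(mktemp -d)
printf '%s\n' '1,18,city a' '2,14,city b' '3,16,city c' '4,24,city d' '5,32,city e' \
  '6,27,city f' '7,41,city a' '8,35,city h' '9,16,city j' > "$dir/customer_t2.csv"
printf '%s\n' '3,zhangsan' '5,zhaosi' '1,xiaoming' '2,liuyang' '7,liubei' \
  '10,guanyu' '20,zhaoyun' > "$dir/customer_source.csv"
# Prints the same five rows as the consumer output above.
lookup_join "$dir/customer_t2.csv" "$dir/customer_source.csv"
rm -r "$dir"
```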