Creating a FlinkServer Job to Interconnect with a Doris Table
This section applies to MRS 3.3.1 or later.
Scenarios
This section describes how to use FlinkServer to write Kafka data to Doris and how to perform Lookup Join on Doris and Kafka data.
Prerequisites
- Services such as Doris, HDFS, YARN, Flink, and Kafka have been installed in the cluster.
- The node to be connected to the Doris database can communicate with the MRS cluster.
- A user with the Doris management permission has been created.
- Kerberos authentication is enabled for the cluster (the cluster is in security mode):
On FusionInsight Manager, create a human-machine user, for example, dorisuser, create a role with Doris administrator permissions, and bind the role to the user.
Log in to FusionInsight Manager as the new user dorisuser and change the initial password.
- Kerberos authentication is disabled for the cluster (the cluster is in normal mode):
After connecting to Doris as user admin, create a role with administrator permissions, and bind the role to the user.
- The MySQL client has been installed. For details, see Using the MySQL Client to Connect to Doris.
- The Flink client has been installed.
Using FlinkServer to Write Kafka Data to Doris
Operations on the Doris side
- Log in to the node where MySQL is installed and run the following command to connect to the Doris database:
If Kerberos authentication is enabled for the cluster (the cluster is in security mode), run the following command to connect to the Doris database:
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1
mysql -uDatabase login user -p -PConnection port for FE queries -hIP address of the Doris FE instance
Enter the password for logging in to the database.
- To obtain the query connection port of the Doris FE instance, you can log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and query the value of query_port of the Doris service.
- To obtain the TCP connection port of the Doris DBalancer instance, you can log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and query the value of balancer_tcp_port of the Doris service.
- To obtain the IP address of the Doris FE or DBalancer instance, log in to FusionInsight Manager of the MRS cluster and choose Cluster > Services > Doris > Instances to view the service IP address of any FE or DBalancer instance.
- You can also use the MySQL connection software or Doris web UI to connect to the database.
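For example, to connect as the dorisuser created in the prerequisites (the IP address and port below are illustrative; replace them with the service IP address of your FE instance and the actual query_port value):
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1
mysql -udorisuser -p -P29982 -h192.168.67.27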
- Run the following commands to create a database and table usertable2 to which data is written:
create database sink;
use sink;
create table usertable2(
  `user_id` VARCHAR(10),
  `user_name` VARCHAR(10),
  `age` INT
) DISTRIBUTED BY HASH(user_id) BUCKETS 32;
Operations on the Flink side
- Log in to FusionInsight Manager as a user with FlinkServer administrator permissions. Choose Cluster > Services > Flink.
If Kerberos authentication has been enabled for your MRS cluster, create a role with the FlinkServer administrator permissions or the application viewing and editing permissions, and associate the role to the user. Then, you can access the Flink web UI. For details about how to create a role, see Creating a FlinkServer Role.
- On the right of Flink web UI, click the link to access FlinkServer.
- On the FlinkServer page, choose Job Management > Create Job. In the displayed dialog box, set the following parameters and click OK. The Job Management page is displayed.
- Type: Select Flink SQL.
- Name: Enter a job name, for example, FlinkSQL1.
- Create a stream or batch Flink SQL job on the Flink job management page. The following are some examples:
- Create a Kafka data source table.
CREATE TABLE KafkaSource (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'Topic name',
  'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Port of the Broker instance',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);
- properties.bootstrap.servers: If there are multiple parameter values, separate them with commas (,).
To view the service IP address of the Kafka Broker instance, log in to FusionInsight Manager and choose Cluster > Services > Kafka > Instances. To view the port of the Broker instance, click Configurations. If Kerberos authentication is enabled for the cluster (the cluster is in security mode), search for sasl.port. If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), search for port.
- To obtain the values of properties.sasl.kerberos.service.name, properties.security.protocol, and properties.kerberos.domain.name, search for sasl.kerberos.service.name, security.protocol, and kerberos.domain.name in the server.properties file in the Client installation directory/Kafka/kafka/config directory on the node where the Kafka client is installed.
- Create a Doris Sink table.
CREATE TABLE dorisSink (
  `user_id` VARCHAR,
  `user_name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'IP address of any FE instance:HTTPS or HTTP port',
  'table.identifier' = 'sink.usertable2',
  'username' = 'user',
  'password' = 'password',
  'doris.enable.https' = 'true',
  'doris.ignore.https.ca' = 'true',
  'sink.label-prefix' = 'doris_label1'
);
- To view the IP addresses of FE instances, log in to FusionInsight Manager, choose Cluster > Services > Doris, and click Instances.
- To view the HTTPS port, log in to FusionInsight Manager, choose Cluster > Services > Doris, click Configurations, and search for https_port. To view the HTTP port, search for http_port.
- table.identifier: The parameter value is the Doris database and table created in 2.
- username and password are the username and password for connecting to Doris.
- If HTTPS is disabled for the cluster (in either normal or security mode), remove the following parameters from the WITH clause of the Doris Sink table creation statement:
'doris.enable.https' = 'true'
'doris.ignore.https.ca' = 'true'
- When creating a Doris Sink table, you can also set the parameters listed in Table 1.
- Run the following command to write Kafka data to Doris:
insert into dorisSink select * from KafkaSource;
- Navigate to the Job Management page. In the Basic Parameter tab on the right of the page, select Enable CheckPoint and set Time Interval(ms) as you need. The recommended value range is 30000 to 60000.
- Click Check Semantic to verify the semantics of the statements. Then, click Save and Submit.
Operations on the Kafka side
- Log in to the node where the Kafka client is installed and perform the following operations to create a Kafka topic:
cd Client installation directory/Kafka/kafka/bin
- Run the following command to create a Kafka topic. The topic name must be the same as that configured in 6.a:
sh kafka-topics.sh --create --topic Topic name --partitions 1 --replication-factor 1 --bootstrap-server IP address of the host where the Controller of the Kafka Broker instance is deployed:Port of the Broker instance --command-config ../config/client.properties
- Run the following command to view the topic list:
sh kafka-topics.sh --list --bootstrap-server IP address of the host where the Controller of the Kafka Broker instance is deployed:Port of the Broker instance --command-config ../config/client.properties
- Run the following command to connect to the Kafka client:
sh kafka-console-producer.sh --broker-list IP address of the host where the Controller of the Kafka Broker instance is deployed:Port of the Broker instance --topic Topic name --producer.config ../config/producer.properties
- To obtain the IP address of the host where the Controller of the Broker instance is deployed, log in to FusionInsight Manager, choose Cluster > Services > Kafka, and view the value of Controller Host in the basic information area on the Dashboard page.
- To obtain the port of the Broker instance, click Configurations, and search for sasl.port for a cluster in security mode and port for a cluster in normal mode.
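The Kafka source table created earlier uses the csv format with the columns user_id, user_name, and age, so each record entered in the producer console should be one comma-separated line. For example (the values are illustrative):
10001,Tom,20
10002,Jerry,25
10003,Carry,30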
- Connect to Doris on the node where the MySQL client is installed and run the following command to check whether the data in the Doris table is the same as the data inserted in 9.c:
select * from sink.usertable2;
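If the illustrative records above were produced, the query output would look similar to the following (actual rows depend on the data you entered):
+---------+-----------+------+
| user_id | user_name | age  |
+---------+-----------+------+
| 10001   | Tom       |   20 |
| 10002   | Jerry     |   25 |
| 10003   | Carry     |   30 |
+---------+-----------+------+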
Doris as a Dimension Table
- Log in to FusionInsight Manager as a user with FlinkServer administrator permissions. Choose Cluster > Services > Flink.
If Kerberos authentication has been enabled for your MRS cluster, create a role with the FlinkServer administrator permissions or the application viewing and editing permissions, and associate the role to the user. Then, you can access the Flink web UI. For details about how to create a role, see Creating a FlinkServer Role.
- On the right of Flink web UI, click the link to access FlinkServer.
- On the Flink web UI, choose Job Management > Create Job. In the displayed dialog box, set the following parameters and click OK. The Job Management page is displayed.
- Type: Select Flink SQL.
- Name: Enter a job name, for example, FlinkSQL2.
- Create a stream or batch Flink SQL job on the Flink job management page. The following are some examples:
- Create a Kafka Source table.
CREATE TABLE fact_table (
  `id` BIGINT,
  `name` STRING,
  `city` STRING,
  `process_time` as proctime()
) WITH (
  'connector' = 'kafka',
  'topic' = 'Topic name',
  'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Port of the Broker instance',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);
- properties.bootstrap.servers: If there are multiple parameter values, separate them with commas (,).
To view the service IP address of the Kafka Broker instance, log in to FusionInsight Manager and choose Cluster > Services > Kafka > Instances. To view the port of the Broker instance, click Configurations. If Kerberos authentication is enabled for the cluster (the cluster is in security mode), search for sasl.port. If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), search for port.
- The values of properties.sasl.kerberos.service.name, properties.security.protocol, and properties.kerberos.domain.name can be obtained from Client installation directory/Kafka/kafka/config on the node where the Kafka client is installed. Search for sasl.kerberos.service.name, security.protocol, or kerberos.domain.name in the server.properties file in the directory.
- Create a Flink table that maps the Doris dimension table dim.dim_city. (The table must already exist in Doris; a Doris-side creation sketch appears after the join statement below.)
create table dim_city(
  `city` STRING,
  `level` INT,
  `province` STRING,
  `country` STRING
) WITH (
  'connector' = 'doris',
  'fenodes' = 'FE instance IP address:HTTPS or HTTP port',
  'jdbc-url' = 'jdbc:mysql://FE instance IP address:FE query connection port',
  'table.identifier' = 'dim.dim_city',
  'username' = 'user',
  'password' = 'password'
);
- To view the IP addresses of FE instances, log in to FusionInsight Manager, choose Cluster > Services > Doris, and click Instances.
- To view the HTTPS port, log in to FusionInsight Manager, choose Cluster > Services > Doris, click Configurations, and search for https_port. To view the HTTP port, search for http_port.
- To obtain the query connection port of the Doris FE instance, log in to FusionInsight Manager, choose Cluster > Services > Doris, click Configurations, and query the value of query_port of the Doris service.
- username and password are the username and password for connecting to Doris.
- When creating a Flink table, you can also set the parameters listed in Table 2.
- Run the following command to perform a join:
SELECT a.id, a.name, a.city, c.province, c.country, c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city;
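The statements above assume that the dim.dim_city table already exists in Doris. If it does not, the following is a minimal sketch for creating it on the Doris side, mirroring the earlier sink database setup; the column types and bucket count are assumptions derived from the Flink table definition:
create database dim;
use dim;
create table dim_city(
  `city` VARCHAR(32),
  `level` INT,
  `province` VARCHAR(32),
  `country` VARCHAR(32)
) DISTRIBUTED BY HASH(city) BUCKETS 32;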
- Navigate to the Job Management page. In the Basic Parameter tab on the right of the page, select Enable CheckPoint and set Time Interval(ms) as you need. The recommended value range is 30000 to 60000.
- Click Check Semantic to verify the semantics of the statements. Then, click Save and Submit.
Table Configurations

Table 1 Doris Sink table parameters

| Parameter | Default Value | Mandatory | Description |
| --- | --- | --- | --- |
| doris.request.retries | 3 | No | Number of retries for sending requests to Doris. |
| doris.request.connect.timeout.ms | 30000 | No | Connection timeout interval for sending requests to Doris. |
| doris.request.read.timeout.ms | 30000 | No | Read timeout interval for sending requests to Doris. |
| sink.max-retries | 3 | No | Maximum number of retries after a commit failure. |
| sink.enable.batch-mode | false | No | Whether to write data to Doris in batch mode. When enabled, the write timing does not depend on checkpoints; use sink.buffer-flush.max-rows, sink.buffer-flush.max-bytes, or sink.buffer-flush.interval to control when data is flushed. |
| sink.buffer-flush.max-rows | 50000 | No | Maximum number of rows written in a single batch in batch mode. |
| sink.buffer-flush.max-bytes | 10MB | No | Maximum number of bytes written in a single batch in batch mode. |
| sink.buffer-flush.interval | 10s | No | Interval for asynchronously flushing the cache in batch mode. |
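For example, to make the Doris Sink table in the Kafka-to-Doris job write in batch mode rather than on checkpoints, you could add options such as the following to its WITH clause (a sketch; the flush thresholds shown are illustrative):
'sink.enable.batch-mode' = 'true',
'sink.buffer-flush.max-rows' = '50000',
'sink.buffer-flush.interval' = '10s'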
Table 2 Doris dimension table (Lookup Join) parameters

| Parameter | Default Value | Mandatory | Description |
| --- | --- | --- | --- |
| lookup.cache.max-rows | -1 | No | Maximum number of rows in the lookup cache. The default value -1 means the cache is disabled. |
| lookup.cache.ttl | 10s | No | Maximum retention time of entries in the lookup cache. |
| lookup.max-retries | 1 | No | Number of retries after a lookup query fails. |
| lookup.jdbc.async | false | No | Whether to enable asynchronous lookup. |
| lookup.jdbc.read.batch.size | 128 | No | Maximum batch size for each query in asynchronous lookup. |
| lookup.jdbc.read.batch.queue-size | 256 | No | Size of the intermediate buffer queue during asynchronous lookup. |
| lookup.jdbc.read.thread-size | 3 | No | Number of JDBC threads for lookup in each task. |
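For example, to enable caching and asynchronous queries for the dim_city dimension table, you could extend its WITH clause with options such as the following (a sketch; the cache size and TTL are illustrative):
'lookup.cache.max-rows' = '10000',
'lookup.cache.ttl' = '60s',
'lookup.jdbc.async' = 'true'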