Creating a FlinkServer Job to Interconnect with a Doris Table
This section applies to MRS 3.5.0 and later.
Scenario
This topic describes how to use FlinkServer to write Kafka data to Doris and how to perform Lookup Join on Doris and Kafka data.
Prerequisites
- Services such as Doris, HDFS, YARN, Flink, and Kafka have been installed in the cluster.
- The node to be connected to the Doris database can communicate with the MRS cluster.
- A user with the Doris management permission has been created.
- If Kerberos authentication is enabled for the cluster (the cluster is in security mode):
On FusionInsight Manager, create a human-machine user, for example, dorisuser, create a role with the Doris administrator permission, and bind the role to the user.
Log in to FusionInsight Manager as the new user dorisuser and change the initial password.
- If Kerberos authentication is disabled for the cluster (the cluster is in normal mode):
After connecting to Doris as user admin, create a role with administrator permissions, and bind the role to the user.
- The MySQL client has been installed. For details, see Using the MySQL Client to Connect to Doris.
- The Flink client has been installed.
Using FlinkServer to Write Kafka Data to Doris
Operations on the Doris side
- Log in to the node where the MySQL client is installed and connect to the Doris database.
If Kerberos authentication is enabled for the cluster (the cluster is in security mode), run the following commands:
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1
mysql -uDatabase login user -p -PConnection port for FE queries -hIP address of the Doris FE instance
Enter the password for logging in to the database.
- To obtain the query connection port of the Doris FE instance, you can log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and query the value of query_port of the Doris service.
- To obtain the TCP port of the Doris DBalancer instance, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and query the value of balancer_tcp_port of the Doris service.
- To obtain the IP address of the Doris FE or DBalancer instance, log in to FusionInsight Manager of the MRS cluster and choose Cluster > Services > Doris > Instances to view the service IP address of any FE or DBalancer instance.
- You can also use the MySQL connection software or Doris web UI to connect to the database.
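- For example, in a security-mode cluster the connection might look like the following (the username, IP address, and port are illustrative; substitute the values obtained above):
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1
mysql -udorisuser -p -P29982 -h192.168.1.10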
- Run the following commands to create a database and a table usertable2 to which the Kafka data will be written:
create database sink;
use sink;
create table usertable2(
`user_id` VARCHAR(10),
`user_name` VARCHAR(10),
`age` INT
)
DISTRIBUTED BY HASH(user_id) BUCKETS 32;
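You can verify the table structure before creating the Flink job, for example:
desc sink.usertable2;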
Operations on the Flink side
- Log in to FusionInsight Manager as a user with FlinkServer Admin Privilege. Choose Cluster > Services > Flink.
If your MRS cluster requires Kerberos authentication, create a role with the FlinkServer administrator permission or the application viewing and editing permission, and bind the role to the user. Then, you can access the Flink Web UI. For details about how to create a role, see Creating a FlinkServer Role.
- On the right of Flink web UI, click the link to access FlinkServer.
- On the FlinkServer page, choose Job Management > Create Job. In the displayed dialog box, set the following parameters and click OK. The Job Management page is displayed.
- Type: Select Flink SQL.
- Name: Enter a job name, for example, FlinkSQL1.
- Create a stream or batch Flink SQL job on the Flink job management page. The following are some examples:
- Create a Kafka data source table. The table name KafkaSource used below is an example:
CREATE TABLE KafkaSource (
`user_id` VARCHAR,
`user_name` VARCHAR,
`age` INT
) WITH (
'connector' = 'kafka',
'topic' = 'Topic name',
'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Port of the Broker instance',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv',
'properties.sasl.kerberos.service.name' = 'kafka',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);
- properties.bootstrap.servers: If there are multiple parameter values, separate them with commas (,).
To view the service IP address of the Kafka Broker instance, log in to FusionInsight Manager and choose Cluster > Services > Kafka > Instances. To view the port of the Broker instance, click Configurations. If Kerberos authentication is enabled for the cluster (the cluster is in security mode), search for sasl.port. If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), search for port.
- The values of properties.sasl.kerberos.service.name, properties.security.protocol, and properties.kerberos.domain.name can be obtained from Client installation directory/Kafka/kafka/config on the node where the Kafka client is installed. Search for sasl.kerberos.service.name, security.protocol, or kerberos.domain.name in the server.properties file in the directory.
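- If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), the SASL-related properties are typically not needed. A minimal sketch of the WITH clause under that assumption:
'connector' = 'kafka',
'topic' = 'Topic name',
'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Port of the Broker instance',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv'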
- Create a Doris Sink table. The table name dorisSink used below is an example:
CREATE TABLE dorisSink (
`user_id` VARCHAR,
`user_name` VARCHAR,
`age` INT
) WITH (
'connector' = 'doris',
'fenodes' = 'IP address of any FE instance:HTTPS port or HTTP port',
'table.identifier' = 'sink.usertable2',
'username' = 'user',
'password' = 'password',
'doris.enable.https' = 'true',
'doris.ignore.https.ca' = 'true',
'sink.label-prefix' = 'doris_label1'
);
- To view the IP addresses of FE instances, choose Cluster > Services > Doris > Instances on FusionInsight Manager.
- To view the HTTPS port, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and search for https_port. To view the HTTP port, search for http_port.
- table.identifier: The parameter value is the Doris database and table created in 2.
- username and password: the username and password for connecting to the Doris database.
- If HTTPS is disabled for the cluster (in either normal or security mode), remove the following configuration parameters from the WITH clause of the Doris Sink table creation statement:
'doris.enable.https' = 'true'
'doris.ignore.https.ca' = 'true'
- When creating a Doris Sink table, you can also set the parameters listed in Table 1.
- Run the following command to write Kafka data to Doris:
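A minimal statement, assuming the example table names KafkaSource and dorisSink used above:
INSERT INTO dorisSink SELECT * FROM KafkaSource;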
- In the basic parameters area on the right of the Job Management page, select Enable CheckPoint and set Time Interval(ms) as required. The recommended interval ranges from 30000 to 60000.
- Click Check Semantic to verify the semantics of the statements. Then, click Save and Submit.
Operations on the Kafka side
- Log in to the node where the Kafka client is installed and perform the following operations to create a Kafka topic:
cd Client installation directory/Kafka/kafka/bin
- Run the following command to create a Kafka topic. The topic name must be the same as that configured in 6.a:
sh kafka-topics.sh --create --topic Topic name --partitions 1 --replication-factor 1 --bootstrap-server IP address of the host where the Controller of the Kafka Broker instance is deployed:Port of the Broker instance --command-config ../config/client.properties
- Run the following command to view the topic list:
sh kafka-topics.sh --list --bootstrap-server IP address of the host where the Controller of the Kafka Broker instance is deployed:Port of the Broker instance --command-config ../config/client.properties
- Run the following command to connect to the Kafka client:
sh kafka-console-producer.sh --broker-list IP address of the host where the Controller of the Kafka Broker instance is deployed:Port of the Broker instance --topic Topic name --producer.config ../config/producer.properties
To obtain the IP address of the host where the Controller of the Broker instance is deployed, log in to FusionInsight Manager, choose Cluster > Services > Kafka, and view the value of Controller Host in the basic information area on the Dashboard page.
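After the producer starts, enter test records that match the KafkaSource schema (user_id, user_name, age) in CSV format. The rows below are illustrative:
1,Tom,25
2,Jerry,30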
- Connect to Doris on the node where the MySQL client is installed and run the following command to check whether the data in the Doris table is the same as the data inserted in 9.c:
select * from sink.usertable2;
Lookup Join
- Log in to FusionInsight Manager as a user with FlinkServer Admin Privilege. Choose Cluster > Services > Flink.
If your MRS cluster requires Kerberos authentication, create a role with the FlinkServer administrator permission or the application viewing and editing permission, and bind the role to the user. Then, you can access the Flink Web UI. For details about how to create a role, see Creating a FlinkServer Role.
- On the right of Flink web UI, click the link to access FlinkServer.
- On the Flink web UI, click Job Management and then Create Job. In the Create Job dialog box, set the following parameters and click OK:
- Type: Select Flink SQL.
- Name: Enter a job name, for example, FlinkSQL2.
- Create a stream or batch Flink SQL job on the Flink job management page. The following are some examples:
- Create a Kafka Source table. It is named fact_table here to match the join query below:
CREATE TABLE fact_table (
`id` BIGINT,
`name` STRING,
`city` STRING,
`process_time` AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'Topic name',
'properties.bootstrap.servers' = 'IP address of the Kafka Broker instance:21007',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv',
'properties.sasl.kerberos.service.name' = 'kafka',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);
- properties.bootstrap.servers: If there are multiple parameter values, separate them with commas (,).
To view the service IP address of the Kafka Broker instance, log in to FusionInsight Manager and choose Cluster > Services > Kafka > Instances. To view the port of the Broker instance, click Configurations. If Kerberos authentication is enabled for the cluster (the cluster is in security mode), search for sasl.port. If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), search for port.
- The values of properties.sasl.kerberos.service.name, properties.security.protocol, and properties.kerberos.domain.name can be obtained from Client installation directory/Kafka/kafka/config on the node where the Kafka client is installed. Search for sasl.kerberos.service.name, security.protocol, or kerberos.domain.name in the server.properties file in the directory.
- Create a Flink table. It is named dim_city here to match the join query and the table.identifier value below:
CREATE TABLE dim_city (
`city` STRING,
`level` INT,
`province` STRING,
`country` STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'IP address of an FE instance:HTTPS port or HTTP port',
'jdbc-url' = 'jdbc:mysql://IP address of an FE instance:FE query connection port',
'table.identifier' = 'dim.dim_city',
'username' = 'user',
'password' = 'password'
);
- To view the IP addresses of FE instances, log in to FusionInsight Manager and choose Cluster > Services > Doris > Instances.
- To view the HTTPS port, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and search for https_port. To view the HTTP port, search for http_port.
- To obtain the query connection port of the Doris FE instance, you can log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and query the value of query_port of the Doris service.
- When creating a Flink table, you can also set the parameters listed in Table 2.
- Run the following command to perform a join:
SELECT a.id, a.name, a.city, c.province, c.country, c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city;
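If the job should emit the join results to an explicit sink, one option is Flink's print connector. A minimal sketch (the table name print_sink is illustrative):
CREATE TABLE print_sink (
`id` BIGINT,
`name` STRING,
`city` STRING,
`province` STRING,
`country` STRING,
`level` INT
) WITH (
'connector' = 'print'
);
INSERT INTO print_sink
SELECT a.id, a.name, a.city, c.province, c.country, c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city;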
- In the basic parameters area on the right of the Job Management page, select Enable CheckPoint and set Time Interval(ms) as required. The recommended interval ranges from 30000 to 60000.
- Click Check Semantic to verify the semantics of the statements. Then, click Save and Submit.
Table Creation Configurations
Table 1 Doris Sink table parameters

| Parameter | Default Value | Mandatory | Description |
| --- | --- | --- | --- |
| doris.request.retries | 3 | No | Number of retries for sending requests to Doris. |
| doris.request.connect.timeout.ms | 30000 | No | Connection timeout for sending requests to Doris, in milliseconds. |
| doris.request.read.timeout.ms | 30000 | No | Read timeout for sending requests to Doris, in milliseconds. |
| sink.max-retries | 3 | No | Maximum number of retries after a commit fails. |
| sink.enable.batch-mode | false | No | Whether to write data to Doris in batch mode. If enabled, writes no longer depend on checkpoints and are controlled by sink.buffer-flush.max-rows, sink.buffer-flush.max-bytes, and sink.buffer-flush.interval. |
| sink.buffer-flush.max-rows | 50000 | No | Maximum number of rows written in a batch. |
| sink.buffer-flush.max-bytes | 10MB | No | Maximum number of bytes written in a batch. |
| sink.buffer-flush.interval | 10s | No | Interval for asynchronously flushing the buffer in batches. |
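For example, to enable batch writes for the Doris Sink table, you could add the following options to its WITH clause (the values shown are illustrative):
'sink.enable.batch-mode' = 'true',
'sink.buffer-flush.max-rows' = '50000',
'sink.buffer-flush.max-bytes' = '10MB',
'sink.buffer-flush.interval' = '10s'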
Table 2 Lookup Join parameters

| Parameter | Default Value | Mandatory | Description |
| --- | --- | --- | --- |
| lookup.cache.max-rows | -1 | No | Maximum number of rows cached for the lookup table. The default value -1 disables caching. |
| lookup.cache.ttl | 10s | No | Maximum time to live of cached lookup data. |
| lookup.max-retries | 1 | No | Number of retries after a lookup query fails. |
| lookup.jdbc.async | false | No | Whether to enable asynchronous lookup. |
| lookup.jdbc.read.batch.size | 128 | No | Maximum batch size per query when asynchronous lookup is enabled. |
| lookup.jdbc.read.batch.queue-size | 256 | No | Size of the intermediate buffer queue when asynchronous lookup is enabled. |
| lookup.jdbc.read.thread-size | 3 | No | Number of lookup JDBC threads per task. |
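For example, to cache lookup results and enable asynchronous lookup for the dim_city table, you could add the following options to its WITH clause (the values shown are illustrative):
'lookup.cache.max-rows' = '1000',
'lookup.cache.ttl' = '60s',
'lookup.jdbc.async' = 'true'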