
Creating a FlinkServer Job to Interconnect with a Doris Table

This section applies to MRS 3.5.0 and later.

Scenario

This topic describes how to use FlinkServer to write Kafka data to Doris and how to perform a Lookup Join between Kafka and Doris data.

Prerequisites

  • Services such as Doris, HDFS, YARN, Flink, and Kafka have been installed in the cluster.
  • The node to be connected to the Doris database can communicate with the MRS cluster.
  • A user with the Doris management permission has been created.
    • Kerberos authentication is enabled for the cluster (the cluster is in security mode)

      On FusionInsight Manager, create a human-machine user, for example, dorisuser, create a role with Doris administrator permission, and bind the role to the user.

      Log in to FusionInsight Manager as the new user dorisuser and change the initial password.

    • Kerberos authentication is disabled for the cluster (the cluster is in normal mode)

      After connecting to Doris as user admin, create a role with administrator permissions, and bind the role to the user.

  • The MySQL client has been installed. For details, see Using the MySQL Client to Connect to Doris.
  • The Flink client has been installed.

Using FlinkServer to Write Kafka Data to Doris

Operations on the Doris side

  1. Log in to the node where the MySQL client is installed and run the following commands to connect to the Doris database.

    If Kerberos authentication is enabled for the cluster (the cluster is in security mode), run the following commands; if Kerberos authentication is disabled (the cluster is in normal mode), the export command is not required:

    export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

    mysql -uDatabase login user -p -PConnection port for FE queries -hIP address of the Doris FE instance

    Enter the password for logging in to the database when prompted.

    • To obtain the query connection port of the Doris FE instance, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and search for query_port.
    • To obtain the TCP port of the Doris DBalancer instance, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and search for balancer_tcp_port.
    • To obtain the IP address of the Doris FE or DBalancer instance, log in to FusionInsight Manager of the MRS cluster and choose Cluster > Services > Doris > Instances to view the service IP address of any FE or DBalancer instance.
    • You can also use MySQL connection software or the Doris web UI to connect to the database.
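    For example, if the FE instance IP address is 192.168.1.10, query_port is 29982, and the login user is dorisuser (all values here are placeholders; use the values from your own cluster), the connection commands look like the following:

    export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

    mysql -udorisuser -p -P29982 -h192.168.1.10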

  2. Run the following commands to create a database and the table usertable2 to which data will be written:

    create database sink;

    use sink;

    create table usertable2(
    `user_id` VARCHAR(10),
    `user_name` VARCHAR(10),
    `age` INT
    )
    DISTRIBUTED BY HASH(user_id) BUCKETS 32;
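    Optionally, verify that the table is writable by inserting and querying a test row (the values are illustrative):

    INSERT INTO usertable2 VALUES ('1', 'Jack', 20);

    SELECT * FROM usertable2;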

Operations on the Flink side

  1. Log in to FusionInsight Manager as a user with FlinkServer Admin Privilege. Choose Cluster > Services > Flink.

    If your MRS cluster requires Kerberos authentication, create a role with the FlinkServer administrator permission or the application viewing and editing permission, and bind the role to the user. Then, you can access the Flink Web UI. For details about how to create a role, see Creating a FlinkServer Role.

  2. Click the link on the right of Flink WebUI to access FlinkServer.
  3. On the FlinkServer page, choose Job Management > Create Job. In the displayed dialog box, set the following parameters and click OK. The Job Management page is displayed.

    • Type: Select Flink SQL.
    • Name: Enter a job name, for example, FlinkSQL1.

  4. Create a stream or batch Flink SQL job on the Flink job management page. The following are some examples:

    1. Create a Kafka data source table.

      CREATE TABLE KafkaSource (
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `age` INT
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'Topic name',
      'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Port of the Broker instance',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
      );

      • properties.bootstrap.servers: If there are multiple parameter values, separate them with commas (,).

        To view the service IP address of the Kafka Broker instance, log in to FusionInsight Manager and choose Cluster > Services > Kafka > Instances. To view the port of the Broker instance, click Configurations. If Kerberos authentication is enabled for the cluster (the cluster is in security mode), search for sasl.port. If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), search for port.

      • The values of properties.sasl.kerberos.service.name, properties.security.protocol, and properties.kerberos.domain.name can be obtained from Client installation directory/Kafka/kafka/config on the node where the Kafka client is installed. Search for sasl.kerberos.service.name, security.protocol, or kerberos.domain.name in the server.properties file in the directory.
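      For example, with two Broker instances in a security-mode cluster, the value looks like the following (the IP addresses are placeholders and 21007 is the sasl.port used elsewhere in this example; use the values from your own cluster):

      'properties.bootstrap.servers' = '192.168.1.11:21007,192.168.1.12:21007'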
    2. Create a Doris Sink table:

      CREATE TABLE dorisSink (
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `age` INT
      ) WITH (
      'connector' = 'doris',
      'fenodes' = 'IP address of any FE instance:HTTPS port or HTTP port',
      'table.identifier' = 'sink.usertable2',
      'username' = 'user',
      'password' = 'password',
      'doris.enable.https' = 'true',
      'doris.ignore.https.ca' = 'true',
      'sink.label-prefix' = 'doris_label1'
      );

      • To view the IP addresses of FE instances, choose Cluster > Services > Doris > Instances on FusionInsight Manager.
      • To view the HTTPS port, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and search for https_port. To view the HTTP port, search for http_port.
      • table.identifier: the Doris database and table created in 2 of "Operations on the Doris side."
      • username and password: the username and password for connecting to the Doris database.
      • If HTTPS is disabled for the cluster (in either normal or security mode), remove the following parameters from the WITH clause of the Doris Sink table creation statement:

        'doris.enable.https' = 'true'

        'doris.ignore.https.ca' = 'true'

      • When creating a Doris Sink table, you can also set the parameters listed in Table 1.
    3. Run the following command to write Kafka data to Doris:

      insert into dorisSink select * from KafkaSource;

  5. In the basic parameters area on the right of the Job Management page, select Enable CheckPoint and set Time Interval(ms) as required. The recommended interval ranges from 30000 to 60000.
  6. Click Check Semantic to verify the semantics of the statements. Then, click Save and Submit.

Operations on the Kafka side

  1. Log in to the node where the Kafka client is installed and perform the following operations to create a Kafka topic:

    cd Client installation directory/Kafka/kafka/bin

    1. Run the following command to create a Kafka topic. The topic name must be the same as the one configured for the KafkaSource table in 4.a of "Operations on the Flink side":

      sh kafka-topics.sh --create --topic Topic name --partitions 1 --replication-factor 1 --bootstrap-server IP address of the host where the Controller of the Kafka Broker instance is deployed:Port of the Broker instance --command-config ../config/client.properties

    2. Run the following command to view the topic list:

      sh kafka-topics.sh --list --bootstrap-server IP address of the host where the Controller of the Kafka Broker instance is deployed:Port of the Broker instance --command-config ../config/client.properties

    3. Run the following command to start the Kafka console producer and send data to the topic:

      sh kafka-console-producer.sh --broker-list IP address of the host where the Controller of the Kafka Broker instance is deployed:Port of the Broker instance --topic Topic name --producer.config ../config/producer.properties

    To obtain the IP address of the host where the Controller of the Broker instance is deployed, log in to FusionInsight Manager, choose Cluster > Services > Kafka, and view the value of Controller Host in the basic information area on the Dashboard page.
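    After the console producer starts, enter test data in CSV format matching the KafkaSource table schema (user_id, user_name, age), one record per line. The following rows are illustrative:

    1,Jack,20

    2,Rose,30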

  2. Connect to Doris on the node where the MySQL client is installed and run the following command to check whether the data in the Doris table matches the data produced in 1.c:

    select * from sink.usertable2;
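    If you sent the illustrative rows above and the Flink job is running, the query should return those rows, for example, a row with user_id 1, user_name Jack, and age 20.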

Lookup Join

  1. Log in to FusionInsight Manager as a user with FlinkServer Admin Privilege. Choose Cluster > Services > Flink.

    If your MRS cluster requires Kerberos authentication, create a role with the FlinkServer administrator permission or the application viewing and editing permission, and bind the role to the user. Then, you can access the Flink Web UI. For details about how to create a role, see Creating a FlinkServer Role.

  2. Click the link on the right of Flink WebUI to access FlinkServer.
  3. On the Flink web UI, click Job Management and then Create Job. In the Create Job dialog box, set the following parameters and click OK:

    • Type: Select Flink SQL.
    • Name: Enter a job name, for example, FlinkSQL2.

  4. Create a stream or batch Flink SQL job on the Flink job management page. The following are some examples:

    1. Create a Kafka Source table.

      CREATE TABLE fact_table (
      `id` BIGINT,
      `name` STRING,
      `city` STRING,
      `process_time` as proctime()
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'Topic name',
      'properties.bootstrap.servers' = 'IP address of the Kafka Broker instance:21007',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
      );

      • properties.bootstrap.servers: If there are multiple parameter values, separate them with commas (,).

        To view the service IP address of the Kafka Broker instance, log in to FusionInsight Manager and choose Cluster > Services > Kafka > Instances. To view the port of the Broker instance, click Configurations. If Kerberos authentication is enabled for the cluster (the cluster is in security mode), search for sasl.port. If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), search for port.

      • The values of properties.sasl.kerberos.service.name, properties.security.protocol, and properties.kerberos.domain.name can be obtained from Client installation directory/Kafka/kafka/config on the node where the Kafka client is installed. Search for sasl.kerberos.service.name, security.protocol, or kerberos.domain.name in the server.properties file in the directory.
    2. Create a Flink table.

      create table dim_city(
      `city` STRING,
      `level` INT,
      `province` STRING,
      `country` STRING
      ) WITH (
      'connector' = 'doris',
      'fenodes' = 'IP address of an FE instance:HTTPS port or HTTP port',
      'jdbc-url' = 'jdbc:mysql://IP address of an FE instance:FE query connection port',
      'table.identifier' = 'dim.dim_city',
      'username' = 'user',
      'password' = 'password'
      );

      • To view the IP addresses of FE instances, log in to FusionInsight Manager and choose Cluster > Services > Doris > Instances.
      • To view the HTTPS port, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and search for https_port. To view the HTTP port, search for http_port.
      • To obtain the query connection port of the Doris FE instance, you can log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and query the value of query_port of the Doris service.
      • When creating a Flink table, you can also set the parameters listed in Table 2.
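      The Doris table referenced by table.identifier must already exist. The following is a minimal sketch of creating it on the Doris side, with columns mirroring the Flink table definition (the VARCHAR lengths, distribution key, and bucket count are assumptions; adjust them as needed):

      create database if not exists dim;

      use dim;

      create table dim_city(
      `city` VARCHAR(100),
      `level` INT,
      `province` VARCHAR(100),
      `country` VARCHAR(100)
      )
      DISTRIBUTED BY HASH(city) BUCKETS 32;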
    3. Run the following command to perform the join:

      SELECT a.id, a.name, a.city, c.province, c.country, c.level
      FROM fact_table a
      LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
      ON a.city = c.city;
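      For example, if the fact topic receives the CSV row 1,Jack,Beijing and dim.dim_city contains the row (Beijing, 2, Beijing, China), the join outputs 1, Jack, Beijing, Beijing, China, 2. All values here are illustrative.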

  5. In the basic parameters area on the right of the Job Management page, select Enable CheckPoint and set Time Interval(ms) as required. The recommended interval ranges from 30000 to 60000.
  6. Click Check Semantic to verify the semantics of the statements. Then, click Save and Submit.

Table Creation Configurations

Table 1 Optional parameters for creating a Doris Sink table

Parameter | Default Value | Mandatory | Description
doris.request.retries | 3 | No | Number of retries for requests sent to Doris.
doris.request.connect.timeout.ms | 30000 | No | Connection timeout for requests sent to Doris, in milliseconds.
doris.request.read.timeout.ms | 30000 | No | Read timeout for requests sent to Doris, in milliseconds.
sink.max-retries | 3 | No | Maximum number of retries after a commit fails.
sink.enable.batch-mode | false | No | Whether to write data to Doris in batch mode. If enabled, the write time does not depend on checkpoints; use sink.buffer-flush.max-rows, sink.buffer-flush.max-bytes, or sink.buffer-flush.interval to control when data is flushed.
sink.buffer-flush.max-rows | 50000 | No | Maximum number of data rows written in a batch.
sink.buffer-flush.max-bytes | 10MB | No | Maximum number of bytes written in a batch.
sink.buffer-flush.interval | 10s | No | Interval for asynchronously flushing the cache in batches.
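For example, to enable batch writes for the Doris Sink table, you could append options such as the following to its WITH clause (a sketch with illustrative values):

'sink.enable.batch-mode' = 'true',
'sink.buffer-flush.max-rows' = '50000',
'sink.buffer-flush.interval' = '10s'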

Table 2 Optional parameters for creating a Flink table in Lookup Joins

Parameter | Default Value | Mandatory | Description
lookup.cache.max-rows | -1 | No | Maximum number of rows cached for the lookup table. The default value -1 disables caching.
lookup.cache.ttl | 10s | No | Maximum time to live of cached lookup data.
lookup.max-retries | 1 | No | Number of retries after a lookup query fails.
lookup.jdbc.async | false | No | Whether to enable asynchronous lookup.
lookup.jdbc.read.batch.size | 128 | No | Maximum batch size per query when asynchronous lookup is enabled.
lookup.jdbc.read.batch.queue-size | 256 | No | Size of the intermediate buffer queue when asynchronous lookup is enabled.
lookup.jdbc.read.thread-size | 3 | No | Number of lookup JDBC threads per task.
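For example, to cache lookup results and enable asynchronous lookup for the dim_city table, you could append options such as the following to its WITH clause (a sketch with illustrative values):

'lookup.cache.max-rows' = '10000',
'lookup.cache.ttl' = '10s',
'lookup.jdbc.async' = 'true'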