
Creating a FlinkServer Job to Interconnect with a Doris Table

Updated on 2024-12-13 GMT+08:00

This section applies to MRS 3.5.0 and later.

Scenario

This topic describes how to use FlinkServer to write Kafka data to Doris and how to perform a Lookup Join between Kafka data and a Doris dimension table.

Prerequisites

  • Services such as Doris, HDFS, YARN, Flink, and Kafka have been installed in the cluster.
  • The node to be connected to the Doris database can communicate with the MRS cluster.
  • A user with the Doris management permission has been created.
    • Kerberos authentication is enabled for the cluster (the cluster is in security mode)

      On FusionInsight Manager, create a human-machine user, for example, dorisuser, create a role with Doris administrator permission, and bind the role to the user.

      Log in to FusionInsight Manager as the new user dorisuser and change the initial password.

    • Kerberos authentication is disabled for the cluster (the cluster is in normal mode)

      After connecting to Doris as user admin, create a role with administrator permissions, and bind the role to the user.

  • The MySQL client has been installed. For details, see Using the MySQL Client to Connect to Doris.
  • The Flink client has been installed.

Using FlinkServer to Write Kafka Data to Doris

Operations on the Doris side

  1. Log in to the node where MySQL is installed and run the following command to connect to the Doris database:

    If Kerberos authentication is enabled for the cluster (the cluster is in security mode), run the following command to connect to the Doris database:

    export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1
    mysql -uDatabase login user -p -PConnection port for FE queries -hIP address of the Doris FE instance

    Enter the password for logging in to the database.

    NOTE:
    • To obtain the query connection port of the Doris FE instance, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and check the value of query_port of the Doris service.
    • If you connect through a DBalancer instance instead, check the value of balancer_tcp_port on the same page.
    • To obtain the IP address of the Doris FE or DBalancer instance, log in to FusionInsight Manager of the MRS cluster and choose Cluster > Services > Doris > Instances to view the service IP address of any FE or DBalancer instance.
    • You can also use the MySQL connection software or Doris web UI to connect to the database.
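    For example, assuming the login user dorisuser from the prerequisites, an FE instance at 192.168.67.78, and a query_port of 29982 (all hypothetical values; substitute the ones from your cluster), the connection commands might look like this:

    export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1
    mysql -udorisuser -p -P29982 -h192.168.67.78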

  2. Run the following commands to create a database and a table usertable2 to which data will be written:

    create database sink;
    use sink;
    create table usertable2(
      `user_id` VARCHAR(10),
      `user_name` VARCHAR(10),
      `age` INT
    )
    DISTRIBUTED BY HASH(user_id) BUCKETS 32;
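    Optionally, confirm the table structure before moving on:

    desc usertable2;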

Operations on the Flink side

  1. Log in to FusionInsight Manager as a user with FlinkServer Admin Privilege. Choose Cluster > Services > Flink.

    NOTE:

    If your MRS cluster requires Kerberos authentication, create a role with the FlinkServer administrator permission or the application viewing and editing permission, and bind the role to the user. Then, you can access the Flink Web UI. For details about how to create a role, see Creating a FlinkServer Role.

  2. On the right of the Flink web UI, click the link to access FlinkServer.
  3. On the FlinkServer page, choose Job Management > Create Job. In the displayed dialog box, set the following parameters and click OK. The Job Management page is displayed.

    • Type: Select Flink SQL.
    • Name: Enter a job name, for example, FlinkSQL1.

  4. Create a stream or batch Flink SQL job on the Flink job management page. The following are some examples:

    1. Create a Kafka data source table.

      CREATE TABLE KafkaSource (
        `user_id` VARCHAR,
        `user_name` VARCHAR,
        `age` INT
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'Topic name',
        'properties.bootstrap.servers' = 'Service IP address of the Kafka Broker instance:Port of the Broker instance',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'csv',
        'properties.sasl.kerberos.service.name' = 'kafka',
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
      );

      NOTE:
      • properties.bootstrap.servers: If there are multiple parameter values, separate them with commas (,).

        To view the service IP address of the Kafka Broker instance, log in to FusionInsight Manager and choose Cluster > Services > Kafka > Instances. To view the port of the Broker instance, click Configurations. If Kerberos authentication is enabled for the cluster (the cluster is in security mode), search for sasl.port. If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), search for port.

      • The values of properties.sasl.kerberos.service.name, properties.security.protocol, and properties.kerberos.domain.name can be obtained from Client installation directory/Kafka/kafka/config on the node where the Kafka client is installed. Search for sasl.kerberos.service.name, security.protocol, or kerberos.domain.name in the server.properties file in the directory.
    2. Create a Doris Sink table:

      CREATE TABLE dorisSink (
        `user_id` VARCHAR,
        `user_name` VARCHAR,
        `age` INT
      ) WITH (
        'connector' = 'doris',
        'fenodes' = 'IP address of any FE instance:HTTPS port or HTTP port',
        'table.identifier' = 'sink.usertable2',
        'username' = 'user',
        'password' = 'password',
        'doris.enable.https' = 'true',
        'doris.ignore.https.ca' = 'true',
        'sink.label-prefix' = 'doris_label1'
      );

      NOTE:
      • To view the IP addresses of FE instances, choose Cluster > Services > Doris > Instances on FusionInsight Manager.
      • To view the HTTPS port, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and search for https_port. To view the HTTP port, search for http_port.
      • table.identifier: the Doris database and table created in step 2 of Operations on the Doris side.
      • username and password: the username and password for connecting to Doris.
      • If HTTPS is disabled for the cluster (in either normal or security mode), remove the following parameters from the WITH clause of the Doris Sink table creation statement:

        'doris.enable.https' = 'true'

        'doris.ignore.https.ca' = 'true'

      • When creating a Doris Sink table, you can also set the parameters listed in Table 1 (see the sketch after substep 3).
    3. Run the following command to write Kafka data to Doris:

      insert into dorisSink select * from KafkaSource;
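      As an illustration of the Table 1 options, the following is a sketch of a Doris Sink table with batch-mode writing enabled. With batch mode on, flushes are driven by the buffer thresholds rather than by checkpoints. The table name dorisSinkBatch, the label prefix doris_label2, and all threshold values are hypothetical examples, not recommendations:

      CREATE TABLE dorisSinkBatch (
        `user_id` VARCHAR,
        `user_name` VARCHAR,
        `age` INT
      ) WITH (
        'connector' = 'doris',
        'fenodes' = 'IP address of any FE instance:HTTPS port or HTTP port',
        'table.identifier' = 'sink.usertable2',
        'username' = 'user',
        'password' = 'password',
        'sink.label-prefix' = 'doris_label2',
        'sink.enable.batch-mode' = 'true',
        'sink.buffer-flush.max-rows' = '20000',
        'sink.buffer-flush.max-bytes' = '10MB',
        'sink.buffer-flush.interval' = '10s'
      );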

  5. In the basic parameters area on the right of the Job Management page, select Enable CheckPoint and set Time Interval(ms) as required. The recommended interval ranges from 30000 to 60000.
  6. Click Check Semantic to verify the semantics of the statements. Then, click Save and Submit.

Operations on the Kafka side

  1. Log in to the node where the Kafka client is installed and perform the following operations to create a Kafka topic:

    cd Client installation directory/Kafka/kafka/bin

    1. Run the following command to create a Kafka topic. The topic name must be the same as the one configured in the KafkaSource table creation statement:

      sh kafka-topics.sh --create --topic Topic name --partitions 1 --replication-factor 1 --bootstrap-server IP address of the host where the Controller of the Kafka Broker instance is deployed:Port of the Broker instance --command-config ../config/client.properties

    2. Run the following command to view the topic list:

      sh kafka-topics.sh --list --bootstrap-server IP address of the host where the Controller of the Kafka Broker instance is deployed:Port of the Broker instance --command-config ../config/client.properties

    3. Run the following command to connect to the Kafka client:

      sh kafka-console-producer.sh --broker-list IP address of the host where the Controller of the Kafka Broker instance is deployed:Port of the Broker instance --topic TopicTest --producer.config ../config/producer.properties

    NOTE:

    To obtain the IP address of the host where the Controller of the Broker instance is deployed, log in to FusionInsight Manager, choose Cluster > Services > Kafka, and view the value of Controller Host in the basic information area on the Dashboard page.
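    For example, with a hypothetical Controller host of 192.168.67.136 and the Broker sasl.port of 21007 (the default in security mode), the three commands might look like this:

      sh kafka-topics.sh --create --topic TopicTest --partitions 1 --replication-factor 1 --bootstrap-server 192.168.67.136:21007 --command-config ../config/client.properties
      sh kafka-topics.sh --list --bootstrap-server 192.168.67.136:21007 --command-config ../config/client.properties
      sh kafka-console-producer.sh --broker-list 192.168.67.136:21007 --topic TopicTest --producer.config ../config/producer.properties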

  2. Connect to Doris on the node where the MySQL client is installed and run the following command to check whether the data in the Doris table matches the data produced to the Kafka topic:

    select * from sink.usertable2;
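  To illustrate the end-to-end flow: because the KafkaSource table uses the csv format with the columns user_id, user_name, and age, you can type CSV lines such as the following (sample data) into the console producer started above. Since scan.startup.mode is latest-offset, produce the data only after the Flink job is running; after the job completes a checkpoint, the query should return the corresponding rows from sink.usertable2:

    1,Alice,28
    2,Bob,35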

Lookup Join

  1. Log in to FusionInsight Manager as a user with FlinkServer Admin Privilege. Choose Cluster > Services > Flink.

    NOTE:

    If your MRS cluster requires Kerberos authentication, create a role with the FlinkServer administrator permission or the application viewing and editing permission, and bind the role to the user. Then, you can access the Flink Web UI. For details about how to create a role, see Creating a FlinkServer Role.

  2. On the right of the Flink web UI, click the link to access FlinkServer.
  3. On the Flink web UI, click Job Management and then Create Job. In the Create Job dialog box, set the following parameters and click OK:

    • Type: Select Flink SQL.
    • Name: Enter a job name, for example, FlinkSQL2.

  4. Create a stream or batch Flink SQL job on the Flink job management page. The following are some examples:

    1. Create a Kafka Source table.

      CREATE TABLE fact_table (
        `id` BIGINT,
        `name` STRING,
        `city` STRING,
        `process_time` AS PROCTIME()
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'Topic name',
        'properties.bootstrap.servers' = 'IP address of the Kafka Broker instance:21007',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'csv',
        'properties.sasl.kerberos.service.name' = 'kafka',
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
      );

      NOTE:
      • properties.bootstrap.servers: If there are multiple parameter values, separate them with commas (,).

        To view the service IP address of the Kafka Broker instance, log in to FusionInsight Manager and choose Cluster > Services > Kafka > Instances. To view the port of the Broker instance, click Configurations. If Kerberos authentication is enabled for the cluster (the cluster is in security mode), search for sasl.port. If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), search for port.

      • The values of properties.sasl.kerberos.service.name, properties.security.protocol, and properties.kerberos.domain.name can be obtained from Client installation directory/Kafka/kafka/config on the node where the Kafka client is installed. Search for sasl.kerberos.service.name, security.protocol, or kerberos.domain.name in the server.properties file in the directory.
    2. Create a Doris dimension table to be used as the lookup table.

      create table dim_city(
        `city` STRING,
        `level` INT,
        `province` STRING,
        `country` STRING
      ) WITH (
        'connector' = 'doris',
        'fenodes' = 'IP address of an FE instance:HTTPS port or HTTP port',
        'jdbc-url' = 'jdbc:mysql://IP address of an FE instance:FE query connection port',
        'table.identifier' = 'dim.dim_city',
        'username' = 'user',
        'password' = 'password'
      );

      NOTE:
      • To view the IP addresses of FE instances, log in to FusionInsight Manager and choose Cluster > Services > Doris > Instances.
      • To view the HTTPS port, log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and search for https_port. To view the HTTP port, search for http_port.
      • To obtain the query connection port of the Doris FE instance, you can log in to FusionInsight Manager, choose Cluster > Services > Doris > Configurations, and query the value of query_port of the Doris service.
      • When creating the dimension table, you can also set the parameters listed in Table 2 (see the example after Table 2).
    3. Run the following statement to perform the Lookup Join:

      SELECT a.id, a.name, a.city, c.province, c.country, c.level
      FROM fact_table a
      LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
      ON a.city = c.city;
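    Note that the dim.dim_city database and table must already exist on the Doris side before the job runs. The following is a minimal sketch of the Doris-side DDL, assuming a schema that matches the Flink dimension table above; the VARCHAR lengths and bucket count are illustrative:

      create database dim;
      use dim;
      create table dim_city(
        `city` VARCHAR(100),
        `level` INT,
        `province` VARCHAR(100),
        `country` VARCHAR(100)
      )
      DISTRIBUTED BY HASH(city) BUCKETS 32;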

  5. In the basic parameters area on the right of the Job Management page, select Enable CheckPoint and set Time Interval(ms) as required. The recommended interval ranges from 30000 to 60000.
  6. Click Check Semantic to verify the semantics of the statements. Then, click Save and Submit.

Table Creation Configurations

Table 1 Optional parameters for creating a Doris Sink table

Parameter | Default Value | Mandatory | Description
doris.request.retries | 3 | No | Number of retries for sending requests to Doris.
doris.request.connect.timeout.ms | 30000 | No | Connection timeout for sending requests to Doris.
doris.request.read.timeout.ms | 30000 | No | Read timeout for sending requests to Doris.
sink.max-retries | 3 | No | Maximum number of retries after a commit fails.
sink.enable.batch-mode | false | No | Whether to write data to Doris in batch mode. If enabled, the write time does not depend on checkpoints; it is controlled by sink.buffer-flush.max-rows, sink.buffer-flush.max-bytes, and sink.buffer-flush.interval.
sink.buffer-flush.max-rows | 50000 | No | Maximum number of rows that can be written in a batch.
sink.buffer-flush.max-bytes | 10MB | No | Maximum number of bytes that can be written in a batch.
sink.buffer-flush.interval | 10s | No | Interval for asynchronously flushing the cache in batches.

Table 2 Optional parameters for creating a Flink table in Lookup Joins

Parameter | Default Value | Mandatory | Description
lookup.cache.max-rows | -1 | No | Maximum number of rows cached in the lookup table. The default value -1 disables caching.
lookup.cache.ttl | 10s | No | Maximum duration for caching lookup data.
lookup.max-retries | 1 | No | Number of retries after a lookup query fails.
lookup.jdbc.async | false | No | Whether to enable asynchronous lookup.
lookup.jdbc.read.batch.size | 128 | No | Maximum batch size per query when asynchronous lookup is enabled.
lookup.jdbc.read.batch.queue-size | 256 | No | Size of the intermediate buffer queue when asynchronous lookup is enabled.
lookup.jdbc.read.thread-size | 3 | No | Number of lookup JDBC threads per task.
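For example, to enable lookup caching and asynchronous lookup for the dim_city dimension table, you could add options such as the following sketch to its WITH clause; the values are illustrative, not recommendations:

      'lookup.cache.max-rows' = '10000',
      'lookup.cache.ttl' = '60s',
      'lookup.jdbc.async' = 'true',
      'lookup.jdbc.read.batch.size' = '128'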
