Interconnecting FlinkServer with Hudi
This section applies to MRS 3.1.2 or later clusters.
Scenario
This section describes how to interconnect FlinkServer with Hudi through Flink SQL jobs.
Prerequisites
- The HDFS, Yarn, Flink, and Hudi services have been installed in a cluster.
- The client that contains the Hudi service has been installed in a directory, for example, /opt/client.
- Flink 1.12.2 or later and Hudi 0.9.0 or later are required.
- You have created a user (for example, flink_admin) that has been assigned FlinkServer Admin Privilege and can access the Flink web UI. For details, see Creating a FlinkServer Role.
Flink Support for Read and Write Operations on Hudi Tables
Table 1 lists the read and write operations supported by Flink on Hudi COW and MOR tables.
Procedure
- Log in to Manager as user flink_admin and choose Cluster > Services > Flink. In the Basic Information area, click the link on the right of Flink WebUI to access the Flink web UI.
- Create a Flink SQL job by referring to Creating a Job. On the job development page, configure the job parameters as follows and start the job.
In Basic Parameter, select Enable CheckPoint, set Time Interval(ms) to 60000, and retain the default value for Mode.
- Enable CheckPoint on the Flink web UI, because data is written to a Hudi table only when a Flink SQL job triggers a checkpoint. Adjust the CheckPoint interval based on service requirements; a relatively long interval is recommended.
- If the CheckPoint interval is too short, job exceptions may occur because data is not updated in time. Set the CheckPoint interval at the minute level.
- Asynchronous compaction is required when a Flink SQL job writes to an MOR table. For details about the parameters that control the compaction interval, see the Hudi official website at https://hudi.apache.org/docs/configurations.html.
- The following shows a Flink SQL job writing data to an MOR table in stream mode.
CREATE TABLE stream_mor(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
  'table.type' = 'MERGE_ON_READ'
);

CREATE TABLE kafka(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) WITH (
  'connector' = 'kafka',
  'topic' = 'writehudi',
  'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
  'properties.group.id' = 'testGroup1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'properties.sasl.kerberos.service.name' = 'kafka', --This parameter is not required for clusters in normal mode. Delete the comma (,) in the previous line.
  'properties.security.protocol' = 'SASL_PLAINTEXT', --This parameter is not required for clusters in normal mode.
  'properties.kerberos.domain.name' = 'hadoop.System domain name' --This parameter is not required for clusters in normal mode.
);

INSERT INTO stream_mor SELECT * FROM kafka;
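As a hedged illustration of the asynchronous compaction note above, the following sketch extends the MOR table definition with explicit compaction options. The option names (compaction.async.enabled, compaction.trigger.strategy, and compaction.delta_commits) are taken from the open-source Hudi Flink configuration reference, and the values are assumptions chosen for illustration; verify both against the Hudi version in your cluster. The table name stream_mor_compaction and its path are hypothetical.

```sql
-- Sketch only: a variant of stream_mor with explicit compaction settings.
CREATE TABLE stream_mor_compaction(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/stream_mor_compaction', -- hypothetical path
  'table.type' = 'MERGE_ON_READ',
  'compaction.async.enabled' = 'true',           -- run compaction asynchronously inside the write job
  'compaction.trigger.strategy' = 'num_commits', -- trigger compaction by number of delta commits
  'compaction.delta_commits' = '5'               -- assumed value: schedule compaction every 5 delta commits
);
```

Because data is committed only when a checkpoint is triggered, a commit-count strategy ties the compaction frequency to the CheckPoint interval: with a 60000 ms interval and the assumed value of 5, compaction would be scheduled roughly every five minutes while data keeps arriving.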
- The following shows a Flink SQL job writing data to a COW table in stream mode:
CREATE TABLE stream_write_cow(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/stream_cow'
);

CREATE TABLE kafka(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) WITH (
  'connector' = 'kafka',
  'topic' = 'writehudi',
  'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
  'properties.group.id' = 'testGroup1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'properties.sasl.kerberos.service.name' = 'kafka', --This parameter is not required for clusters in normal mode. Delete the comma (,) in the previous line.
  'properties.security.protocol' = 'SASL_PLAINTEXT', --This parameter is not required for clusters in normal mode.
  'properties.kerberos.domain.name' = 'hadoop.System domain name' --This parameter is not required for clusters in normal mode.
);

INSERT INTO stream_write_cow SELECT * FROM kafka;
- The following shows a Flink SQL job reading an MOR table and writing the data to Kafka.
CREATE TABLE hudi_read_spark_mor(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/default/tb_hudimor',
  'table.type' = 'MERGE_ON_READ'
);

CREATE TABLE kafka(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) WITH (
  'connector' = 'kafka',
  'topic' = 'writehudi',
  'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
  'properties.group.id' = 'testGroup1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'properties.sasl.kerberos.service.name' = 'kafka', --This parameter is not required for clusters in normal mode. Delete the comma (,) in the previous line.
  'properties.security.protocol' = 'SASL_PLAINTEXT', --This parameter is not required for clusters in normal mode.
  'properties.kerberos.domain.name' = 'hadoop.System domain name' --This parameter is not required for clusters in normal mode.
);

INSERT INTO kafka SELECT * FROM hudi_read_spark_mor;
Kafka port number
- Value of sasl.port when Authentication Mode of the cluster is Security Mode, 21007 by default.
- Value of port when Authentication Mode of the cluster is Normal Mode, 9092 by default. If the port number is set to 9092, set allow.everyone.if.no.acl.found to true. The procedure is as follows:
Log in to FusionInsight Manager and choose Cluster > Services > Kafka. On the page that is displayed, click the Configurations tab then the All Configurations sub-tab. On the displayed page, search for allow.everyone.if.no.acl.found, set it to true, and click Save.
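For a cluster in normal mode, the comments in the Kafka table definitions and the port note above combine into the following sketch: the three security-related properties are removed, 'format' becomes the last entry without a trailing comma, and the default port 9092 is used. The broker IP address 192.168.0.10 is a hypothetical placeholder; replace it with the address of your Kafka broker instance.

```sql
-- Sketch for a cluster in normal mode (no Kerberos authentication).
CREATE TABLE kafka(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) WITH (
  'connector' = 'kafka',
  'topic' = 'writehudi',
  'properties.bootstrap.servers' = '192.168.0.10:9092', -- hypothetical broker IP; 9092 is the default port in normal mode
  'properties.group.id' = 'testGroup1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json' -- last entry, so the trailing comma is removed
);
```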
- If data written to the Hudi table by a Flink SQL job needs to be read by Spark and Hive, use run_hive_sync_tool.sh to synchronize the data in the Hudi table to Hive. For details about the synchronization method, see Synchronizing Hudi Table Data to Hive.
Ensure that no partitions are added before the synchronization. After the synchronization, new partitions cannot be read.
Synchronizing Metadata from Flink On Hudi to Hive
- Synchronizing metadata to Hive in JDBC mode
CREATE TABLE stream_mor(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
  'table.type' = 'MERGE_ON_READ',
  'hive_sync.enable' = 'true',
  'hive_sync.table' = 'Name of the table to be synchronized to Hive',
  'hive_sync.db' = 'Name of the database to be synchronized to Hive',
  'hive_sync.metastore.uris' = 'Value of hive.metastore.uris in the hive-site.xml file on the Hive client',
  'hive_sync.jdbc_url' = 'Value of CLIENT_HIVE_URI in the component_env file on the Hive client'
);
- hive_sync.jdbc_url: If the value of CLIENT_HIVE_URI contains backslashes (\), delete them.
- To use the Hive style partitioning, add the following parameters:
- 'hoodie.datasource.write.hive_style_partitioning' = 'true'
- 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
- When Flink on Hudi synchronizes data to Hive, note that Hudi is case sensitive while Hive is case insensitive. Do not use uppercase letters in the field names of Hudi tables. Otherwise, data may fail to be read or written.
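The Hive style partitioning parameters listed above can be placed in the WITH clause of the JDBC-mode table definition. The following sketch shows one way to do so; it reuses the table and placeholder values from the JDBC example and appends only the two documented parameters.

```sql
-- Sketch: JDBC-mode metadata synchronization with Hive style partitioning enabled.
CREATE TABLE stream_mor(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
  'table.type' = 'MERGE_ON_READ',
  'hive_sync.enable' = 'true',
  'hive_sync.table' = 'Name of the table to be synchronized to Hive',
  'hive_sync.db' = 'Name of the database to be synchronized to Hive',
  'hive_sync.metastore.uris' = 'Value of hive.metastore.uris in the hive-site.xml file on the Hive client',
  'hive_sync.jdbc_url' = 'Value of CLIENT_HIVE_URI in the component_env file on the Hive client',
  'hoodie.datasource.write.hive_style_partitioning' = 'true', -- write partition paths in Hive style
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
);
```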
- Synchronizing metadata to Hive in HMS mode
CREATE TABLE stream_mor(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
  'table.type' = 'MERGE_ON_READ',
  'hive_sync.enable' = 'true',
  'hive_sync.table' = 'Name of the table to be synchronized to Hive',
  'hive_sync.db' = 'Name of the database to be synchronized to Hive',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'Value of hive.metastore.uris in the hive-site.xml file on the Hive client',
  'properties.hive.metastore.kerberos.principal' = 'Value of hive.metastore.kerberos.principal in the hive-site.xml file on the Hive client'
);