文档首页/
    
      
      MapReduce服务 MRS/
      
      
        
        
        组件操作指南(LTS版)(巴黎区域)/
        
        
        使用Flink/
        
        
        使用Flink WebUI/
        
        
        FlinkServer对接外部组件/
        
      
      FlinkServer对接Hudi
    
  
  
    
        更新时间:2022-12-14 GMT+08:00
        
          
          
        
      
      
      
      
      
      
      
      
  
      
      
      
        
FlinkServer对接Hudi
操作场景
本指南通过使用FlinkServer写FlinkSQL对接Hudi。
前提条件
- 集群已安装HDFS、Yarn、Flink和Hudi等服务。
 - 包含Hudi服务的客户端已安装,例如安装路径为:/opt/Bigdata/client。
 - Flink要求1.12.2及以上版本,Hudi要求0.9.0及以上版本。
 - 参考基于用户和角色的鉴权创建一个具有“FlinkServer管理操作权限”的用户用于访问Flink WebUI,如:flink_admin。
 
Flink对Hudi表的读写支持
Flink对Hudi表的COW表、MOR表类型读写支持详情见表1。
 
   目前FlinkSQL对接Hudi仅支持Snapshot mode和Read Optimized mode两种模式。
操作步骤
- 使用flink_admin登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
 - 参考新建作业,新建Flink SQL流作业,在作业开发界面进行如下作业配置。并启动作业。
    
    
需勾选“运行参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
 
     - 由于FlinkSQL作业在触发CheckPoint时才会往Hudi表中写数据,所以需要在Flink WebUI界面中开启CheckPoint。CheckPoint间隔根据业务需要调整,建议间隔调大。
 - 如果CheckPoint间隔太短,数据来不及刷新会导致作业异常;建议CheckPoint间隔为分钟级。
 - FlinkSQL作业写MOR表时需要做异步compaction,控制compaction间隔的参数,见Hudi官网:https://hudi.apache.org/docs/configurations.html
 
- FlinkSQL流式写入MOR表,仅支持kafka json格式。
      
CREATE TABLE stream_mor( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_mor', 'table.type' = 'MERGE_ON_READ' ); CREATE TABLE kafka( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) WITH ( 'connector' = 'kafka', 'topic' = 'writehudi', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); insert into stream_mor select * from kafka;
 - FlinkSQL流式写入COW表
      
CREATE TABLE stream_write_cow( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_cow' ); CREATE TABLE kafka( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) WITH ( 'connector' = 'kafka', 'topic' = 'writehudi', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); insert into stream_write_cow select * from kafka;
 - FlinkSQL读取MOR表
      
CREATE TABLE hudi_read_spark_mor( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/default/tb_hudimor', 'table.type' = 'MERGE_ON_READ' ); CREATE TABLE kafka( uuid VARCHAR(20), name VARCHAR(10), age INT, ts timestamp(6)INT, `p` VARCHAR(20) ) WITH ( 'connector' = 'kafka', 'topic' = 'writehudi', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); insert into hudi_read_spark_mor select * from kafka;
 
 - FlinkSQL写入Hudi表数据后,通过Spark、Hive读该数据时,需要使用run_hive_sync_tool.sh将Hudi表数据同步到Hive中。同步方法请参考将Hudi表数据同步到Hive。
    
    
 
     同步前需要保证不再新增分区,同步后新增的分区将不能被读取。
 
   父主题: FlinkServer对接外部组件