更新时间:2024-07-10 GMT+08:00
分享

INSERT INTO

本节操作介绍使用INSERT INTO 语句将作业结果写入Sink表中。

写数据至一个Sink表

  • 语法格式
    1
    2
      INSERT INTO your_sink
      SELECT ... FROM your_source WHERE ...
    
  • 示例
    本例定义了两个表my_source 和my_sink,并使用INSERT INTO语句source表选择数据并插入到sink表。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    --使用datagen connector创建源表my_source
    CREATE TABLE my_source (
      name VARCHAR,
      age BIGINT
    ) WITH (
      'connector' = 'datagen');
    
    --使用jdbc connector创建目标表my_sink
    CREATE TABLE my_sink (
      name VARCHAR,
      age BIGINT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://xxx/your-database',
      'table-name' = 'your-table',
      'username' = 'your-username',
      'password' = 'your-password'
    );
    
    --使用INSERT INTO语句从my_source表选择数据,并插入到my_sink表
    INSERT INTO my_sink
    SELECT name, age
    FROM my_source;
    

写数据至多个Sink表

EXECUTE STATEMENT SET BEGIN ... END; 是写数据至多个Sink表的必填语句,用于定义在同一个作业中执行多个插入数据的操作。

写数据至多个Sink表时,EXECUTE STATEMENT SET BEGIN ... END;是必填项。

  • 语法格式
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    EXECUTE STATEMENT SET BEGIN
      -- 第一个DML语句
      INSERT INTO your_sink1
      SELECT ... FROM your_source WHERE ...;
    
      -- 第二个DML语句
      INSERT INTO your_sink2
      SELECT ... FROM your_source WHERE ...
    
    ...
    END;
    
  • 示例
    本例定义了源表datagen_source、Sink表print_sinkA和print_sinkB。然后使用EXECUTE STATEMENT执行两个INSERT INTO语句,分将转换后的数据写入两个不同的sink。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    --使用 datagen connector创建源表 datagen_source
    CREATE TABLE datagen_source (
      name VARCHAR,
      age BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    --使用print connector创建结果表 print_sinkA 和 print_sinkB 
    
    CREATE TABLE print_sinkA(
      name VARCHAR,
      age BIGINT
    ) WITH (
      'connector' = 'print' 
    );
    
    CREATE TABLE print_sinkB(
      name VARCHAR,
      age BIGINT
    ) WITH (
      'connector' = 'print' 
    );
    
    --使用 EXECUTE STATEMENT SET BEGIN来执行两个 INSERT INTO 语句。
    --第一个INSERT INTO语句将datagen_source表中的数据按需转换后写入 print_sinkA。
    --第二个 INSERT INTO 语句将数据按需转换后写入 print_sinkB。。
    EXECUTE  STATEMENT SET BEGIN
    INSERT INTO print_sinkA 
      SELECT UPPER(name), min(age) 
      FROM datagen_source 
      GROUP BY UPPER(name);
    INSERT INTO print_sinkB 
      SELECT LOWER(name), max(age) 
      FROM datagen_source 
      GROUP BY LOWER(name);
    END;
    

相关文档