更新时间:2024-09-19 GMT+08:00
分享

GDS导入示例

多数据服务器并行导入

规划数据服务器与集群处于同一内网,数据服务器IP为192.168.0.90和192.168.0.91。数据源文件格式为CSV。

  1. 以root用户登录每台GDS数据服务器,在两台数据服务器上,分别创建数据文件存放目录“/input_data”。以下以IP为192.168.0.90的数据服务器为例进行操作,剩余服务器上的操作与它一致。
    mkdir -p /input_data
  2. (可选)创建用户及其所属的用户组。此用户用于启动GDS。若该类用户及所属用户组已存在,可跳过此步骤。
    groupadd gdsgrp
    useradd -g gdsgrp gds_user
  3. 将准备好的CSV格式数据源文件均匀分发至相应数据服务器的“/input_data”目录中。
  4. 修改每台数据服务器上数据文件及数据文件目录“/input_data”的属主为gds_user。以下以IP为192.168.0.90的数据服务器为例,进行操作。
    chown -R gds_user:gdsgrp /input_data 
  5. 以gds_user用户登录每台数据服务器上分别启动GDS。

    其中GDS安装路径为“/opt/bin/dws/gds”,数据文件存放在“/input_data/”目录下,数据服务器所在IP为192.168.0.90和192.168.0.91,GDS监听端口为5000,以后台方式运行。

    在IP为192.168.0.90的数据服务器上启动GDS。
    /opt/bin/dws/gds/bin/gds -d /input_data -p 192.168.0.90:5000 -H 10.10.0.1/24 -D

    在IP为192.168.0.91的数据服务器上启动GDS。

    /opt/bin/dws/gds/bin/gds -d /input_data -p 192.168.0.91:5000 -H 10.10.0.1/24  -D
  6. 使用工具连接数据库。详见连接数据库
  7. 创建导入的目标表tpcds.reasons。
    1
    2
    3
    4
    5
    6
    CREATE TABLE tpcds.reasons
    (
      r_reason_sk integer not null,
      r_reason_id char(16) not null,
      r_reason_desc char(100)
    );
    
  8. 创建外表tpcds.foreign_tpcds_reasons用于接收数据服务器上的数据。

    其中设置导入模式信息如下所示:

    • 导入模式为Normal模式。
    • 由于启动GDS时,设置的数据源文件存放目录为“/input_data”,GDS监听端口为5000,所以设置参数“location”为“gsfs://192.168.0.90:5000/* | gsfs://192.168.0.91:5000/*”。

    设置数据格式信息是根据导出时设置的详细数据格式参数信息指定的,参数设置如下所示:

    • 数据源文件格式(format)为CSV。
    • 编码格式(encoding)为UTF-8。
    • 字段分隔符(delimiter)为E'\x08'。
    • 引号字符(quote)为E'\x1b'。
    • 数据文件中空值(null)为没有引号的空字符串。
    • 逃逸字符(escape)默认和quote相同。
    • 数据文件是否包含标题行(header)为默认值false,即导入时数据文件第一行被识别为数据。

    设置导入容错性如下所示:

    • 允许出现的数据格式错误个数(PER NODE REJECT LIMIT 'value')为unlimited,即接受导入过程中所有数据格式错误。
    • 将数据导入过程中出现的数据格式错误信息(LOG INTO error_table_name)写入表err_tpcds_reasons。

    根据以上信息,创建的外表如下所示:

    1
    2
    3
    4
    5
    6
    7
    CREATE FOREIGN TABLE tpcds.foreign_tpcds_reasons
    (
      r_reason_sk integer not null,
      r_reason_id char(16) not null,
      r_reason_desc char(100)
    )
    SERVER gsmpp_server OPTIONS (location 'gsfs://192.168.0.90:5000/* | gsfs://192.168.0.91:5000/*', format 'CSV',mode 'Normal', encoding 'utf8', delimiter E'\x08', quote E'\x1b', null '', fill_missing_fields 'false') LOG INTO err_tpcds_reasons PER NODE REJECT LIMIT 'unlimited';
    
  9. 通过外表tpcds.foreign_tpcds_reasons,将数据导入目标表tpcds.reasons。
    1
    INSERT INTO tpcds.reasons SELECT * FROM tpcds.foreign_tpcds_reasons;
    
  10. 查询错误信息表err_tpcds_reasons,处理数据导入错误。详细请参见处理导入错误
    1
    SELECT * FROM err_tpcds_reasons;
    
  11. 待数据导入完成后,以gds_user用户登录每台数据服务器,分别停止GDS。
    以下以IP为192.168.0.90的数据服务器为例,停止GDS。其中GDS进程号为128954。
    ps -ef|grep gds
    gds_user 128954      1  0 15:03 ?        00:00:00 gds -d /input_data -p 192.168.0.90:5000 -D
    gds_user 129003 118723  0 15:04 pts/0    00:00:00 grep gds
    kill -9 128954

多线程导入

规划数据服务器与集群处于同一内网,数据服务器IP为192.168.0.90,导入的数据源文件格式为CSV,同时导入2个目标表。

  1. 以root用户登录GDS数据服务器,创建数据文件存放目录“/input_data”,以及子目录“/input_data/import1/”和“/input_data/import2/”。
    mkdir -p /input_data
  2. 将目标表tpcds.reasons1的数据源文件存放在数据服务器“/input_data/import1/”目录下,将目标表tpcds.reasons2的数据源文件存放在目录“/input_data/import2/”下。
  3. (可选)创建用户及其所属的用户组。此用户用于启动GDS。若该用户及所属用户组已存在,可跳过此步骤。
    groupadd gdsgrp
    useradd -g gdsgrp gds_user
  4. 修改数据服务器上数据文件及数据文件目录“/input_data”的属主为gds_user。
    chown -R gds_user:gdsgrp /input_data 
  5. 以gds_user用户登录数据服务器上启动GDS。
    其中GDS安装路径为“/opt/bin/dws/gds”,数据文件存放在“/input_data/”目录下,数据服务器所在IP为192.168.0.90,GDS监听端口为5000,以后台方式运行,设定并发度为2,并设定递归文件目录。
    /opt/bin/dws/gds/bin/gds -d /input_data -p 192.168.0.90:5000 -H 10.10.0.1/24  -D -t 2 -r
  6. 使用工具连接数据库。详见连接数据库
  7. 在数据库中创建导入的目标表tpcds.reasons1和tpcds.reasons2。
    1
    2
    3
    4
    5
    6
    CREATE TABLE tpcds.reasons1
    (
      r_reason_sk integer not null,
      r_reason_id char(16) not null,
      r_reason_desc char(100)
    ) ;
    
    1
    2
    3
    4
    5
    6
    CREATE TABLE tpcds.reasons2
    (
      r_reason_sk integer not null,
      r_reason_id char(16) not null,
      r_reason_desc char(100)
    ) ;
    
  8. 在数据库中创建外表tpcds.foreign_tpcds_reasons1和tpcds.foreign_tpcds_reasons2用于接收数据服务器上的数据。

    以下以外表tpcds.foreign_tpcds_reasons1为例,讲解设置的导入外表参数信息。

    其中设置的导入模式信息如下所示:

    • 导入模式为Normal模式。
    • 由于启动GDS时,设置的数据源文件存放目录为“/input_data/”,GDS监听端口为5000,实际存放数据源文件目录为“/input_data/import1/”,所以设置参数“location”为“gsfs://192.168.0.90:5000/import1/*”。

    设置的数据格式信息是根据导出时设置的详细数据格式参数信息指定的,参数设置如下所示:

    • 数据源文件格式(format)为CSV。
    • 编码格式(encoding)为UTF-8。
    • 字段分隔符(delimiter)为E'\x08'。
    • 引号字符(quote)为E'\x1b'。
    • 数据文件中空值(null)为没有引号的空字符串。
    • 逃逸字符(escape)默认和quote相同。
    • 数据文件是否包含标题行(header)为默认值false,即导入时数据文件第一行被识别为数据。

    设置的导入容错性如下所示:

    • 允许出现的数据格式错误个数(PER NODE REJECT LIMIT 'value')为unlimited,即接受导入过程中所有数据格式错误。
    • 将数据导入过程中出现的数据格式错误信息(LOG INTO error_table_name)写入表err_tpcds_reasons1。
    • 当数据源文件中一行的最后一个字段缺失(fill_missing_fields)时,自动设置为NULL。

    根据以上信息,创建的外表tpcds.foreign_tpcds_reasons1如下所示:

    1
    2
    3
    4
    5
    6
    CREATE FOREIGN TABLE tpcds.foreign_tpcds_reasons1
    (
      r_reason_sk integer not null,
      r_reason_id char(16) not null,
      r_reason_desc char(100)
    ) SERVER gsmpp_server OPTIONS (location 'gsfs://192.168.0.90:5000/import1/*', format 'CSV',mode 'Normal', encoding 'utf8', delimiter E'\x08', quote E'\x1b', null '',fill_missing_fields 'on')LOG INTO err_tpcds_reasons1 PER NODE REJECT LIMIT 'unlimited';
    

    参考以上设置,创建的外表tpcds.foreign_tpcds_reasons2如下所示:

    1
    2
    3
    4
    5
    6
    CREATE FOREIGN TABLE tpcds.foreign_tpcds_reasons2
    (
      r_reason_sk integer not null,
      r_reason_id char(16) not null,
      r_reason_desc char(100)
    ) SERVER gsmpp_server OPTIONS (location 'gsfs://192.168.0.90:5000/import2/*', format 'CSV',mode 'Normal', encoding 'utf8', delimiter E'\x08', quote E'\x1b', null '',fill_missing_fields 'on')LOG INTO err_tpcds_reasons2 PER NODE REJECT LIMIT 'unlimited';
    
  9. 通过外表tpcds.foreign_tpcds_reasons1和tpcds.foreign_tpcds_reasons2将数据分别导入tpcds.reasons1和tpcds.reasons2。
    1
    INSERT INTO tpcds.reasons1 SELECT * FROM tpcds.foreign_tpcds_reasons1;
    
    1
    INSERT INTO tpcds.reasons2 SELECT * FROM tpcds.foreign_tpcds_reasons2;
    
  10. 查询错误信息表err_tpcds_reasons1和err_tpcds_reasons2,处理数据导入错误。详细请参见处理导入错误
    1
    2
    SELECT * FROM err_tpcds_reasons1;
    SELECT * FROM err_tpcds_reasons2;
    
  11. 待数据导入完成后,以gds_user用户登录数据服务器,停止GDS。
    其中GDS进程号为128954。
    ps -ef|grep gds
    gds_user 128954      1  0 15:03 ?        00:00:00 gds -d /input_data -p 192.168.0.90:5000 -D -t 2 -r
    gds_user 129003 118723  0 15:04 pts/0    00:00:00 grep gds
    kill -9 128954

单个管道文件导入

  1. 启动GDS:

    /opt/bin/dws/gds/bin/gds -d /***/gds_data/ -D -p 192.168.0.1:7789 -l /***/gds_log/aa.log -H 0/0 -t 10 -D

    如果需要设置管道文件的超时时间,则使用--pipe-timeout参数设置。

  2. 执行数据导入。

    1. 登录数据库创建内表:
      1
      CREATE TABLE test_pipe_1( id integer not null, gender text not null, name  text );
      
    2. 创建只读外表:
      1
      CREATE FOREIGN TABLE foreign_test_pipe_tr( like test_pipe ) SERVER gsmpp_server OPTIONS (LOCATION 'gsfs://192.168.0.1:7789/foreign_test_pipe.pipe', FORMAT 'text', DELIMITER ',',  NULL '', EOL '0x0a' ,file_type 'pipe',auto_create_pipe 'false');
      
    3. 执行导入语句,此时语句会阻塞:
      1
      INSERT INTO test_pipe_1 SELECT * FROM foreign_test_pipe_tr;
      

  3. 通过GDS管道文件导入数据。

    1. 登录GDS数据服务器进入GDS数据目录:
      cd /***/gds_data/   
    2. 创建管道文件,如果auto_create_pipe设置为true跳过此步骤:
      mkfifo foreign_test_pipe.pipe;  

      管道文件创建完成后,每执行完一次操作,业务会被自动清理。如果还需要执行其他业务,请参考该步骤重新创建管道文件。

    3. 向管道文件中写入数据:
      cat postgres_public_foreign_test_pipe_tw.txt > foreign_test_pipe.pipe
    4. 若需要读取压缩文件到管道文件,执行:
      gzip -d < out.gz > foreign_test_pipe.pipe
    5. 若需要读取hdfs文件到管道文件,执行:
      hdfs dfs -cat - /user/hive/***/test_pipe.txt > foreign_test_pipe.pipe

  4. 查看导入语句返回的结果:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    INSERT INTO test_pipe_1 select * from foreign_test_pipe_tr;
    INSERT 0 4
    SELECT * FROM test_pipe_1;
    id | gender |      name
    ----+-----+----------------
    3 | 2   | 11111111111111
    1 | 2   | 11111111111111
    2 | 2   | 11111111111111
    4 | 2   | 11111111111111
    (4 rows)
    

多进程管道文件导入

GDS支持多进程管道文件导入, 即启动一个外表对应多个GDS。

以本地文件的导入为例:

  1. 启动多个GDS,如果已经启动跳过此步骤:

    /opt/bin/dws/gds/bin/gds -d /***/gds_data/ -D -p 192.168.0.1:7789 -l /***/gds_log/aa.log -H 0/0 -t 10 -D
    /opt/bin/dws/gds/bin/gds -d /***/gds_data_1/ -D -p 192.168.0.1:7790 -l /***/gds_log_1/aa.log -H 0/0 -t 10 -D

    如果需要设置管道文件的超时时间,则使用--pipe-timeout参数设置。

  2. 执行数据导入。

    1. 登录数据库创建内表:
      1
      CREATE TABLE test_pipe( id integer not null, gender text not null, name  text );
      
    2. 创建只读外表:
      1
      CREATE FOREIGN TABLE foreign_test_pipe_tr( like test_pipe ) SERVER gsmpp_server OPTIONS (LOCATION 'gsfs://192.168.0.1:7789/foreign_test_pipe.pipe|gsfs://192.168.0.1:7790/foreign_test_pipe.pipe', FORMAT 'text', DELIMITER ',', NULL '', EOL '0x0a' , file_type 'pipe', auto_create_pipe 'false');
      
    3. 导入语句,此时语句会阻塞:
      1
      INSERT INTO test_pipe_1 select * from foreign_test_pipe_tr;
      

  3. 通过GDS管道文件导入数据。

    1. 登录GDS数据服务器,分别进入GDS数据目录:
      cd /***/gds_data/ 
      cd /***/gds_data_1/
    2. 创建管道文件,如果auto_create_pipe设置为true跳过此步骤:
      mkfifo foreign_test_pipe.pipe;  
    3. 分别读取管道文件并写入新文件:
      cat postgres_public_foreign_test_pipe_tw.txt > foreign_test_pipe.pipe

  4. 查看导入语句返回的结果:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    INSERT INTO test_pipe_1 select * from foreign_test_pipe_tr;
    INSERT 0 4
    SELECT * FROM test_pipe_1;
    id | gender |      name
    ----+-----+----------------
    3 | 2   | 11111111111111
    1 | 2   | 11111111111111
    2 | 2   | 11111111111111
    4 | 2   | 11111111111111
    (4 rows)
    

集群间不落地数据导入

  1. 启动GDS。(如果已经启动跳过此步骤)

    gds -d /***/gds_data/ -D -p GDS_IP:GDS_PORT -l /***/gds_log/aa.log -H 0/0 -t 10 -D

    如果需要设置管道文件的超时时间,则使用--pipe-timeout参数设置。

  2. 源数据库数据导出。

    1. 登录目标数据库创建内表,并写入数据。
      CREATE TABLE test_pipe( id integer not null, gender text not null, name  text );
      INSERT INTO test_pipe values(1,2,'11111111111111');
      INSERT INTO test_pipe values(2,2,'11111111111111');
      INSERT INTO test_pipe values(3,2,'11111111111111');
      INSERT INTO test_pipe values(4,2,'11111111111111');
      INSERT INTO test_pipe values(5,2,'11111111111111');
    2. 创建只写外表。
      CREATE FOREIGN TABLE foreign_test_pipe( id integer not null, age text not null, name  text ) SERVER gsmpp_server OPTIONS (LOCATION 'gsfs://GDS_IP:GDS_PORT/', FORMAT 'text', DELIMITER ',', NULL '', EOL '0x0a' ,file_type 'pipe') WRITE ONLY;
    3. 导入语句,此时语句会阻塞。
      INSERT INTO foreign_test_pipe SELECT * FROM test_pipe;

  3. 目标集群导入数据。

    1. 创建内表。
      CREATE TABLE test_pipe (id integer not null, gender text not null, name text);
    2. 创建只读外表。
      CREATE FOREIGN TABLE foreign_test_pipe(like test_pipe) SERVER gsmpp_server OPTIONS (LOCATION 'gsfs://GDS_IP:GDS_PORT/', FORMAT 'text', DELIMITER ',', NULL '', EOL '0x0a' , file_type 'pipe', auto_create_pipe 'false');
    3. 执行导入语句:
      INSERT INTO test_pipe SELECT * FROM foreign_test_pipe;

  4. 查看目标集群导入语句返回的结果:

    SELECT * FROM test_pipe;
     id | gender |      name
    ----+-----+----------------
      3 | 2   | 11111111111111
      6 | 2   | 11111111111111
      7 | 2   | 11111111111111
      1 | 2   | 11111111111111
      2 | 2   | 11111111111111
      4 | 2   | 11111111111111
      5 | 2   | 11111111111111
      8 | 2   | 11111111111111
      9 | 2   | 11111111111111
    (9 rows)

GDS默认导出或者导入的管道文件命名规则为:“数据库名_模式名_外表名.pipe”,因此默认需要目标集群与源集群的数据库名及模式名保持一致。如果数据库或模式不一致,则可以在location的url中指定相同的管道文件。

示例:

  • 只写外表指定管道名。
    CREATE FOREIGN TABLE foreign_test_pipe(id integer not null, age text not null, name  text) SERVER gsmpp_server OPTIONS (LOCATION 'gsfs://GDS_IP:GDS_PORT/foreign_test_pipe.pipe', FORMAT 'text', DELIMITER ',',  NULL '', EOL '0x0a' ,file_type 'pipe') WRITE ONLY;
  • 只读外表指定管道名。
    CREATE FOREIGN TABLE foreign_test_pipe(like test_pipe) SERVER gsmpp_server OPTIONS (LOCATION 'gsfs://GDS_IP:GDS_PORT/foreign_test_pipe.pipe', FORMAT 'text', DELIMITER ',',  NULL '', EOL '0x0a' ,file_type 'pipe',auto_create_pipe 'false');

相关文档