文档首页 > > 开发指南>

GDS管道文件导入导出

GDS管道文件导入导出

分享
更新时间:2021/04/07 GMT+08:00
  • 当前版本暂不支持SSL模式下GDS导入导出,请勿以SSL方式使用GDS。
  • 本章涉及的所有管道文件都是指linux上的命名管道。

当前版本的GDS已经支持从管道文件导入数据或者从数据库导出到管道文件,该功能使GDS的导入导出更加灵活多变。

  • 当GDS用户的本地磁盘空间不足时:
    • 可直接将hdfs上的数据写入到管道文件而不需要占用额外的磁盘空间。
    • 通过管道文件将从GDS导出的数据进行压缩减少磁盘空间。
    • 通过管道直接将导出来的数据放到hdfs服务器上。
  • 当用户导入导出前需要清洗数据时:
    • 用户可以根据自己的需求编写程序,将需要处理的数据流式实时的写入管道文件或者流式实时的处理从管道中读取的内容,完成导入导出的数据清洗工作。

注意事项

  • GDS支持并发导入导出,gds -t参数用于设置gds的工作线程池大小,控制并发场景下同时工作的工作线程数且不会加速单个sql任务。gds -t缺省值为8,上限值为200。在使用管道功能进行导入导出时,-t参数应不低于业务并发数。如果是双集群互联互通场景,-t参数应不低于业务并发数的两倍。
  • 由于管道“读取即删除”的特点,需确保导入或导出过程中除GDS程序外无其他程序读取管道文件,避免导入过程中数据丢失或者任务报错及导出的文件内容混乱。
  • 不支持对具有相同location的外表并发导入导出,即GDS的多个线程同时读取管道文件或者同时写入管道文件。
  • GDS的单个导入导出任务只识别一个管道文件,因此不要对GDS外表设置带有通配符({}[]?)的location地址。如:
    CREATE FOREIGN TABLE foreign_test_pipe_tr( like test_pipe ) SERVER gsmpp_server OPTIONS (LOCATION 'gsfs://127.0.0.1:7789/foreign_test_*', FORMAT 'text', DELIMITER ',',  NULL '', EOL '0x0a' ,file_type 'pipe',auto_create_pipe 'false');
  • GDS启用‐r递归参数时只识别一个管道文件,即GDS只会识别当前数据目录下的一个管道文件而不会递归寻找,因此-r参数在管道文件导入导出场景下不生效。
  • 管道文件的导入导出不支持CN Retry,因为GDS无法控制对端用户和程序操作管道的行为。
  • GDS导入时默认对端程序超过1小时未向管道中写入数据导入任务将会超时报错。
  • GDS导出时默认对端程序超过1小时未从管道中读数据导出任务将会超时报错。
  • 确保GDS版本和内核版本都已经支持管道文件导入导出功能。
  • 当外表参数auto_create_pipe设置为true时,GDS自动创建管道文件可能存在延迟,因此操作管道文件时建议先判断自动创建的管道文件是否存在,且是否为管道文件类型。
  • GDS管道文件的导入导出任务结束后会自动删除管道文件,但是手动终止任务时,管道文件的删除会有延迟,直到到达超时时间后才会被删除。

GDS管道文件导出

示例:

  1. 启动GDS。

    gds -d /***/gds_data/ -D -p 127.0.0.1:7789 -l /***/gds_log/aa.log -H 0/0 -t 10 -D

    如果需要设置管道文件的超时时间,则使用--pipe-timeout参数设置。

  2. 执行数据导出。

    1. 登录数据库创建内表,并写入数据。
      CREATE TABLE test_pipe( id integer not null, sex text not null, name text ) ;
      CREATE TABLE
      
      INSERT INTO test_pipe values(1,2,'11111111111111');
      INSERT INTO test_pipe values(2,2,'11111111111111');
      INSERT INTO test_pipe values(3,2,'11111111111111');
      INSERT INTO test_pipe values(4,2,'11111111111111');
      INSERT 0 1
    2. 创建只写外表。
      CREATE FOREIGN TABLE foreign_test_pipe_tw( id integer not null, age text not null, name  text ) SERVER gsmpp_server OPTIONS (LOCATION 'gsfs://127.0.0.1:7789/', FORMAT 'text', DELIMITER ',',  NULL '', EOL '0x0a' ,file_type 'pipe', auto_create_pipe 'false') WRITE ONLY;
    3. 导出语句,此时语句会阻塞。
      INSERT INTO foreign_test_pipe_tw select * from test_pipe; 

  3. 通过管道文件将数据从GDS导出。

    1. 登录GDS,进入GDS数据目录。
      cd /***/gds_data/  
    2. 创建管道文件,如果auto_create_pipe设置为true跳过此步骤。
      mkfifo postgres_public_foreign_test_pipe_tw.pipe 
    3. 读取管道文件并写入新文件。
      cat postgres_public_foreign_test_pipe_tw.pipe > postgres_public_foreign_test_pipe_tw.txt
    4. 若需要对导出的文件进行压缩执行:
      gzip -9 -c < postgres_public_foreign_test_pipe_tw.pipe  > out.gz 
    5. 若需要将管道文件的内容导出到hdfs服务器:
      cat postgres_public_foreign_test_pipe_tw.pipe  | hdfs dfs -put -  /user/hive/***/test_pipe.txt

  4. 验证导出的数据。

    1. 查看文件是否导出正确。
      cat postgres_public_foreign_test_pipe_tw.txt
      3,2,11111111111111
      1,2,11111111111111
      2,2,11111111111111
      4,2,11111111111111
    2. 查看压缩后的文件。
      vim out.gz
      3,2,11111111111111
      1,2,11111111111111
      2,2,11111111111111
      4,2,11111111111111
    3. 查看导出到hdfs服务器上的数据。
      hdfs dfs -cat /user/hive/***/test_pipe.txt
      3,2,11111111111111
      1,2,11111111111111
      2,2,11111111111111
      4,2,11111111111111

GDS管道文件导入

示例

  1. 启动GDS。

    gds -d /***/gds_data/ -D -p 127.0.0.1:7789 -l /***/gds_log/aa.log -H 0/0 -t 10 -D

    如果需要设置管道文件的超时时间则 使用--pipe-timeout参数设置。

  2. 执行数据导入。

    1. 登录数据库创建内表。
      CREATE TABLE test_pipe_1( id integer not null, sex text not null, name  text );
    2. 创建只读外表。
      CREATE FOREIGN TABLE foreign_test_pipe_tr( like test_pipe ) SERVER gsmpp_server OPTIONS (LOCATION 'gsfs://127.0.0.1:7789/foreign_test_pipe.pipe', FORMAT 'text', DELIMITER ',',  NULL '', EOL '0x0a' ,file_type 'pipe',auto_create_pipe 'false');
    3. 执行导入语句,此时语句会阻塞。
      INSERT INTO test_pipe_1 select * from foreign_test_pipe_tr;

  3. 通过GDS管道文件导入数据。

    1. 登录GDS服务器进入GDS数据目录。
      cd /***/gds_data/   
    2. 创建管道文件,如果auto_create_pipe设置为true跳过此步骤。
      mkfifo foreign_test_pipe.pipe;  
    3. 向管道文件中写入数据。
      cat postgres_public_foreign_test_pipe_tw.txt > foreign_test_pipe_.pipe
    4. 若需要读取压缩文件到管道文件,执行:
      gzip -d < out.gz > foreign_test_pipe.pipe
    5. 若需要读取hdfs文件到管道文件,执行:
      hdfs dfs -cat - /user/hive/***/test_pipe.txt > foreign_test_pipe.pipe

  4. 查看导入语句返回的结果:

    INSERT INTO test_pipe_1 select * from foreign_test_pipe_tr;
    INSERT 0 4
    SELECT * FROM test_pipe_1;
    id | sex |      name
    ----+-----+----------------
    3 | 2   | 11111111111111
    1 | 2   | 11111111111111
    2 | 2   | 11111111111111
    4 | 2   | 11111111111111
    (4 rows)
    

多进程管道文件导出

GDS也支持多进程管道文件导入导出, 即启动一个外表对应多个GDS。

以本地文件的导出为例:

  1. 启动多个GDS。

    gds -d /***/gds_data/ -D -p 127.0.0.1:7789 -l /***/gds_log/aa.log -H 0/0 -t 10 -D
    gds -d /***/gds_data_1/ -D -p 127.0.0.1:7790 -l /***/gds_log/aa.log -H 0/0 -t 10 -D

    如果需要设置管道文件的超时时间,则使用--pipe-timeout参数设置。

  2. 执行数据导出。

    1. 登录数据库创建内表。
      CREATE TABLE test_pipe (id integer not null, sex text not null, name  text);
    2. 写入数据。
      INSERT INTO test_pipe values(1,2,'11111111111111');
      INSERT INTO test_pipe values(2,2,'11111111111111');
      INSERT INTO test_pipe values(3,2,'11111111111111');
      INSERT INTO test_pipe values(4,2,'11111111111111');
    3. 创建只写外表。
      CREATE FOREIGN TABLE foreign_test_pipe_tw( id integer not null, age text not null, name  text ) SERVER gsmpp_server OPTIONS (LOCATION 'gsfs://127.0.0.1:7789/|gsfs://127.0.0.1:7790/', FORMAT 'text', DELIMITER ',',  NULL '', EOL '0x0a' ,file_type 'pipe', auto_create_pipe 'false') WRITE ONLY;
    4. 导出语句,此时语句会阻塞。
      INSERT INTO foreign_test_pipe_tw select * from test_pipe; 

  3. 通过管道文件将数据从GDS导出。

    1. 登录GDS,分别进入GDS数据目录。
      cd /***/gds_data/ 
      cd /***/gds_data_1/ 
    2. 创建管道文件,如果auto_create_pipe设置为true跳过此步骤。
      mkfifo postgres_public_foreign_test_pipe_tw.pipe 
    3. 分别读取管道文件并写入新文件。
      cat postgres_public_foreign_test_pipe_tw.pipe > postgres_public_foreign_test_pipe_tw.txt

  4. 验证导出的数据。

    cat /***/gds_data/postgres_public_foreign_test_pipe_tw.txt
    3,2,11111111111111
    cat /***/gds_data_1/postgres_public_foreign_test_pipe_tw.txt
    1,2,11111111111111
    2,2,11111111111111
    4,2,11111111111111

多进程管道文件导入

GDS支持多进程管道文件导入, 即启动一个外表对应多个GDS。

以本地文件的导入为例:

  1. 启动多个GDS,如果已经启动跳过此步骤。

    gds -d /***/gds_data/ -D -p 127.0.0.1:7789 -l /***/gds_log/aa.log -H 0/0 -t 10 -D
    gds -d /***/gds_data_1/ -D -p 127.0.0.1:7790 -l /***/gds_log_1/aa.log -H 0/0 -t 10 -D

    如果需要设置管道文件的超时时间,则使用--pipe-timeout参数设置。

  2. 执行数据导入。

    1. 登录数据库创建内表。
      CREATE TABLE test_pipe( id integer not null, sex text not null, name  text );
    2. 创建只读外表。
      CREATE FOREIGN TABLE foreign_test_pipe_tr( like test_pipe ) SERVER gsmpp_server OPTIONS (LOCATION 'gsfs://127.0.0.1:7789/foreign_test_pipe.pipe|gsfs://127.0.0.1:7790/foreign_test_pipe.pipe', FORMAT 'text', DELIMITER ',', NULL '', EOL '0x0a' , file_type 'pipe', auto_create_pipe 'false');
    3. 导入语句,此时语句会阻塞。
      INSERT INTO test_pipe_1 select * from foreign_test_pipe_tr;

  3. 通过GDS管道文件导入数据。

    1. 登录GDS,分别进入GDS数据目录。
      cd /***/gds_data/ 
      cd /***/gds_data_1/
    2. 创建管道文件,如果auto_create_pipe设置为true跳过此步骤。
      mkfifo foreign_test_pipe.pipe;  
    3. 分别读取管道文件并写入新文件:
      cat postgres_public_foreign_test_pipe_tw.txt > foreign_test_pipe.pipe

  4. 查看导入语句返回的结果:

    INSERT INTO test_pipe_1 select * from foreign_test_pipe_tr;
    INSERT 0 4
    SELECT * FROM test_pipe_1;
    id | sex |      name
    ----+-----+----------------
    3 | 2   | 11111111111111
    1 | 2   | 11111111111111
    2 | 2   | 11111111111111
    4 | 2   | 11111111111111
    (4 rows)

集群之间不落地导入导出

  1. 启动GDS。(如果已经启动跳过此步骤)

    gds -d /***/gds_data/ -D -p GDS_IP:GDS_PORT -l /***/gds_log/aa.log -H 0/0 -t 10 -D

    如果需要设置管道文件的超时时间,则使用--pipe-timeout参数设置。

  2. 源数据库数据导出。

    1. 登录目标数据库创建内表,并写入数据。
      CREATE TABLE test_pipe( id integer not null, sex text not null, name  text );
      INSERT INTO test_pipe values(1,2,'11111111111111');
      INSERT INTO test_pipe values(2,2,'11111111111111');
      INSERT INTO test_pipe values(3,2,'11111111111111');
      INSERT INTO test_pipe values(4,2,'11111111111111');
      INSERT INTO test_pipe values(5,2,'11111111111111');
    2. 创建只写外表。
      CREATE FOREIGN TABLE foreign_test_pipe( id integer not null, age text not null, name  text ) SERVER gsmpp_server OPTIONS (LOCATION 'gsfs://GDS_IP:GDS_PORT/', FORMAT 'text', DELIMITER ',', NULL '', EOL '0x0a' ,file_type 'pipe') WRITE ONLY;
    3. 导入语句,此时语句会阻塞。
      INSERT INTO foreign_test_pipe SELECT * FROM test_pipe;

  3. 目标集群导入数据。

    1. 创建内表。
      CREATE TABLE test_pipe (id integer not null, sex text not null, name text);
    2. 创建只读外表。
      CREATE FOREIGN TABLE foreign_test_pipe(like test_pipe) SERVER gsmpp_server OPTIONS (LOCATION 'gsfs://GDS_IP:GDS_PORT/', FORMAT 'text', DELIMITER ',', NULL '', EOL '0x0a' , file_type 'pipe', auto_create_pipe 'false');
    3. 执行导入语句:
      INSERT INTO test_pipe SELECT * FROM foreign_test_pipe;

  4. 查看目标集群导入语句返回的结果:

    SELECT * FROM test_pipe;
     id | sex |      name
    ----+-----+----------------
      3 | 2   | 11111111111111
      6 | 2   | 11111111111111
      7 | 2   | 11111111111111
      1 | 2   | 11111111111111
      2 | 2   | 11111111111111
      4 | 2   | 11111111111111
      5 | 2   | 11111111111111
      8 | 2   | 11111111111111
      9 | 2   | 11111111111111
    (9 rows)

GDS默认导出或者导入的管道文件命名规则为:"数据库名_模式名_外表名.pipe”,因此默认需要目标集群与源集群的数据库名及模式名保持一致。如果数据库或模式不一致,则可以在location的url中指定相同的管道文件。

示例:

  • 只写外表指定管道名。
    CREATE FOREIGN TABLE foreign_test_pipe(id integer not null, age text not null, name  text) SERVER gsmpp_server OPTIONS (LOCATION 'gsfs://GDS_IP:GDS_PORT/foreign_test_pipe.pipe', FORMAT 'text', DELIMITER ',',  NULL '', EOL '0x0a' ,file_type 'pipe') WRITE ONLY;
  • 只读外表指定管道名。
    CREATE FOREIGN TABLE foreign_test_pipe(like test_pipe) SERVER gsmpp_server OPTIONS (LOCATION 'gsfs://GDS_IP:GDS_PORT/foreign_test_pipe.pipe', FORMAT 'text', DELIMITER ',',  NULL '', EOL '0x0a' ,file_type 'pipe',auto_create_pipe 'false');

常见问题和定位方法:

  • 问题1:"/***/postgres_public_foreign_test_pipe_tr.pipe" must be named pipe.

    定位方法:GDS的外表file_type类型为pipe但是操作的文件却是一个普通文件类型。应该排查postgres_public_foreign_test_pipe_tr.pipe是否是为管道文件。

  • 问题2:could not open pipe "/***/postgres_public_foreign_test_pipe_tw.pipe" cause by Permission denied.

    定位方法:GDS没有权限打开管道文件。

  • 问题3:could not open source file /*****/postgres_public_foreign_test_pipe_tw.pipe because timeout 300s for WRITING.

    定位方法:GDS导出时打开管道文件超时,一般由于auto_create_pipe为false时候,管道文件在300秒内未被创建,或者创建了但是300秒内没有程序读取该管道文件。

  • 问题4:could not open source file /*****/postgres_public_foreign_test_pipe_tw.pipe because timeout 300s for READING.

    定位方法:GDS导出时打开管道文件超时,一般由于auto_create_pipe为false时候,管道文件在300秒内未被创建或者创建了但是300秒之内没有程序写入该管道文件。

  • 问题5:could not poll writing source pipe file "/****/postgres_public_foreign_test_pipe_tw.pipe" timeout 300s.

    定位方法:GDS导出时超过300秒未等到管道上的写事件,一般由于该管道文件超过300秒没有被读取。

  • 问题6:could not poll reading source pipe file "/****/postgres_public_foreign_test_pipe_tw.pipe" timeout 300s.

    定位方法:GDS导入时超过300秒未等到管道上的读事件,一般由于该管道文件超过300秒没有被写入。

  • 问题7:could not open pipe file "/***/postgres_public_foreign_test_pipe_tw.pipe" for "WRITING" with error No such device or address.

    定位方法:表示当前"/***/postgres_public_foreign_test_pipe_tw.pipe"管道文件没有程序正在读取导致GDS无法以写的方式打开管道文件。

分享:

    相关文档

    相关产品