更新时间:2024-10-08 GMT+08:00

使用Python第三方库psycopg2连接集群

用户在创建好数据仓库集群后使用psycopg2第三方库连接到集群,则可以使用Python访问GaussDB(DWS) ,并进行数据表的各类操作。

连接集群前的准备

  • GaussDB(DWS)集群已绑定弹性IP。
  • 已获取GaussDB(DWS)集群的数据库管理员用户名和密码。
    请注意,由于MD5算法已经被证实存在碰撞可能,已严禁将之用于密码校验算法。当前GaussDB(DWS)采用默认安全设计,默认禁止MD5算法的密码校验,可能导致开源客户端无法正常连接的问题。建议先检查数据库参数password_encryption_type参数是否为1,如果取值不为1,需要修改,修改方法参见修改GaussDB(DWS)集群GUC参数;然后修改一次准备使用的数据库用户的密码。
    • 当前GaussDB(DWS)出于安全考虑,已经默认不再使用MD5存储密码摘要了,这将导致使用开源驱动或者客户端无法正常连接数据库。需要您调整密码策略后再创建一个新用户或者对老用户做一次密码修改,方可使用开源协议中的MD5认证算法。
    • 数据库中是不会存储用户的密码原文,而是存储密码的HASH摘要,在密码校验时与客户端发来的密码摘要进行比对(中间会有加盐操作)。故当您改变了密码算法策略时,数据库也是无法还原您的密码,再生成新的HASH算法的摘要值的。必须您手动修改一次密码或者创建一个新用户,这时新的密码将会采用您设置的HASH算法进行摘要存储,用于下次连接认证。
  • 已获取GaussDB(DWS)集群的公网访问地址,含IP地址和端口。具体请参见获取GaussDB(DWS)集群连接地址
  • 已安装psycopg2第三方库。下载地址:https://pypi.org/project/psycopg2/,安装部署操作请参见:https://www.psycopg.org/install/
    • CentOS、Redhat等操作系统中使用yum命令安装,命令为:
      1
      yum install python-psycopg2
      
    • psycopg2的使用依赖于PostgreSQL的libpq动态库(32位的psycopg2需要对应32位的libpq;64位的psycopg2对应64位的libpq),Linux中可以依赖yum命令解决。在Windows系统使用psycopg2需要先安装libpq,主要方式有两种:
      • 安装PostgreSQL,并配置libpq、ssl、crypto动态库位置到环境变量PATH中。
      • 安装psqlodbc,使用PostgreSQL ODBC驱动携带的libpq、ssl、crypto动态库。

版本说明

由于GaussDB(DWS)集群、Python、psycopg2的版本较多,下方表格仅列举出当前主流版本的支持情况。

表1

psycopg2版本

Python版本

GaussDB(DWS)集群版本

2.7.x

3.8.x

8.1.3及以上

3.9.x

8.1.3及以上

2.8.x

3.8.x

8.1.3及以上

3.9.x

8.1.3及以上

2.9.x

3.8.x

8.1.3及以上

3.9.x

8.1.3及以上

使用约束

由于psycopg2是基于PostgreSQL的客户端接口,它的功能GaussDB(DWS)并不能完全支持。具体支持情况请见下表2

以下接口支持情况是基于Python 3.8.5及psycopg 2.9.1版本。

表2 DWS对psycopg2主要接口支持情况

类名

功能描述

函数/成员变量

支持

备注

connections

basic

cursor(name=None, cursor_factory=None, scrollable=None, withhold=False)

Y

-

commit()

Y

-

rollback()

Y

-

close()

Y

-

Two-phase commit support methods

xid(format_id, gtrid, bqual)

Y

-

tpc_begin(xid)

Y

-

tpc_prepare()

N

内核不支持显式prepare transaction。

tpc_commit([xid])

Y

-

tpc_rollback([xid])

Y

-

tpc_recover()

Y

-

closed

Y

-

cancel()

Y

-

reset()

N

不支持DISCARD ALL。

dsn

Y

-

Transaction control methods and attributes.

set_session(isolation_level=None, readonly=None, deferrable=None, autocommit=None)

Y

数据库不支持session中设置default_transaction_read_only。

autocommit

Y

-

isolation_level

Y

-

readonly

N

数据库不支持session中设置default_transaction_read_only。

deferrable

Y

-

set_isolation_level(level)

Y

-

encoding

Y

-

set_client_encoding(enc)

Y

-

notices

N

数据库不支持listen/notify。

notifies

Y

-

cursor_factory

Y

-

info

Y

-

status

Y

-

lobject

N

数据库不支持大对象相关操作。

Methods related to asynchronous support

poll()

Y

-

fileno()

Y

-

isexecuting()

Y

-

Interoperation with other C API modules

pgconn_ptr

Y

-

get_native_connection()

Y

-

informative methods of the native connection

get_transaction_status()

Y

-

protocol_version

Y

-

server_version

Y

-

get_backend_pid()

Y

获取到的不是后台的pid,是逻辑连接的id号。

get_parameter_status(parameter)

Y

-

get_dsn_parameters()

Y

-

cursor

basic

description

Y

-

close()

Y

-

closed

Y

-

connection

Y

-

name

Y

-

scrollable

N

数据库不支持SCROLL CURSOR。

withhold

N

withhold cursor在commit前需要关闭。

Commands execution methods

execute(query, vars=None)

Y

-

executemany(query, vars_list)

Y

-

callproc(procname[, parameters])

Y

-

mogrify(operation[, parameters])

Y

-

setinputsizes(sizes)

Y

-

fetchone()

Y

-

fetchmany([size=cursor.arraysize])

Y

-

fetchall()

Y

-

scroll(value[, mode='relative'])

N

数据库不支持SCROLL CURSOR。

arraysize

Y

-

itersize

Y

-

rowcount

Y

-

rownumber

Y

-

lastrowid

Y

-

query

Y

-

statusmessage

Y

-

cast(oid, s)

Y

-

tzinfo_factory

Y

-

nextset()

Y

-

setoutputsize(size[, column])

Y

-

COPY-related methods

copy_from(file, table, sep='\\t', null='\\\\N', size=8192, columns=None)

Y

-

copy_to(file, table, sep='\\t', null='\\\\N', columns=None)

Y

-

copy_expert(sql, file, size=8192)

Y

-

Interoperation with other C API modules

pgresult_ptr

Y

-

在Linux环境使用psycopg2第三方库连接集群

  1. root用户登录Linux环境。
  2. 执行以下命令创建python_dws.py文件。

    vi python_dws.py

    请复制粘贴以下内容放入python_dws.py文件中:

      1
      2
      3
      4
      5
      6
      7
      8
      9
     10
     11
     12
     13
     14
     15
     16
     17
     18
     19
     20
     21
     22
     23
     24
     25
     26
     27
     28
     29
     30
     31
     32
     33
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    #!/usr/bin/python
    # -*- coding: UTF-8 -*-
     
    from __future__ import print_function
     
    import psycopg2
     
     
    def create_table(connection):
        print("Begin to create table")
        try:
            cursor = connection.cursor()
            cursor.execute("drop table if exists test;"
                           "create table test(id int, name text);")
            connection.commit()
        except psycopg2.ProgrammingError as e:
            print(e)
        else:
            print("Table created successfully")
            cursor.close()
     
     
    def insert_data(connection):
        print("Begin to insert data")
        try:
            cursor = connection.cursor()
            cursor.execute("insert into test values(1,'number1');")
            cursor.execute("insert into test values(2,'number2');")
            cursor.execute("insert into test values(3,'number3');")
            connection.commit()
        except psycopg2.ProgrammingError as e:
            print(e)
        else:
            print("Insert data successfully")
            cursor.close()
     
     
    def update_data(connection):
        print("Begin to update data")
        try:
            cursor = connection.cursor()
            cursor.execute("update test set name = 'numberupdated' where id=1;")
            connection.commit()
            print("Total number of rows updated :", cursor.rowcount)
            cursor.execute("select * from test order by 1;")
            rows = cursor.fetchall()
            for row in rows:
                print("id = ", row[0])
                print("name = ", row[1], "\n")
        except psycopg2.ProgrammingError as e:
            print(e)
        else:
            print("After Update, Operation done successfully")
     
     
    def delete_data(connection):
        print("Begin to delete data")
        try:
            cursor = connection.cursor()
            cursor.execute("delete from test where id=3;")
            connection.commit()
            print("Total number of rows deleted :", cursor.rowcount)
            cursor.execute("select * from test order by 1;")
            rows = cursor.fetchall()
            for row in rows:
                print("id = ", row[0])
                print("name = ", row[1], "\n")
        except psycopg2.ProgrammingError as e:
            print(e)
        else:
            print("After Delete,Operation done successfully")
     
     
    def select_data(connection):
        print("Begin to select data")
        try:
            cursor = connection.cursor()
            cursor.execute("select * from test order by 1;")
            rows = cursor.fetchall()
            for row in rows:
                print("id = ", row[0])
                print("name = ", row[1], "\n")
        except psycopg2.ProgrammingError as e:
            print(e)
            print("select failed")
        else:
            print("Operation done successfully")
            cursor.close()
     
     
    if __name__ == '__main__':
        try:
            conn = psycopg2.connect(host='10.154.70.231',
                                    port='8000',
                                    database='gaussdb',  # 需要连接的database
                                    user='dbadmin',
                                    password='password')  # 数据库用户密码
        except psycopg2.DatabaseError as ex:
            print(ex)
            print("Connect database failed")
        else:
            print("Opened database successfully")
            create_table(conn)
            insert_data(conn)
            select_data(conn)
            update_data(conn)
            delete_data(conn)
            conn.close()
    

  3. 按照实际集群信息,修改python_dws.py文件中的集群公网访问地址、集群端口号、数据库名称、数据库用户名、数据库密码。

    psycopg2接口不提供重试连接的能力,您需要在业务代码中实现重试处理。

    1
    2
    3
    4
    5
            conn = psycopg2.connect(host='10.154.70.231',
                                    port='8000',
                                    database='gaussdb',  # 需要连接的database
                                    user='dbadmin',
                                    password='password')  # 数据库用户密码
    

  4. 执行以下命令,使用psycopg第三方库连接集群。

    python python_dws.py

在Windows环境使用psycopg2第三方库连接集群

  1. 在Windows系统中,单击“开始”按钮 ,在搜索框中,键入cmd,然后在结果列表中单击“cmd.exe”打开命令提示符窗口。
  2. 在命令提示符窗口中,执行以下命令创建python_dws.py文件。

    type nul> python_dws.py

    请复制粘贴以下内容放入python_dws.py文件中:

      1
      2
      3
      4
      5
      6
      7
      8
      9
     10
     11
     12
     13
     14
     15
     16
     17
     18
     19
     20
     21
     22
     23
     24
     25
     26
     27
     28
     29
     30
     31
     32
     33
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    #!/usr/bin/python
    # -*- coding:UTF-8 -*-
    
    from __future__ import print_function
    
    import psycopg2
    
    
    def create_table(connection):
        print("Begin to create table")
        try:
            cursor = connection.cursor()
            cursor.execute("drop table if exists test;"
                           "create table test(id int, name text);")
            connection.commit()
        except psycopg2.ProgrammingError as e:
            print(e)
        else:
            print("Table created successfully")
            cursor.close()
    
    
    def insert_data(connection):
        print("Begin to insert data")
        try:
            cursor = connection.cursor()
            cursor.execute("insert into test values(1,'number1');")
            cursor.execute("insert into test values(2,'number2');")
            cursor.execute("insert into test values(3,'number3');")
            connection.commit()
        except psycopg2.ProgrammingError as e:
            print(e)
        else:
            print("Insert data successfully")
            cursor.close()
    
    
    def update_data(connection):
        print("Begin to update data")
        try:
            cursor = connection.cursor()
            cursor.execute("update test set name = 'numberupdated' where id=1;")
            connection.commit()
            print("Total number of rows updated :", cursor.rowcount)
            cursor.execute("select * from test order by 1;")
            rows = cursor.fetchall()
            for row in rows:
                print("id = ", row[0])
                print("name = ", row[1], "\n")
        except psycopg2.ProgrammingError as e:
            print(e)
        else:
            print("After Update, Operation done successfully")
    
    
    def delete_data(connection):
        print("Begin to delete data")
        try:
            cursor = connection.cursor()
            cursor.execute("delete from test where id=3;")
            connection.commit()
            print("Total number of rows deleted :", cursor.rowcount)
            cursor.execute("select * from test order by 1;")
            rows = cursor.fetchall()
            for row in rows:
                print("id = ", row[0])
                print("name = ", row[1], "\n")
        except psycopg2.ProgrammingError as e:
            print(e)
        else:
            print("After Delete,Operation done successfully")
    
    
    def select_data(connection):
        print("Begin to select data")
        try:
            cursor = connection.cursor()
            cursor.execute("select * from test order by 1;")
            rows = cursor.fetchall()
            for row in rows:
                print("id = ", row[0])
                print("name = ", row[1], "\n")
        except psycopg2.ProgrammingError as e:
            print(e)
            print("select failed")
        else:
            print("Operation done successfully")
            cursor.close()
    
    
    if __name__ == '__main__':
        try:
            conn = psycopg2.connect(host='10.154.70.231',
                                    port='8000',
                                    database='postgresgaussdb',  # 需要连接的database
                                    user='dbadmin',
                                    password='password')  # 数据库用户密码
        except psycopg2.DatabaseError as ex:
            print(ex)
            print("Connect database failed")
        else:
            print("Opened database successfully")
            create_table(conn)
            insert_data(conn)
            select_data(conn)
            update_data(conn)
            delete_data(conn)
            conn.close()
    

  3. 按照实际集群信息,修改python_dws.py文件中的集群公网访问地址、集群端口号、数据库名称、数据库用户名、数据库密码。

    1
    2
    3
    4
    5
            conn = psycopg2.connect(host='10.154.70.231',
                                    port='8000',
                                    database='gaussdb',  # 需要连接的database
                                    user='dbadmin',
                                    password='password')  # 数据库用户密码
    

  4. 在命令提示符窗口中,执行以下命令,使用psycopg第三方库连接集群。

    python python_dws.py

psycopg2连接集群不支持CN Retry特性的问题说明

GaussDB(DWS)支持在SQL语句执行出错时的自动重试功能(简称CN Retry)。CN Retry对于客户端和驱动发送的SQL语句在执行失败时可以自动识别错误类型,并进行重试,详情请参见SQL语句出错自动重试。但使用psycopg2默认连接方式创建的连接在语句执行失败时没有自动重试,会直接报错退出。如常见的主备切换场景下,未自动重试会报如下错误,但在自动重试期间完成主备切换,则会返回正确结果。

1
psycopg2.errors.ConnectionFailure: pooler: failed to create 1 connections, Error Message: remote node dn_6003_6004, detail: could not connect to server: Operation now in progress

报错原因:

  1. psycopg2在发送SQL语句前先发送了BEGIN语句开启事务。
  2. CN Retry不支持事务块中的语句是特性约束。

解决方案:

  • 在同步方式连接时,可以通过主动结束驱动开启的事务。
    1
    2
    3
    4
    cursor = conn.cursor()
    # 增加end语句主动结束驱动开启的事务
    cursor.execute("end; select * from test order by 1;") 
    rows = cursor.fetchall()
    
  • 使用异步连接方式主动开启事务,异步连接介绍具体请参见pyscopg官网:https://www.psycopg.org/docs/advanced.html?highlight=async
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    #!/usr/bin/env python3
    # _*_ encoding=utf-8 _*_
     
    import psycopg2
    import select
     
    # psycopg2官方提供的异步连接方式时的wait函数
    # 详见https://www.psycopg.org/docs/advanced.html?highlight=async
    def wait(conn):
        while True:
            state = conn.poll()
            if state == psycopg2.extensions.POLL_OK:
                break
            elif state == psycopg2.extensions.POLL_WRITE:
                select.select([], [conn.fileno()], [])
            elif state == psycopg2.extensions.POLL_READ:
                select.select([conn.fileno()], [], [])
            else:
                raise psycopg2.OperationalError("poll() returned %s" % state)
     
    def psycopg2_cnretry_sync():
        # 创建连接
        conn = psycopg2.connect(host='10.154.70.231',
                                    port='8000',
                                    database='gaussdb',  # 需要连接的database
                                    user='dbadmin',
                                    password='password',  # 数据库用户密码
                                    async=1) # 使用异步方式连接
        wait(conn)
     
        # 执行查询
        cursor = conn.cursor()
        cursor.execute("select * from test order by 1;")
        wait(conn)
        rows = cursor.fetchall()
        for row in rows:
            print(row[0], row[1])
     
        # 关闭连接
        conn.close()
     
    if __name__ == '__main__':
        psycopg2_cnretry_async()