使用Python第三方库psycopg2连接DWS集群
用户在创建好数据仓库集群后使用psycopg2第三方库连接到集群,则可以使用Python访问DWS ,并进行数据表的各类操作。
连接集群前的准备
- DWS集群已绑定弹性IP。
- 已获取DWS集群的数据库管理员用户名和密码。 请注意,由于MD5算法已经被证实存在碰撞可能,已严禁将之用于密码校验算法。当前DWS采用默认安全设计,默认禁止MD5算法的密码校验,可能导致开源客户端无法正常连接的问题。建议先联系技术支持人员检查数据库参数password_encryption_type参数是否为1,如果取值不为1,需要修改;然后修改一次准备使用的数据库用户的密码。
- DWS出于安全考虑,已经默认不再使用MD5存储密码摘要了,这将导致使用开源驱动或者客户端无法正常连接数据库。需要您调整密码策略后再创建一个新用户或者对老用户做一次密码修改,方可使用开源协议中的MD5认证算法。
- 数据库中是不会存储用户的密码原文,而是存储密码的HASH摘要,在密码校验时与客户端发来的密码摘要进行比对(中间会有加盐操作)。故当您改变了密码算法策略时,数据库也是无法还原您的密码,再生成新的HASH算法的摘要值的。必须您手动修改一次密码或者创建一个新用户,这时新的密码将会采用您设置的HASH算法进行摘要存储,用于下次连接认证。
- 已获取DWS集群的公网访问地址,含IP地址和端口。具体请参见获取DWS集群连接地址。
- 已安装psycopg2第三方库。下载地址:https://pypi.org/project/psycopg2/,安装部署操作请参见:https://www.psycopg.org/install/。
- CentOS、Redhat等操作系统中使用yum命令安装,命令为:
1yum install python-psycopg2
- psycopg2的使用依赖于PostgreSQL的libpq动态库(32位的psycopg2需要对应32位的libpq;64位的psycopg2对应64位的libpq),Linux中可以依赖yum命令解决。在Windows系统使用psycopg2需要先安装libpq,主要方式有两种:
- 安装PostgreSQL,并配置libpq、ssl、crypto动态库位置到环境变量PATH中。
- 安装psqlodbc,使用PostgreSQL ODBC驱动携带的libpq、ssl、crypto动态库。
- CentOS、Redhat等操作系统中使用yum命令安装,命令为:
版本说明
由于DWS集群、Python、psycopg2的版本较多,下方表格仅列举出当前主流版本的支持情况。
psycopg2版本 | Python版本 | DWS集群版本 |
|---|---|---|
2.7.x | 3.8.x | 8.1.3及以上 |
3.9.x | 8.1.3及以上 | |
2.8.x | 3.8.x | 8.1.3及以上 |
3.9.x | 8.1.3及以上 | |
2.9.x | 3.8.x | 8.1.3及以上 |
3.9.x | 8.1.3及以上 |
使用约束
由于psycopg2是基于PostgreSQL的客户端接口,它的功能DWS并不能完全支持。具体支持情况请见下表2。以下接口支持情况是基于Python 3.8.5及psycopg 2.9.1版本。
类名 | 功能描述 | 函数/成员变量 | 支持 | 备注 |
|---|---|---|---|---|
connections | basic | cursor(name=None, cursor_factory=None, scrollable=None, withhold=False) | Y | - |
commit() | Y | - | ||
rollback() | Y | - | ||
close() | Y | - | ||
Two-phase commit support methods | xid(format_id, gtrid, bqual) | Y | - | |
tpc_begin(xid) | Y | - | ||
tpc_prepare() | N | 内核不支持显式prepare transaction。 | ||
tpc_commit([xid]) | Y | - | ||
tpc_rollback([xid]) | Y | - | ||
tpc_recover() | Y | - | ||
closed | Y | - | ||
cancel() | Y | - | ||
reset() | N | 不支持DISCARD ALL。 | ||
dsn | Y | - | ||
Transaction control methods and attributes. | set_session(isolation_level=None, readonly=None, deferrable=None, autocommit=None) | Y | 数据库不支持session中设置default_transaction_read_only。 | |
autocommit | Y | - | ||
isolation_level | Y | - | ||
readonly | N | 数据库不支持session中设置default_transaction_read_only。 | ||
deferrable | Y | - | ||
set_isolation_level(level) | Y | - | ||
encoding | Y | - | ||
set_client_encoding(enc) | Y | - | ||
notices | N | 数据库不支持listen/notify。 | ||
notifies | Y | - | ||
cursor_factory | Y | - | ||
info | Y | - | ||
status | Y | - | ||
lobject | N | 数据库不支持大对象相关操作。 | ||
Methods related to asynchronous support | poll() | Y | - | |
fileno() | Y | - | ||
isexecuting() | Y | - | ||
Interoperation with other C API modules | pgconn_ptr | Y | - | |
get_native_connection() | Y | - | ||
informative methods of the native connection | get_transaction_status() | Y | - | |
protocol_version | Y | - | ||
server_version | Y | - | ||
get_backend_pid() | Y | 获取到的不是后台的pid,是逻辑连接的id号。 | ||
get_parameter_status(parameter) | Y | - | ||
get_dsn_parameters() | Y | - | ||
cursor | basic | description | Y | - |
close() | Y | - | ||
closed | Y | - | ||
connection | Y | - | ||
name | Y | - | ||
scrollable | N | 数据库不支持SCROLL CURSOR。 | ||
withhold | N | withhold cursor在commit前需要关闭。 | ||
Commands execution methods | execute(query, vars=None) | Y | - | |
executemany(query, vars_list) | Y | - | ||
callproc(procname[, parameters]) | Y | - | ||
mogrify(operation[, parameters]) | Y | - | ||
setinputsizes(sizes) | Y | - | ||
fetchone() | Y | - | ||
fetchmany([size=cursor.arraysize]) | Y | - | ||
fetchall() | Y | - | ||
scroll(value[, mode='relative']) | N | 数据库不支持SCROLL CURSOR。 | ||
arraysize | Y | - | ||
itersize | Y | - | ||
rowcount | Y | - | ||
rownumber | Y | - | ||
lastrowid | Y | - | ||
query | Y | - | ||
statusmessage | Y | - | ||
cast(oid, s) | Y | - | ||
tzinfo_factory | Y | - | ||
nextset() | Y | - | ||
setoutputsize(size[, column]) | Y | - | ||
COPY-related methods | copy_from(file, table, sep='\\t', null='\\\\N', size=8192, columns=None) | Y | - | |
copy_to(file, table, sep='\\t', null='\\\\N', columns=None) | Y | - | ||
copy_expert(sql, file, size=8192) | Y | - | ||
Interoperation with other C API modules | pgresult_ptr | Y | - |
在Linux环境使用psycopg2第三方库连接集群
- 以root用户登录Linux环境。
- 执行以下命令创建python_dws.py文件。
vi python_dws.py
请复制粘贴以下内容放入python_dws.py文件中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
#!/usr/bin/python # -*- coding: UTF-8 -*- from __future__ import print_function import psycopg2 def create_table(connection): print("Begin to create table") try: cursor = connection.cursor() cursor.execute("drop table if exists test;" "create table test(id int, name text);") connection.commit() except psycopg2.ProgrammingError as e: print(e) else: print("Table created successfully") cursor.close() def insert_data(connection): print("Begin to insert data") try: cursor = connection.cursor() cursor.execute("insert into test values(1,'number1');") cursor.execute("insert into test values(2,'number2');") cursor.execute("insert into test values(3,'number3');") connection.commit() except psycopg2.ProgrammingError as e: print(e) else: print("Insert data successfully") cursor.close() def update_data(connection): print("Begin to update data") try: cursor = connection.cursor() cursor.execute("update test set name = 'numberupdated' where id=1;") connection.commit() print("Total number of rows updated :", cursor.rowcount) cursor.execute("select * from test order by 1;") rows = cursor.fetchall() for row in rows: print("id = ", row[0]) print("name = ", row[1], "\n") except psycopg2.ProgrammingError as e: print(e) else: print("After Update, Operation done successfully") def delete_data(connection): print("Begin to delete data") try: cursor = connection.cursor() cursor.execute("delete from test where id=3;") connection.commit() print("Total number of rows deleted :", cursor.rowcount) cursor.execute("select * from test order by 1;") rows = cursor.fetchall() for row in rows: print("id = ", row[0]) print("name = ", row[1], "\n") except psycopg2.ProgrammingError as e: print(e) else: print("After Delete,Operation done successfully") def select_data(connection): print("Begin to select data") try: cursor = connection.cursor() cursor.execute("select * from test order by 1;") rows = cursor.fetchall() for row in rows: print("id = ", row[0]) print("name = ", row[1], "\n") except psycopg2.ProgrammingError as e: print(e) print("select failed") else: print("Operation done successfully") cursor.close() if __name__ == '__main__': try: conn = psycopg2.connect(host='10.154.70.231', port='8000', database='gaussdb', # 需要连接的database user='dbadmin', password='password') # 数据库用户密码 except psycopg2.DatabaseError as ex: print(ex) print("Connect database failed") else: print("Opened database successfully") create_table(conn) insert_data(conn) select_data(conn) update_data(conn) delete_data(conn) conn.close()
- 按照实际集群信息,修改python_dws.py文件中的集群公网访问地址、集群端口号、数据库名称、数据库用户名、数据库密码。
psycopg2接口不提供重试连接的能力,您需要在业务代码中实现重试处理。
1 2 3 4 5
conn = psycopg2.connect(host='10.154.70.231', port='8000', database='gaussdb', # 需要连接的database user='dbadmin', password='password') # 数据库用户密码
- 执行以下命令,使用psycopg第三方库连接集群。
python python_dws.py
在Windows环境使用psycopg2第三方库连接集群
- 在Windows系统中,单击“开始”按钮 ,在搜索框中,键入cmd,然后在结果列表中单击“cmd.exe”打开命令提示符窗口。
- 在命令提示符窗口中,执行以下命令创建python_dws.py文件。
type nul> python_dws.py
请复制粘贴以下内容放入python_dws.py文件中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
#!/usr/bin/python # -*- coding:UTF-8 -*- from __future__ import print_function import psycopg2 def create_table(connection): print("Begin to create table") try: cursor = connection.cursor() cursor.execute("drop table if exists test;" "create table test(id int, name text);") connection.commit() except psycopg2.ProgrammingError as e: print(e) else: print("Table created successfully") cursor.close() def insert_data(connection): print("Begin to insert data") try: cursor = connection.cursor() cursor.execute("insert into test values(1,'number1');") cursor.execute("insert into test values(2,'number2');") cursor.execute("insert into test values(3,'number3');") connection.commit() except psycopg2.ProgrammingError as e: print(e) else: print("Insert data successfully") cursor.close() def update_data(connection): print("Begin to update data") try: cursor = connection.cursor() cursor.execute("update test set name = 'numberupdated' where id=1;") connection.commit() print("Total number of rows updated :", cursor.rowcount) cursor.execute("select * from test order by 1;") rows = cursor.fetchall() for row in rows: print("id = ", row[0]) print("name = ", row[1], "\n") except psycopg2.ProgrammingError as e: print(e) else: print("After Update, Operation done successfully") def delete_data(connection): print("Begin to delete data") try: cursor = connection.cursor() cursor.execute("delete from test where id=3;") connection.commit() print("Total number of rows deleted :", cursor.rowcount) cursor.execute("select * from test order by 1;") rows = cursor.fetchall() for row in rows: print("id = ", row[0]) print("name = ", row[1], "\n") except psycopg2.ProgrammingError as e: print(e) else: print("After Delete,Operation done successfully") def select_data(connection): print("Begin to select data") try: cursor = connection.cursor() cursor.execute("select * from test order by 1;") rows = cursor.fetchall() for row in rows: print("id = ", row[0]) print("name = ", row[1], "\n") except psycopg2.ProgrammingError as e: print(e) print("select failed") else: print("Operation done successfully") cursor.close() if __name__ == '__main__': try: conn = psycopg2.connect(host='10.154.70.231', port='8000', database='postgresgaussdb', # 需要连接的database user='dbadmin', password='password') # 数据库用户密码 except psycopg2.DatabaseError as ex: print(ex) print("Connect database failed") else: print("Opened database successfully") create_table(conn) insert_data(conn) select_data(conn) update_data(conn) delete_data(conn) conn.close()
- 按照实际集群信息,修改python_dws.py文件中的集群公网访问地址、集群端口号、数据库名称、数据库用户名、数据库密码。
1 2 3 4 5
conn = psycopg2.connect(host='10.154.70.231', port='8000', database='gaussdb', # 需要连接的database user='dbadmin', password='password') # 数据库用户密码
- 在命令提示符窗口中,执行以下命令,使用psycopg第三方库连接集群。
python python_dws.py
psycopg2连接集群不支持CN Retry特性的问题说明
DWS支持在SQL语句执行出错时的自动重试功能(简称CN Retry)。CN Retry对于客户端和驱动发送的SQL语句在执行失败时可以自动识别错误类型,并进行重试,详情请参见SQL语句出错自动重试。但使用psycopg2默认连接方式创建的连接在语句执行失败时没有自动重试,会直接报错退出。如常见的主备切换场景下,未自动重试会报如下错误,但在自动重试期间完成主备切换,则会返回正确结果。
1 | psycopg2.errors.ConnectionFailure: pooler: failed to create 1 connections, Error Message: remote node dn_6003_6004, detail: could not connect to server: Operation now in progress |
报错原因:
- psycopg2在发送SQL语句前先发送了BEGIN语句开启事务。
- CN Retry不支持事务块中的语句是特性约束。
解决方案:
- 在同步方式连接时,可以通过主动结束驱动开启的事务。
1 2 3 4
cursor = conn.cursor() # 增加end语句主动结束驱动开启的事务 cursor.execute("end; select * from test order by 1;") rows = cursor.fetchall()
- 使用异步连接方式主动开启事务,异步连接介绍具体请参见pyscopg官网:https://www.psycopg.org/docs/advanced.html?highlight=async。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
#!/usr/bin/env python3 # _*_ encoding=utf-8 _*_ import psycopg2 import select # psycopg2官方提供的异步连接方式时的wait函数 # 详见https://www.psycopg.org/docs/advanced.html?highlight=async def wait(conn): while True: state = conn.poll() if state == psycopg2.extensions.POLL_OK: break elif state == psycopg2.extensions.POLL_WRITE: select.select([], [conn.fileno()], []) elif state == psycopg2.extensions.POLL_READ: select.select([conn.fileno()], [], []) else: raise psycopg2.OperationalError("poll() returned %s" % state) def psycopg2_cnretry_sync(): # 创建连接 conn = psycopg2.connect(host='10.154.70.231', port='8000', database='gaussdb', # 需要连接的database user='dbadmin', password='password', # 数据库用户密码 async=1) # 使用异步方式连接 wait(conn) # 执行查询 cursor = conn.cursor() cursor.execute("select * from test order by 1;") wait(conn) rows = cursor.fetchall() for row in rows: print(row[0], row[1]) # 关闭连接 conn.close() if __name__ == '__main__': psycopg2_cnretry_async()

