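Using the Third-Party Python Library PyGreSQL to Connect to a DWS Cluster
After creating a data warehouse cluster, you can connect to it with the third-party library PyGreSQL. You can then access DWS from Python and perform operations on data tables.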
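Preparations Before Connecting to a Cluster
- An elastic IP address (EIP) has been bound to the DWS cluster.
- You have obtained the database administrator username and password of the DWS cluster.

  Note: The MD5 algorithm has been proven vulnerable to collisions, so its use for password verification is prohibited. DWS is secure by default and disables MD5 password verification, which may prevent open-source clients from connecting. Contact technical support to check whether the database parameter password_encryption_type is set to 1. If it is not, change it, and then change the password of the database user you plan to use once.
  - For security reasons, DWS no longer stores MD5 password digests by default, so open-source drivers and clients cannot connect to the database. After adjusting the password policy, create a new user or change the password of an existing user. Only then can the MD5 authentication algorithm used by the open-source protocol be used.
  - The database never stores your plaintext password. It stores a hash digest of the password and, during verification, compares it with the digest sent by the client (salting is involved). Therefore, when you change the password algorithm policy, the database cannot recover your password and generate a digest with the new hash algorithm. You must manually change the password once or create a new user. The new password is then stored as a digest computed with the hash algorithm you configured and is used for the next connection authentication.

- You have obtained the public network address of the DWS cluster, including the IP address and port. For details, see "Obtaining the DWS Cluster Connection Address".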
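- The PyGreSQL third-party library has been installed.
  - For installation and deployment instructions, see http://www.pygresql.org/contents/install.html.
  - On CentOS, Red Hat, and similar operating systems, install it with yum:

        yum install PyGreSQL

  - PyGreSQL depends on the PostgreSQL libpq dynamic library (32-bit PyGreSQL requires 32-bit libpq, and 64-bit PyGreSQL requires 64-bit libpq). On Linux, yum resolves this dependency. On Windows, install libpq before using PyGreSQL, in either of the following ways:
    - Install PostgreSQL and add the locations of the libpq, ssl, and crypto dynamic libraries to the PATH environment variable.
    - Install psqlodbc and use the libpq, ssl, and crypto dynamic libraries shipped with the PostgreSQL ODBC driver.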
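The note in the prerequisites explains that the database keeps only a digest of the password, so a digest produced by one algorithm cannot be converted into a digest of another. The following sketch illustrates that idea only. It assumes the MD5 format used by open-source PostgreSQL (`md5` followed by the MD5 of the password concatenated with the username) and uses PBKDF2-SHA256 as a stand-in for a stronger scheme. The digest format that DWS actually stores is not described here and may differ, and both function names are hypothetical.

```python
import hashlib
import os


def postgres_md5_digest(password, username):
    """Digest in the open-source PostgreSQL MD5 format (an assumption for illustration)."""
    return "md5" + hashlib.md5((password + username).encode("utf-8")).hexdigest()


def salted_sha256_digest(password, salt, iterations=2048):
    """Salted PBKDF2-SHA256 digest, standing in for a non-MD5 scheme."""
    return hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, iterations
    ).hex()


if __name__ == "__main__":
    salt = os.urandom(16)
    # Both functions need the plaintext password as input. Neither digest can
    # be computed from the other, which is why the password has to be set
    # again after the hashing policy changes.
    print(postgres_md5_digest("password", "dbadmin"))
    print(salted_sha256_digest("password", salt))
```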
 
 
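To confirm that PyGreSQL and its libpq dependency can be loaded after installation, a small check such as the following may help. It is a minimal sketch: the helper name `check_pygresql` is hypothetical, and it relies only on the `pg` module's `version` constant and `get_pqlib_version` function, both read defensively in case the installed version lacks them.

```python
import importlib


def check_pygresql():
    """Return a dict describing whether PyGreSQL and libpq can be loaded."""
    status = {
        "installed": False,
        "pygresql_version": None,
        "libpq_version": None,
        "error": None,
    }
    try:
        pg = importlib.import_module("pg")
    except ImportError as exc:
        # Raised when PyGreSQL is missing or when libpq (or the ssl/crypto
        # libraries it needs) cannot be found by the dynamic loader.
        status["error"] = str(exc)
        return status
    status["installed"] = True
    status["pygresql_version"] = getattr(pg, "version", None)
    get_pqlib_version = getattr(pg, "get_pqlib_version", None)
    if callable(get_pqlib_version):
        status["libpq_version"] = get_pqlib_version()
    return status


if __name__ == "__main__":
    print(check_pygresql())
```

If `installed` is False on Windows, the `error` text usually indicates whether the module itself or one of the dynamic libraries on PATH is missing.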
         
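Constraints
PyGreSQL is a client interface built for PostgreSQL, so DWS does not support all of its features. The following table lists the support status.

Note: The support status below is based on Python 3.8.5 and PyGreSQL 5.2.4.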
| PyGreSQL | Interface | Supported | Remarks |
|---|---|---|---|
| pg | | | |
| Module functions and constants | connect – Open a PostgreSQL connection | Y | - |
| | get_pqlib_version – get the version of libpq | Y | - |
| | get/set_defhost – default server host [DV] | Y | - |
| | get/set_defport – default server port [DV] | Y | - |
| | get/set_defopt – default connection options [DV] | Y | - |
| | get/set_defbase – default database name [DV] | Y | - |
| | get/set_defuser – default database user [DV] | Y | - |
| | get/set_defpasswd – default database password [DV] | Y | - |
| | escape_string – escape a string for use within SQL | Y | - |
| | escape_bytea – escape binary data for use within SQL | Y | - |
| | unescape_bytea – unescape data that has been retrieved as text | Y | - |
| | get/set_namedresult – conversion to named tuples | Y | - |
| | get/set_decimal – decimal type to be used for numeric values | Y | - |
| | get/set_decimal_point – decimal mark used for monetary values | Y | - |
| | get/set_bool – whether boolean values are returned as bool objects | Y | - |
| | get/set_array – whether arrays are returned as list objects | Y | - |
| | get/set_bytea_escaped – whether bytea data is returned escaped | Y | - |
| | get/set_jsondecode – decoding JSON format | Y | - |
| | get/set_cast_hook – fallback typecast function | Y | - |
| | get/set_datestyle – assume a fixed date style | Y | - |
| | get/set_typecast – custom typecasting | Y | - |
| | cast_array/record – fast parsers for arrays and records | Y | - |
| | Type helpers | Y | - |
| | Module constants | Y | - |
| Connection – The connection object | query – execute a SQL command string | Y | - |
| | send_query – executes a SQL command string asynchronously | Y | - |
| | query_prepared – execute a prepared statement | Y | - |
| | prepare – create a prepared statement | Y | - |
| | describe_prepared – describe a prepared statement | Y | - |
| | reset – reset the connection | Y | - |
| | poll – completes an asynchronous connection | Y | - |
| | cancel – abandon processing of current SQL command | Y | - |
| | close – close the database connection | Y | - |
| | transaction – get the current transaction state | Y | - |
| | parameter – get a current server parameter setting | Y | - |
| | date_format – get the currently used date format | Y | - |
| | fileno – get the socket used to connect to the database | Y | - |
| | set_non_blocking – set the non-blocking status of the connection | Y | - |
| | is_non_blocking – report the blocking status of the connection | Y | - |
| | getnotify – get the last notify from the server | N | The database does not support LISTEN/NOTIFY. |
| | inserttable – insert a list into a table | Y | If a field in the COPY data contains \n, enclose the field in double quotes. |
| | get/set_notice_receiver – custom notice receiver | Y | - |
| | putline – write a line to the server socket [DA] | Y | - |
| | getline – get a line from server socket [DA] | Y | - |
| | endcopy – synchronize client and server [DA] | Y | - |
| | locreate – create a large object in the database [LO] | N | Large object operation. |
| | getlo – build a large object from given oid [LO] | N | Large object operation. |
| | loimport – import a file to a large object [LO] | N | Large object operation. |
| | Object attributes | Y | - |
| The DB wrapper class | Initialization | Y | - |
| | pkey – return the primary key of a table | Y | - |
| | get_databases – get list of databases in the system | Y | - |
| | get_relations – get list of relations in connected database | Y | - |
| | get_tables – get list of tables in connected database | Y | - |
| | get_attnames – get the attribute names of a table | Y | - |
| | has_table_privilege – check table privilege | Y | - |
| | get/set_parameter – get or set run-time parameters | Y | - |
| | begin/commit/rollback/savepoint/release – transaction handling | Y | - |
| | get – get a row from a database table or view | Y | - |
| | insert – insert a row into a database table | Y | - |
| | update – update a row in a database table | Y | - |
| | upsert – insert a row with conflict resolution | Y | - |
| | query – execute a SQL command string | Y | - |
| | query_formatted – execute a formatted SQL command string | Y | - |
| | query_prepared – execute a prepared statement | Y | - |
| | prepare – create a prepared statement | Y | - |
| | describe_prepared – describe a prepared statement | Y | - |
| | delete_prepared – delete a prepared statement | Y | - |
| | clear – clear row values in memory | Y | - |
| | delete – delete a row from a database table | Y | The row must have a unique key or primary key. |
| | truncate – quickly empty database tables | Y | - |
| | get_as_list/dict – read a table as a list or dictionary | Y | - |
| | escape_literal/identifier/string/bytea – escape for SQL | Y | - |
| | unescape_bytea – unescape data retrieved from the database | Y | - |
| | encode/decode_json – encode and decode JSON data | Y | - |
| | use_regtypes – determine use of regular type names | Y | - |
| | notification_handler – create a notification handler | N | The database does not support LISTEN/NOTIFY. |
| | Attributes of the DB wrapper class | Y | - |
| Query methods | getresult – get query values as list of tuples | Y | - |
| | dictresult/dictiter – get query values as dictionaries | Y | - |
| | namedresult/namediter – get query values as named tuples | Y | - |
| | scalarresult/scalariter – get query values as scalars | Y | - |
| | one/onedict/onenamed/onescalar – get one result of a query | Y | - |
| | single/singledict/singlenamed/singlescalar – get single result of a query | Y | - |
| | listfields – list fields names of previous query result | Y | - |
| | fieldname, fieldnum – field name/number conversion | Y | - |
| | fieldinfo – detailed info about query result fields | Y | - |
| | ntuples – return number of tuples in query object | Y | - |
| | memsize – return number of bytes allocated by query result | Y | - |
| LargeObject – Large Objects | open – open a large object | N | Large object operation. |
| | close – close a large object | N | Large object operation. |
| | read, write, tell, seek, unlink – file-like large object handling | N | Large object operation. |
| | size – get the large object size | N | Large object operation. |
| | export – save a large object to a file | N | Large object operation. |
| | Object attributes | N | Large object operation. |
| The Notification Handler | Instantiating the notification handler | N | The database does not support LISTEN/NOTIFY. |
| | Invoking the notification handler | N | The database does not support LISTEN/NOTIFY. |
| | Sending notifications | N | The database does not support LISTEN/NOTIFY. |
| | Auxiliary methods | N | The database does not support LISTEN/NOTIFY. |
| pgdb | | | |
| Module functions and constants | connect – Open a PostgreSQL connection | Y | - |
| | get/set/reset_typecast – Control the global typecast functions | Y | - |
| | Module constants | Y | - |
| | Errors raised by this module | Y | - |
| Connection – The connection object | close – close the connection | Y | - |
| | commit – commit the connection | Y | - |
| | rollback – roll back the connection | Y | - |
| | cursor – return a new cursor object | Y | - |
| | Attributes that are not part of the standard | Y | - |
| Cursor – The cursor object | description – details regarding the result columns | Y | - |
| | rowcount – number of rows of the result | Y | - |
| | close – close the cursor | Y | - |
| | execute – execute a database operation | Y | - |
| | executemany – execute many similar database operations | Y | - |
| | callproc – Call a stored procedure | Y | - |
| | fetchone – fetch next row of the query result | Y | - |
| | fetchmany – fetch next set of rows of the query result | Y | - |
| | fetchall – fetch all rows of the query result | Y | - |
| | arraysize – the number of rows to fetch at a time | Y | - |
| | Methods and attributes that are not part of the standard | Y | - |
| Type – Type objects and constructors | Type constructors | Y | - |
| | Type objects | Y | - |
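The table lists `executemany` among the supported pgdb cursor methods. As an illustration of how a supported interface might be used, the sketch below inserts several rows with bound parameters instead of building SQL strings by hand. It assumes a connection object returned by `pgdb.connect` and the `test(id int, name text)` table created by the sample script in this guide. The helper name `insert_rows` is hypothetical.

```python
def insert_rows(connection, rows):
    """Insert (id, name) pairs into table test using bound parameters.

    `connection` is expected to be a DB-API connection such as the one
    returned by pgdb.connect(); `rows` is a list of (id, name) tuples.
    """
    cursor = connection.cursor()
    try:
        # pgdb uses the pyformat parameter style, so %s marks a parameter.
        cursor.executemany("insert into test values (%s, %s)", rows)
        connection.commit()
        return cursor.rowcount
    except Exception:
        # Undo the partial transaction before re-raising the error.
        connection.rollback()
        raise
    finally:
        cursor.close()
```

A call might look like `insert_rows(conn, [(4, 'number4'), (5, 'number5')])`.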
Using PyGreSQL to Connect to a Cluster in Linux
- Log in to the Linux environment as user root.
- Run the following command to create the python_dws.py file.
     
      vi python_dws.py

  Copy and paste the following content into the python_dws.py file:

      #!/usr/bin/env python3
      # -*- coding: utf-8 -*-

      from __future__ import print_function

      import pg


      # pg.Error is the base class of PyGreSQL database errors, so it also
      # covers SQL errors that pg.InternalError alone would not catch.
      def create_table(connection):
          print("Begin to create table")
          try:
              connection.query("drop table if exists test;"
                               "create table test(id int, name text);")
          except pg.Error as e:
              print(e)
          else:
              print("Table created successfully")


      def insert_data(connection):
          print("Begin to insert data")
          try:
              connection.query("insert into test values(1,'number1');")
              connection.query("insert into test values(2,'number2');")
              connection.query("insert into test values(3,'number3');")
          except pg.Error as e:
              print(e)
          else:
              print("Insert data successfully")


      def update_data(connection):
          print("Begin to update data")
          try:
              result = connection.query("update test set name = 'numberupdated' where id=1;")
              print("Total number of rows updated :", result)
              result = connection.query("select * from test order by 1;")
              rows = result.getresult()
              for row in rows:
                  print("id = ", row[0])
                  print("name = ", row[1], "\n")
          except pg.Error as e:
              print(e)
          else:
              print("After Update, Operation done successfully")


      def delete_data(connection):
          print("Begin to delete data")
          try:
              result = connection.query("delete from test where id=3;")
              print("Total number of rows deleted :", result)
              result = connection.query("select * from test order by 1;")
              rows = result.getresult()
              for row in rows:
                  print("id = ", row[0])
                  print("name = ", row[1], "\n")
          except pg.Error as e:
              print(e)
          else:
              print("After Delete,Operation done successfully")


      def select_data(connection):
          print("Begin to select data")
          try:
              result = connection.query("select * from test order by 1;")
              rows = result.getresult()
              for row in rows:
                  print("id = ", row[0])
                  print("name = ", row[1])
          except pg.Error as e:
              print(e)
              print("select failed")
          else:
              print("Operation done successfully")


      if __name__ == '__main__':
          try:
              conn = pg.DB(host='10.154.70.231',
                           port=8000,
                           dbname='gaussdb',  # database to connect to
                           user='dbadmin',
                           passwd='password')  # database user password
          except pg.Error as ex:
              print(ex)
              print("Connect database failed")
          else:
              print("Opened database successfully")
              create_table(conn)
              insert_data(conn)
              select_data(conn)
              update_data(conn)
              delete_data(conn)
              conn.close()

  Alternatively, implement the same logic with the DB-API interface (pgdb):

      #!/usr/bin/python
      # -*- coding: UTF-8 -*-

      from __future__ import print_function

      import pgdb


      # pgdb.Error is the base class of the DB-API errors raised by pgdb.
      def create_table(connection):
          print("Begin to create table")
          try:
              cursor = connection.cursor()
              cursor.execute("drop table if exists test;"
                             "create table test(id int, name text);")
              connection.commit()
          except pgdb.Error as e:
              print(e)
          else:
              print("Table created successfully")
              cursor.close()


      def insert_data(connection):
          print("Begin to insert data")
          try:
              cursor = connection.cursor()
              cursor.execute("insert into test values(1,'number1');")
              cursor.execute("insert into test values(2,'number2');")
              cursor.execute("insert into test values(3,'number3');")
              connection.commit()
          except pgdb.Error as e:
              print(e)
          else:
              print("Insert data successfully")
              cursor.close()


      def update_data(connection):
          print("Begin to update data")
          try:
              cursor = connection.cursor()
              cursor.execute("update test set name = 'numberupdated' where id=1;")
              connection.commit()
              print("Total number of rows updated :", cursor.rowcount)
              cursor.execute("select * from test;")
              rows = cursor.fetchall()
              for row in rows:
                  print("id = ", row[0])
                  print("name = ", row[1], "\n")
          except pgdb.Error as e:
              print(e)
          else:
              print("After Update, Operation done successfully")
              cursor.close()


      def delete_data(connection):
          print("Begin to delete data")
          try:
              cursor = connection.cursor()
              cursor.execute("delete from test where id=3;")
              connection.commit()
              print("Total number of rows deleted :", cursor.rowcount)
              cursor.execute("select * from test;")
              rows = cursor.fetchall()
              for row in rows:
                  print("id = ", row[0])
                  print("name = ", row[1], "\n")
          except pgdb.Error as e:
              print(e)
          else:
              print("After Delete,Operation done successfully")
              cursor.close()


      def select_data(connection):
          print("Begin to select data")
          try:
              cursor = connection.cursor()
              cursor.execute("select * from test;")
              rows = cursor.fetchall()
              for row in rows:
                  print("id = ", row[0])
                  print("name = ", row[1], "\n")
          except pgdb.Error as e:
              print(e)
              print("select failed")
          else:
              print("Operation done successfully")
              cursor.close()


      if __name__ == '__main__':
          try:
              conn = pgdb.connect(host='10.154.70.231',
                                  port='8000',
                                  database='gaussdb',  # database to connect to
                                  user='dbadmin',
                                  password='password')  # database user password
          except pgdb.Error as ex:
              print(ex)
              print("Connect database failed")
          else:
              print("Opened database successfully")
              create_table(conn)
              insert_data(conn)
              select_data(conn)
              update_data(conn)
              delete_data(conn)
              conn.close()
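- Based on the actual cluster information, modify the cluster public network address, cluster port number, database name, database username, and database password in the python_dws.py file.

  Note: The PyGreSQL interface does not provide connection retries. You need to implement retry handling in your service code.

      conn = pgdb.connect(host='10.154.70.231',
                          port='8000',
                          database='gaussdb',  # database to connect to
                          user='dbadmin',
                          password='password')  # database user password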
- Run the following command to connect to the cluster using the PyGreSQL third-party library.

      python python_dws.py
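Because PyGreSQL does not retry connections, the retry handling mentioned in the steps above has to live in your own code. The following is one possible sketch, not a prescribed implementation: the helper name `connect_with_retry` and its parameters are hypothetical, and it uses a fixed delay between attempts. The connect function is passed in, so the same wrapper can be used with `pgdb.connect` or `pg.DB`.

```python
import time


def connect_with_retry(connect, retries=3, delay=2.0, retry_on=(Exception,), **params):
    """Call connect(**params), retrying up to `retries` times on failure."""
    if retries < 1:
        raise ValueError("retries must be at least 1")
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            return connect(**params)
        except retry_on as exc:
            last_error = exc
            print("Connection attempt %d/%d failed: %s" % (attempt, retries, exc))
            if attempt < retries:
                time.sleep(delay)  # fixed wait before the next attempt
    # Every attempt failed: surface the last error to the caller.
    raise last_error


# Example usage with the sample values from this guide (requires PyGreSQL):
#
#   import pgdb
#   conn = connect_with_retry(pgdb.connect, retries=5, delay=3.0,
#                             retry_on=(pgdb.Error,),
#                             host='10.154.70.231', port='8000',
#                             database='gaussdb',
#                             user='dbadmin', password='password')
```

Restricting `retry_on` to `pgdb.Error` keeps programming mistakes, such as a misspelled keyword argument, from being retried silently.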
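Using PyGreSQL to Connect to a Cluster in Windows
- In Windows, click Start, type cmd in the search box, and click cmd.exe in the result list to open a Command Prompt window.
- In the Command Prompt window, run the following command to create the python_dws.py file.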
     
      type nul> python_dws.py

  Copy and paste into the python_dws.py file the same script content used in the Linux procedure above: either the script based on the pg module or the alternative based on the DB-API interface (pgdb). The script content is identical on both platforms.
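- Based on the actual cluster information, modify the cluster public network address, cluster port number, database name, database username, and database password in the python_dws.py file.

  Note: The PyGreSQL interface does not provide connection retries. You need to implement retry handling in your service code.

      conn = pgdb.connect(host='10.154.70.231',
                          port='8000',
                          database='gaussdb',  # database to connect to
                          user='dbadmin',
                          password='password')  # database user password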
- Run the following command to connect to the cluster using the PyGreSQL third-party library.

      python python_dws.py
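The steps above edit the connection values directly in python_dws.py. If you would rather keep the address and password out of the script, one option is to read them from environment variables. The sketch below assumes hypothetical variable names (`DWS_HOST`, `DWS_PORT`, `DWS_DATABASE`, `DWS_USER`, `DWS_PASSWORD`), which are not defined by DWS or PyGreSQL, and falls back to the sample port and database name used in this guide.

```python
import os


def connection_params(env=None):
    """Build keyword arguments for pgdb.connect from environment variables."""
    env = os.environ if env is None else env
    required = ("DWS_HOST", "DWS_USER", "DWS_PASSWORD")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {
        "host": env["DWS_HOST"],
        "port": env.get("DWS_PORT", "8000"),  # sample port from this guide
        "database": env.get("DWS_DATABASE", "gaussdb"),  # sample database name
        "user": env["DWS_USER"],
        "password": env["DWS_PASSWORD"],
    }
```

With PyGreSQL installed, the result can be passed on as `pgdb.connect(**connection_params())`. In a Command Prompt window the variables can be set with `set DWS_HOST=...` before the script is run.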
 
  