Uso da biblioteca de funções de terceiros psycopg2 do Python para conectar-se a um cluster
Depois de criar um cluster de armazém de dados e usar a biblioteca de funções de terceiros psycopg2 para se conectar ao cluster, você pode usar o Python para acessar o GaussDB(DWS) e executar várias operações em tabelas de dados
Preparações antes de se conectar a um cluster
- Um EIP foi vinculado ao cluster de armazém de dados.
- Você obteve o nome de usuário e a senha de administrador para efetuar logon no banco de dados no cluster de armazém de dados.
Os algoritmos MD5 podem ser vulneráveis a ataques de colisão e não podem ser usados para verificação de senhas. Atualmente, o GaussDB(DWS) usa o design de segurança padrão. Por padrão, a verificação de senha MD5 está desabilitada, e isso pode causar falhas de conexões de clientes de código aberto. É aconselhável definir password_encryption_type como 1. Para obter detalhes, consulteModificação dos parâmetros do banco de dados.
- Por motivos de segurança, o GaussDB(DWS) não usa mais MD5 para armazenar resumos de senha por padrão. Como resultado, as unidades de código aberto e os clientes podem falhar ao se conectar ao banco de dados. Para usar o algoritmo MD5 usado em um protocolo de código aberto, você deve modificar sua política de senha e criar um novo usuário ou alterar a senha de um usuário existente.
- O banco de dados armazena o resumo de hash de senhas em vez de texto de senha. Durante a verificação de senha, o sistema compara o resumo de hash com o resumo de senha enviado pelo cliente (operações de sal estão envolvidas). Se você alterar sua política de algoritmo criptográfico, o banco de dados não poderá gerar um novo resumo de hash para sua senha existente. Para fins de conectividade, você deve alterar manualmente sua senha ou criar um novo usuário. A nova senha será criptografada usando o algoritmo de hash e armazenada para autenticação na próxima conexão.
- Você obteve o endereço de rede pública, incluindo o endereço IP e o número da porta no cluster de armazém de dados. Para mais detalhes, consulte Obtenção do endereço de conexão do cluster.
- Você instalou a biblioteca de funções de terceiros psycopg2. Endereço de download: https://pypi.org/project/psycopg2/. Para obter detalhes sobre instalação e implementação, consulte https://www.psycopg.org/install/.
- No CentOS e Red Hat OS, execute o seguinte comando yum:
1
yum install python-psycopg2
- psycopg2 depende da biblioteca dinâmica libpq do PostgreSQL (versão de 32 bits ou 64 bits, o que corresponder à versão de psycopg2). No Linux, você pode executar o comando yum e não precisa instalar a biblioteca. Antes de usar psycopg2 no Windows, você precisa instalar libpq de uma das seguintes maneiras:
- Instale o PostgreSQL e configure as bibliotecas dinâmicas libpq, ssl e crypto na variável de ambiente PATH.
- Instale psqlodbc e use as bibliotecas dinâmicas libpq, ssl e crypto transportadas pelo driver ODBC do PostgreSQL.
- No CentOS e Red Hat OS, execute o seguinte comando yum:
Restrições
psycopg2 é uma interface cliente baseada em PostgreSQL, e suas funções não são totalmente suportadas pelo GaussDB(DWS). Para mais detalhes, consulte Tabela 1.
As seguintes APIs são suportadas com base em Python 3.8.5 e psycopg 2.9.1.
Nome da classe |
Uso |
Função/variável de membro |
Sim |
Observações |
---|---|---|---|---|
connections |
Básico |
cursor(name=None, cursor_factory=None, scrollable=None, withhold=False) |
Sim |
- |
commit() |
Sim |
- |
||
rollback() |
Sim |
- |
||
close() |
Sim |
- |
||
Métodos de suporte de commit de duas fases |
xid(format_id, gtrid, bqual) |
Sim |
- |
|
tpc_begin(xid) |
Sim |
- |
||
tpc_prepare() |
Não |
O kernel não suporta PREPARE TRANSACTION explícita. |
||
tpc_commit([xid]) |
Sim |
- |
||
tpc_rollback([xid]) |
Sim |
- |
||
tpc_recover() |
Sim |
- |
||
closed |
Sim |
- |
||
cancel() |
Sim |
- |
||
reset() |
Não |
DISCARD ALL não é suportado. |
||
dsn |
Sim |
- |
||
Métodos e atributos de controle de transações. |
set_session(isolation_level=None, readonly=None, deferrable=None, autocommit=None) |
Sim |
O banco de dados não suporta a configuração de default_transaction_read_only em uma sessão. |
|
autocommit |
Sim |
- |
||
isolation_level |
Sim |
- |
||
readonly |
Não |
O banco de dados não suporta a configuração de default_transaction_read_only em uma sessão. |
||
deferrable |
Sim |
- |
||
set_isolation_level(level) |
Sim |
- |
||
encoding |
Sim |
- |
||
set_client_encoding(enc) |
Sim |
- |
||
notices |
Não |
O banco de dados não suporta listen/notify. |
||
notifies |
Sim |
- |
||
cursor_factory |
S |
- |
||
info |
Sim |
- |
||
status |
Sim |
- |
||
lobject |
Não |
O banco de dados não suporta operações relacionadas a objetos grandes. |
||
Métodos relacionados ao suporte assíncrono |
poll() |
Sim |
- |
|
fileno() |
Sim |
- |
||
isexecuting() |
Sim |
- |
||
Inter-operação com outros módulos C API |
pgconn_ptr |
Sim |
- |
|
get_native_connection() |
Sim |
- |
||
Métodos informativos da conexão nativa |
get_transaction_status() |
Sim |
- |
|
protocol_version |
Sim |
- |
||
server_version |
Sim |
- |
||
get_backend_pid() |
Sim |
O PID obtido não é o PID em segundo plano, mas o ID da conexão lógica. |
||
get_parameter_status(parameter) |
Sim |
- |
||
get_dsn_parameters() |
Sim |
- |
||
cursor |
Básico |
description |
Sim |
- |
close() |
Sim |
- |
||
closed |
Sim |
- |
||
connection |
Sim |
- |
||
name |
Sim |
- |
||
scrollable |
Não |
O banco de dados não suporta SCROLL CURSOR. |
||
withhold |
Não |
O cwithhold cursor precisa ser fechado antes da operação de commit. |
||
Métodos de execução de comandos |
execute(query, vars=None) |
Sim |
- |
|
executemany(query, vars_list) |
Sim |
- |
||
callproc(procname[, parameters]) |
Sim |
- |
||
mogrify(operation[, parameters]) |
Sim |
- |
||
setinputsizes(sizes) |
Sim |
- |
||
fetchone() |
Sim |
- |
||
fetchmany([size=cursor.arraysize]) |
Sim |
- |
||
fetchall() |
Sim |
- |
||
scroll(value[, mode='relative']) |
Não |
O banco de dados não suporta SCROLL CURSOR. |
||
arraysize |
Sim |
- |
||
itersize |
Sim |
- |
||
rowcount |
Sim |
- |
||
rownumber |
Sim |
- |
||
lastrowid |
Sim |
- |
||
query |
Sim |
- |
||
statusmessage |
Sim |
- |
||
cast(oid, s) |
Sim |
- |
||
tzinfo_factory |
Sim |
- |
||
nextset() |
Sim |
- |
||
setoutputsize(size[, column]) |
Sim |
- |
||
Métodos relacionados à COPY |
copy_from(file, table, sep='\\t', null='\\\\N', size=8192, columns=None) |
Sim |
- |
|
copy_to(file, table, sep='\\t', null='\\\\N', columns=None) |
Sim |
- |
||
copy_expert(sql, file, size=8192) |
Sim |
- |
||
Inter-operação com outros módulos C API |
pgresult_ptr |
Sim |
- |
Usar a biblioteca de funções de terceiros psycopg2 para conectar-se a um cluster (Linux)
- Efetue logon no ambiente Linux como usuário root.
- Execute o seguinte comando para criar o arquivo python_dws.py:
vi python_dws.py
Copie e cole o seguinte conteúdo no arquivo python_dws.py:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
#!/usr/bin/python # -*- coding: UTF-8 -*- from __future__ import print_function import psycopg2 def create_table(connection): print("Begin to create table") try: cursor = connection.cursor() cursor.execute("drop table if exists test;" "create table test(id int, name text);") connection.commit() except psycopg2.ProgrammingError as e: print(e) else: print("Table created successfully") cursor.close() def insert_data(connection): print("Begin to insert data") try: cursor = connection.cursor() cursor.execute("insert into test values(1,'number1');") cursor.execute("insert into test values(2,'number2');") cursor.execute("insert into test values(3,'number3');") connection.commit() except psycopg2.ProgrammingError as e: print(e) else: print("Insert data successfully") cursor.close() def update_data(connection): print("Begin to update data") try: cursor = connection.cursor() cursor.execute("update test set name = 'numberupdated' where id=1;") connection.commit() print("Total number of rows updated :", cursor.rowcount) cursor.execute("select * from test order by 1;") rows = cursor.fetchall() for row in rows: print("id = ", row[0]) print("name = ", row[1], "\n") except psycopg2.ProgrammingError as e: print(e) else: print("After Update, Operation done successfully") def delete_data(connection): print("Begin to delete data") try: cursor = connection.cursor() cursor.execute("delete from test where id=3;") connection.commit() print("Total number of rows deleted :", cursor.rowcount) cursor.execute("select * from test order by 1;") rows = cursor.fetchall() for row in rows: print("id = ", row[0]) print("name = ", row[1], "\n") except psycopg2.ProgrammingError as e: print(e) else: print("After Delete,Operation done successfully") def select_data(connection): print("Begin to select data") try: cursor = connection.cursor() cursor.execute("select * from test order by 1;") rows = cursor.fetchall() for row in rows: print("id = ", row[0]) print("name = ", row[1], "\n") except psycopg2.ProgrammingError as e: print(e) print("select failed") else: print("Operation done successfully") cursor.close() if __name__ == '__main__': try: conn = psycopg2.connect(host='10.154.70.231', port='8000', database='gaussdb', # Database to be connected user='dbadmin', password='password') # Database user password except psycopg2.DatabaseError as ex: print(ex) print("Connect database failed") else: print("Opened database successfully") create_table(conn) insert_data(conn) select_data(conn) update_data(conn) delete_data(conn) conn.close()
- Altere o endereço de rede pública, o número da porta do cluster, o nome do banco de dados, o nome do usuário do banco de dados e a senha do banco de dados no arquivo python_dws.py com base nas informações reais do cluster.
A API de psycopg2 não fornece o recurso de repetição de conexão. Você precisa implementar o processamento de nova tentativa no código de serviço.
1 2 3 4 5
conn = psycopg2.connect(host='10.154.70.231', port='8000', database='gaussdb', # Database to be connected user='dbadmin', password='password') # Database user password
- Execute o seguinte comando para se conectar ao cluster usando a biblioteca de funções de terceiros psycopg:
python python_dws.py
Usar a biblioteca de funções de terceiros psycopg2 para conectar-se a um cluster (Windows)
- No sistema operacional Windows, clique no botão Start, digite cmd na caixa de pesquisa e clique em cmd.exe na lista de resultados para abrir a interface de linha de comando (CLI).
- Na CLI, execute o seguinte comando para criar o arquivo python_dws.py:
type nul> python_dws.py
Copie e cole o seguinte conteúdo no arquivo python_dws.py:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
#!/usr/bin/python # -*- coding:UTF-8 -*- from __future__ import print_function import psycopg2 def create_table(connection): print("Begin to create table") try: cursor = connection.cursor() cursor.execute("drop table if exists test;" "create table test(id int, name text);") connection.commit() except psycopg2.ProgrammingError as e: print(e) else: print("Table created successfully") cursor.close() def insert_data(connection): print("Begin to insert data") try: cursor = connection.cursor() cursor.execute("insert into test values(1,'number1');") cursor.execute("insert into test values(2,'number2');") cursor.execute("insert into test values(3,'number3');") connection.commit() except psycopg2.ProgrammingError as e: print(e) else: print("Insert data successfully") cursor.close() def update_data(connection): print("Begin to update data") try: cursor = connection.cursor() cursor.execute("update test set name = 'numberupdated' where id=1;") connection.commit() print("Total number of rows updated :", cursor.rowcount) cursor.execute("select * from test order by 1;") rows = cursor.fetchall() for row in rows: print("id = ", row[0]) print("name = ", row[1], "\n") except psycopg2.ProgrammingError as e: print(e) else: print("After Update, Operation done successfully") def delete_data(connection): print("Begin to delete data") try: cursor = connection.cursor() cursor.execute("delete from test where id=3;") connection.commit() print("Total number of rows deleted :", cursor.rowcount) cursor.execute("select * from test order by 1;") rows = cursor.fetchall() for row in rows: print("id = ", row[0]) print("name = ", row[1], "\n") except psycopg2.ProgrammingError as e: print(e) else: print("After Delete,Operation done successfully") def select_data(connection): print("Begin to select data") try: cursor = connection.cursor() cursor.execute("select * from test order by 1;") rows = cursor.fetchall() for row in rows: print("id = ", row[0]) print("name = ", row[1], "\n") except psycopg2.ProgrammingError as e: print(e) print("select failed") else: print("Operation done successfully") cursor.close() if __name__ == '__main__': try: conn = psycopg2.connect(host='10.154.70.231', port='8000', database='postgresgaussdb', # Database to be connected user='dbadmin', password='password') # Database user password except psycopg2.DatabaseError as ex: print(ex) print("Connect database failed") else: print("Opened database successfully") create_table(conn) insert_data(conn) select_data(conn) update_data(conn) delete_data(conn) conn.close()
- Altere o endereço de rede pública, o número da porta do cluster, o nome do banco de dados, o nome do usuário do banco de dados e a senha do banco de dados no arquivo python_dws.py com base nas informações reais do cluster.
1 2 3 4 5
conn = psycopg2.connect(host='10.154.70.231', port='8000', database='gaussdb', # Database to be connected user='dbadmin', password='password') # Database user password
- Na CLI, execute o seguinte comando para usar psycopg para se conectar ao cluster:
python python_dws.py
Por que a nova tentativa de CN não é apoiada quando psycopg2 é conectado a um cluster?
Com o recurso de repetição de CN, GaussDB(DWS) tenta novamente uma instrução que falhou ao ser executada e identifica o tipo de falha. Para obter detalhes, consulte Repetição automática em caso de erros de execução de instruções SQL. No entanto, em uma sessão conectada usando psycopg2, uma instrução SQL com falha relatará um erro e interromperá a execução. Em uma alternância primária/em espera, se uma instrução SQL com falha não for repetida, o seguinte erro será relatado. Se a alternância for concluída durante uma nova tentativa automática, o resultado correto será retornado.
1
|
psycopg2.errors.ConnectionFailure: pooler: failed to create 1 connections, Error Message: remote node dn_6003_6004, detail: could not connect to server: Operation now in progress |
Causas do erro:
- psycopg2 envia a instrução BEGIN para iniciar uma transação antes de enviar uma instrução SQL.
- A repetição de CN não suporta declarações em blocos de transação.
Solução:
- No modo de conexão síncrona, termine a transação iniciada pelo driver.
1 2 3 4
cursor = conn.cursor() # End the transaction started by the driver. cursor.execute("end; select * from test order by 1;") rows = cursor.fetchall()
- Inicie uma transação em uma conexão assíncrona. Para mais detalhes, visite o site oficial do PyScopg em: https://www.psycopg.org/docs/advanced.html?highlight=async
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
#!/usr/bin/env python3 # _*_ encoding=utf-8 _*_ import psycopg2 import select # Wait function provided by psycopg2 in asynchronous connection mode #For details, see https://www.psycopg.org/docs/advanced.html?highlight=async. def wait(conn): while True: state = conn.poll() if state == psycopg2.extensions.POLL_OK: break elif state == psycopg2.extensions.POLL_WRITE: select.select([], [conn.fileno()], []) elif state == psycopg2.extensions.POLL_READ: select.select([conn.fileno()], [], []) else: raise psycopg2.OperationalError("poll() returned %s" % state) def psycopg2_cnretry_sync(): # Create a connection. conn = psycopg2.connect(host='10.154.70.231', port='8000', database='gaussdb', # Database to be connected user='dbadmin', password='password', # Database user password async=1) # Use the asynchronous connection mode. wait(conn) # Execute a query. cursor = conn.cursor() cursor.execute("select * from test order by 1;") wait(conn) rows = cursor.fetchall() for row in rows: print(row[0], row[1]) # Close the connection. conn.close() if __name__ == '__main__': psycopg2_cnretry_async()