Help Center/ GaussDB(DWS)/ More Documents/ User Guide (Paris Region)/ Cluster Connection/ Using the Third-Party Function Library psycopg2 of Python to Connect to a Cluster
Updated on 2024-06-11 GMT+08:00

Using the Third-Party Function Library psycopg2 of Python to Connect to a Cluster

After creating a data warehouse cluster and using the third-party function library psycopg2 to connect to the cluster, you can use Python to access GaussDB(DWS) and perform various operations on data tables.

Preparations Before Connecting to a Cluster

  • An EIP has been bound to the data warehouse cluster.
  • You have obtained the administrator username and password for logging in to the database in the data warehouse cluster.
    MD5 algorithms may by vulnerable to collision attacks and cannot be used for password verification. Currently, GaussDB(DWS) uses the default security design. By default, MD5 password verification is disabled, and this may cause failures of connections from open source clients. You are advised to set password_encryption_type to 1. For details, see "Modifying Database Parameters" in User Guide.
    • For security purposes, GaussDB(DWS) no longer uses MD5 to store password digests by default. As a result, the open-source drives and clients may fail to connect to the database. To use the MD5 algorithm used in an open-source protocol, you must modify your password policy and create a new user, or change the password of an existing user.
    • The database stores the hash digest of passwords instead of password text. During password verification, the system compares the hash digest with the password digest sent from the client (salt operations are involved). If you change your cryptographic algorithm policy, the database cannot generate a new hash digest for your existing password. For connectivity purposes, you must manually change your password or create a new user. The new password will be encrypted using the hash algorithm and stored for authentication in the next connection.
  • You have obtained the public network address, including the IP address and port number in the data warehouse cluster. For details, see Obtaining the Cluster Connection Address.
  • You have installed the third-party function library psycopg2. Download address: https://pypi.org/project/psycopg2/. For details about installation and deployment, see https://www.psycopg.org/install/.
    • In CentOS and Red Hat OS, run the following yum command:
      1
      yum install python-psycopg2
      
    • psycopg2 depends on the libpq dynamic library of PostgreSQL (32-bit or 64-bit version, whichever matches the psycopg2 bit version). In Linux, you can run the yum command and do not need to install the library. Before using psycopg2 in Windows, you need to install libpq in either of the following ways:
      • Install PostgreSQL and configure the libpq, ssl, and crypto dynamic libraries in the environment variable PATH.
      • Install psqlodbc and use the libpq, ssl, and crypto dynamic libraries carried by the PostgreSQL ODBC driver.

Constraints

psycopg2 is a PostgreSQL-based client interface, and its functions are not fully supported by GaussDB(DWS). For details, see Table 1.

The following APIs are supported based on Python 3.8.5 and psycopg 2.9.1.

Table 1 psycopg2 APIs supported by DWS

Class Name

Usage

Function/Member Variable

Yes

Remarks

connections

basic

cursor(name=None, cursor_factory=None, scrollable=None, withhold=False)

Y

-

commit()

Y

-

rollback()

Y

-

close()

Y

-

Two-phase commit support methods

xid(format_id, gtrid, bqual)

Y

-

tpc_begin(xid)

Y

-

tpc_prepare()

N

The kernel does not support explicit PREPARE TRANSACTION.

tpc_commit([xid])

Y

-

tpc_rollback([xid])

Y

-

tpc_recover()

Y

-

closed

Y

-

cancel()

Y

-

reset()

N

DISCARD ALL is not supported.

dsn

Y

-

Transaction control methods and attributes.

set_session(isolation_level=None, readonly=None, deferrable=None, autocommit=None)

Y

The database does not support the setting of default_transaction_read_only in a session.

autocommit

Y

-

isolation_level

Y

-

readonly

N

The database does not support the setting of default_transaction_read_only in a session.

deferrable

Y

-

set_isolation_level(level)

Y

-

encoding

Y

-

set_client_encoding(enc)

Y

-

notices

N

The database does not support listen/notify.

notifies

Y

-

cursor_factory

Y

-

info

Y

-

status

Y

-

lobject

N

The database does not support operations related to large objects.

Methods related to asynchronous support

poll()

Y

-

fileno()

Y

-

isexecuting()

Y

-

Interoperation with other C API modules

pgconn_ptr

Y

-

get_native_connection()

Y

-

informative methods of the native connection

get_transaction_status()

Y

-

protocol_version

Y

-

server_version

Y

-

get_backend_pid()

Y

The obtained PID is not the background PID, but the ID of the logical connection.

get_parameter_status(parameter)

Y

-

get_dsn_parameters()

Y

-

cursor

basic

description

Y

-

close()

Y

-

closed

Y

-

connection

Y

-

name

Y

-

scrollable

N

The database does not support SCROLL CURSOR.

withhold

N

The withhold cursor needs to be closed before the commit operation.

Commands execution methods

execute(query, vars=None)

Y

-

executemany(query, vars_list)

Y

-

callproc(procname[, parameters])

Y

-

mogrify(operation[, parameters])

Y

-

setinputsizes(sizes)

Y

-

fetchone()

Y

-

fetchmany([size=cursor.arraysize])

Y

-

fetchall()

Y

-

scroll(value[, mode='relative'])

N

The database does not support SCROLL CURSOR.

arraysize

Y

-

itersize

Y

-

rowcount

Y

-

rownumber

Y

-

lastrowid

Y

-

query

Y

-

statusmessage

Y

-

cast(oid, s)

Y

-

tzinfo_factory

Y

-

nextset()

Y

-

setoutputsize(size[, column])

Y

-

COPY-related methods

copy_from(file, table, sep='\\t', null='\\\\N', size=8192, columns=None)

Y

-

copy_to(file, table, sep='\\t', null='\\\\N', columns=None)

Y

-

copy_expert(sql, file, size=8192)

Y

-

Interoperation with other C API modules

pgresult_ptr

Y

-

Using the Third-Party Function Library psycopg2 to Connect to a Cluster (Linux)

  1. Log in to the Linux environment as user root.
  2. Run the following command to create the python_dws.py file:

    vi python_dws.py

    Copy and paste the following content to the python_dws.py file:

      1
      2
      3
      4
      5
      6
      7
      8
      9
     10
     11
     12
     13
     14
     15
     16
     17
     18
     19
     20
     21
     22
     23
     24
     25
     26
     27
     28
     29
     30
     31
     32
     33
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    #!/usr/bin/python
    # -*- coding: UTF-8 -*-
     
    from __future__ import print_function
     
    import psycopg2
     
     
    def create_table(connection):
        print("Begin to create table")
        try:
            cursor = connection.cursor()
            cursor.execute("drop table if exists test;"
                           "create table test(id int, name text);")
            connection.commit()
        except psycopg2.ProgrammingError as e:
            print(e)
        else:
            print("Table created successfully")
            cursor.close()
     
     
    def insert_data(connection):
        print("Begin to insert data")
        try:
            cursor = connection.cursor()
            cursor.execute("insert into test values(1,'number1');")
            cursor.execute("insert into test values(2,'number2');")
            cursor.execute("insert into test values(3,'number3');")
            connection.commit()
        except psycopg2.ProgrammingError as e:
            print(e)
        else:
            print("Insert data successfully")
            cursor.close()
     
     
    def update_data(connection):
        print("Begin to update data")
        try:
            cursor = connection.cursor()
            cursor.execute("update test set name = 'numberupdated' where id=1;")
            connection.commit()
            print("Total number of rows updated :", cursor.rowcount)
            cursor.execute("select * from test order by 1;")
            rows = cursor.fetchall()
            for row in rows:
                print("id = ", row[0])
                print("name = ", row[1], "\n")
        except psycopg2.ProgrammingError as e:
            print(e)
        else:
            print("After Update, Operation done successfully")
     
     
    def delete_data(connection):
        print("Begin to delete data")
        try:
            cursor = connection.cursor()
            cursor.execute("delete from test where id=3;")
            connection.commit()
            print("Total number of rows deleted :", cursor.rowcount)
            cursor.execute("select * from test order by 1;")
            rows = cursor.fetchall()
            for row in rows:
                print("id = ", row[0])
                print("name = ", row[1], "\n")
        except psycopg2.ProgrammingError as e:
            print(e)
        else:
            print("After Delete,Operation done successfully")
     
     
    def select_data(connection):
        print("Begin to select data")
        try:
            cursor = connection.cursor()
            cursor.execute("select * from test order by 1;")
            rows = cursor.fetchall()
            for row in rows:
                print("id = ", row[0])
                print("name = ", row[1], "\n")
        except psycopg2.ProgrammingError as e:
            print(e)
            print("select failed")
        else:
            print("Operation done successfully")
            cursor.close()
     
     
    if __name__ == '__main__':
        try:
            conn = psycopg2.connect(host='10.154.70.231',
                                    port='8000',
                                    database='gaussdb',  # Database to be connected
                                    user='dbadmin',
                                    password='password')  # Database user password
        except psycopg2.DatabaseError as ex:
            print(ex)
            print("Connect database failed")
        else:
            print("Opened database successfully")
            create_table(conn)
            insert_data(conn)
            select_data(conn)
            update_data(conn)
            delete_data(conn)
            conn.close()
    

  3. Change the public network address, cluster port number, database name, database username, and database password in the python_dws.py file based on the actual cluster information.

    The psycopg2 API does not provide the connection retry capability. You need to implement the retry processing in the service code.

    1
    2
    3
    4
    5
            conn = psycopg2.connect(host='10.154.70.231',
                                    port='8000',
                                    database='gaussdb',  # Database to be connected
                                    user='dbadmin',
                                    password='password')  # Database user password
    

  4. Run the following command to connect to the cluster using the third-party function library psycopg:

    python python_dws.py

Using the Third-Party Function Library psycopg2 to Connect to a Cluster (Windows)

  1. In the Windows operating system, click the Start button, enter cmd in the search box, and click cmd.exe in the result list to open the command-line interface (CLI).
  2. In the CLI, run the following command to create the python_dws.py file:

    type nul> python_dws.py

    Copy and paste the following content to the python_dws.py file:

      1
      2
      3
      4
      5
      6
      7
      8
      9
     10
     11
     12
     13
     14
     15
     16
     17
     18
     19
     20
     21
     22
     23
     24
     25
     26
     27
     28
     29
     30
     31
     32
     33
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    #!/usr/bin/python
    # -*- coding:UTF-8 -*-
    
    from __future__ import print_function
    
    import psycopg2
    
    
    def create_table(connection):
        print("Begin to create table")
        try:
            cursor = connection.cursor()
            cursor.execute("drop table if exists test;"
                           "create table test(id int, name text);")
            connection.commit()
        except psycopg2.ProgrammingError as e:
            print(e)
        else:
            print("Table created successfully")
            cursor.close()
    
    
    def insert_data(connection):
        print("Begin to insert data")
        try:
            cursor = connection.cursor()
            cursor.execute("insert into test values(1,'number1');")
            cursor.execute("insert into test values(2,'number2');")
            cursor.execute("insert into test values(3,'number3');")
            connection.commit()
        except psycopg2.ProgrammingError as e:
            print(e)
        else:
            print("Insert data successfully")
            cursor.close()
    
    
    def update_data(connection):
        print("Begin to update data")
        try:
            cursor = connection.cursor()
            cursor.execute("update test set name = 'numberupdated' where id=1;")
            connection.commit()
            print("Total number of rows updated :", cursor.rowcount)
            cursor.execute("select * from test order by 1;")
            rows = cursor.fetchall()
            for row in rows:
                print("id = ", row[0])
                print("name = ", row[1], "\n")
        except psycopg2.ProgrammingError as e:
            print(e)
        else:
            print("After Update, Operation done successfully")
    
    
    def delete_data(connection):
        print("Begin to delete data")
        try:
            cursor = connection.cursor()
            cursor.execute("delete from test where id=3;")
            connection.commit()
            print("Total number of rows deleted :", cursor.rowcount)
            cursor.execute("select * from test order by 1;")
            rows = cursor.fetchall()
            for row in rows:
                print("id = ", row[0])
                print("name = ", row[1], "\n")
        except psycopg2.ProgrammingError as e:
            print(e)
        else:
            print("After Delete,Operation done successfully")
    
    
    def select_data(connection):
        print("Begin to select data")
        try:
            cursor = connection.cursor()
            cursor.execute("select * from test order by 1;")
            rows = cursor.fetchall()
            for row in rows:
                print("id = ", row[0])
                print("name = ", row[1], "\n")
        except psycopg2.ProgrammingError as e:
            print(e)
            print("select failed")
        else:
            print("Operation done successfully")
            cursor.close()
    
    
    if __name__ == '__main__':
        try:
            conn = psycopg2.connect(host='10.154.70.231',
                                    port='8000',
                                    database='postgresgaussdb',  # Database to be connected
                                    user='dbadmin',
                                    password='password')  # Database user password
        except psycopg2.DatabaseError as ex:
            print(ex)
            print("Connect database failed")
        else:
            print("Opened database successfully")
            create_table(conn)
            insert_data(conn)
            select_data(conn)
            update_data(conn)
            delete_data(conn)
            conn.close()
    

  3. Change the public network address, cluster port number, database name, database username, and database password in the python_dws.py file based on the actual cluster information.

    1
    2
    3
    4
    5
            conn = psycopg2.connect(host='10.154.70.231',
                                    port='8000',
                                    database='gaussdb',  # Database to be connected
                                    user='dbadmin',
                                    password='password')  # Database user password
    

  4. On the CLI, run the following command to use psycopg to connect to the cluster:

    python python_dws.py

Why CN Retry Is Not Supported When psycopg2 Is Connected to a Cluster?

With the CN retry feature, GaussDB(DWS) retries a statement that failed to be executed and identifies the failure type. However, in a session connected using psycopg2, a failed SQL statement will report an error and stop to be executed. In a primary/standby switchover, if a failed SQL statement is not retried, the following error will be reported. If the switchover is complete during an automatic retry, the correct result will be returned.

1
psycopg2.errors.ConnectionFailure: pooler: failed to create 1 connections, Error Message: remote node dn_6003_6004, detail: could not connect to server: Operation now in progress

Error causes:

  1. psycopg2 sends the BEGIN statement to start a transaction before sending an SQL statement.
  2. CN retry does not support statements in transaction blocks.

Solution:

  • In synchronous connection mode, end the transaction started by the driver.
    1
    2
    3
    4
    cursor = conn.cursor()
    # End the transaction started by the driver.
    cursor.execute("end; select * from test order by 1;") 
    rows = cursor.fetchall()
    
  • Start a transaction in an asynchronous connection. For details, visit the PyScopg official website at: https://www.psycopg.org/docs/advanced.html?highlight=async
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    #!/usr/bin/env python3
    # _*_ encoding=utf-8 _*_
     
    import psycopg2
    import select
     
    # Wait function provided by psycopg2 in asynchronous connection mode
    #For details, see https://www.psycopg.org/docs/advanced.html?highlight=async.
    def wait(conn):
        while True:
            state = conn.poll()
            if state == psycopg2.extensions.POLL_OK:
                break
            elif state == psycopg2.extensions.POLL_WRITE:
                select.select([], [conn.fileno()], [])
            elif state == psycopg2.extensions.POLL_READ:
                select.select([conn.fileno()], [], [])
            else:
                raise psycopg2.OperationalError("poll() returned %s" % state)
     
    def psycopg2_cnretry_sync():
        # Create a connection.
        conn = psycopg2.connect(host='10.154.70.231',
                                    port='8000',
                                    database='gaussdb',  # Database to be connected
                                    user='dbadmin',
                                    password='password',  # Database user password
                                    async=1) # Use the asynchronous connection mode.
        wait(conn)
     
        # Execute a query.
        cursor = conn.cursor()
        cursor.execute("select * from test order by 1;")
        wait(conn)
        rows = cursor.fetchall()
        for row in rows:
            print(row[0], row[1])
     
        # Close the connection.
        conn.close()
     
    if __name__ == '__main__':
        psycopg2_cnretry_async()