Updated on 2025-08-22 GMT+08:00

Sample Code for Accessing RDS for MySQL

Sample Code

This sample code queries the first 10 records from the user table in the RDS for MySQL instance database. The code can efficiently and reliably perform database operations by using the database connection pool and retry mechanism.

The following shows the complete sample code. For details about the code of the connection pool and retry mechanism, see Sample Code Interpretation.

import pymysql
import time
from functools import wraps
from DBUtils.PooledDB import PooledDB

db_client = None
POOL_CONFIG = { # Connection pool configuration
        'max_connections': 5,           # Maximum number of connections
        'keepalive_interval': 60,       # Connection keepalive interval (s)
        'max_retries': 3,               # Maximum number of retries
        'retry_delay': 1                # Retry interval (s)
}

def initializer(context):
    global db_client
    user = context.getUserData('username') 
    password = context.getUserData('password') 
    host = context.getUserData('host') 
    port = int(context.getUserData('port')) 
    database = context.getUserData('database') 

    dbConfig = { # MySQL database configuration
        'host': host,
        'port': port,
        'user': user,
        'password': password,
        'database': database,
        'charset': 'utf8',
    }
    db_client = Database(context, POOL_CONFIG, dbConfig)

def handler(event, context): # Handler
    logger = context.getLogger()

    try:
        result= db_client.query("SELECT * FROM user LIMIT 10")
    except Exception as e:
        logger.info("query database error:%s" % e)
        return {"code": 400, "errorMsg": "internal error %s" % e}

    return result

class MySQLConnectionPool:
    def __init__(self, context, pool_config, db_config):
        """
        Initialize the database connection pool
        :param db_config: database configuration
        :param pool_config: connection pool configuration
        """
        self.context = context
        self.logger = context.getLogger();

        self.db_config = db_config
        self.pool_config = pool_config
        self.pool = self._create_pool()
        self.last_keepalive_time = 0

    def _create_pool(self):
        """
        Create a connection pool
        :return: connection pool object
        """
        try:
            pool = PooledDB(
                creator=pymysql,
                maxconnections=self.pool_config['max_connections'],
                mincached=1,
                **self.db_config
            )
            return pool
        except Exception as e:
            self.logger.error(f"Failed to create connection pool: {e}")
            raise

    def _get_connection(self):
        """
        Obtain a connection from the connection pool and ensure that the connection is valid
        :return: database connection object
        """
        conn = self.pool.connection()
        if not self._is_connection_alive(conn):
            conn = self.pool.connection()
        return conn

    def _is_connection_alive(self, conn):
        """
        Check whether the connection is alive
        :param conn: database connection object
        :return: bool
        """
        try:
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1")
                return True
        except Exception as e:
            self.logger.warning(f"Connection is not alive: {e}")
            return False

    def _close_connection(self, conn):
        """
        Closes the connection
        :param conn: database connection object
        """
        try:
            conn.close()
            self.logger.info("Connection closed")
        except Exception as e:
            self.logger.error(f"Failed to close connection: {e}")

    def _execute_query(self, conn, sql, params=None):
        """
        Query the database
        :param conn: database connection object
        :param sql: SQL statement
        :param params: SQL parameter
        :return: query result
        """
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                if sql.strip().lower().startswith('select'):
                    return cursor.fetchall()
                return None
        except Exception as e:
            self.logger.error(f"Query failed: {e}")
            raise

    def _execute_write(self, conn, sql, params=None):
        """
        Perform write operations (insert, update, and delete)
        :param conn: database connection object
        :param sql: SQL statement
        :param params: SQL parameter
        :return: number of affected rows
        """
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                conn.commit()
                return cursor.rowcount
        except Exception as e:
            self.logger.error(f"Write operation failed: {e}")
            conn.rollback()
            raise

def retry(max_retries=3, retry_delay=1):
    """
    Retry decorator
    :param max_retries: Maximum retries
    :param retry_delay: Retry interval (s)
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    print(f"Attempt {retries + 1} failed: {e}")
                    if retries < max_retries - 1:
                        time.sleep(retry_delay)
                    retries += 1
            print(f"Failed after {max_retries} attempts")
            raise
        return wrapper
    return decorator

class Database:
    def __init__(self, context, pool_config, db_config):
        self.pool_config = pool_config
        self.db_config = db_config
        self.pool = MySQLConnectionPool(context, pool_config, db_config)

    @retry(max_retries=POOL_CONFIG['max_retries'], retry_delay=POOL_CONFIG['retry_delay'])
    def query(self, sql, params=None):
        """
        Perform the query operation
        :param sql: SQL statement
        :param params: SQL parameter
        :return: query result
        """
        conn = self.pool._get_connection()
        result = self.pool._execute_query(conn, sql, params)
        return result

    @retry(max_retries=POOL_CONFIG['max_retries'], retry_delay=POOL_CONFIG['retry_delay'])
    def execute(self, sql, params=None):
        """
        Perform write operations (insert, update, and delete)
        :param sql: SQL statement
        :param params: SQL parameter
        :return: number of affected rows
        """
        conn = self.pool._get_connection()
        result = self.pool._execute_write(conn, sql, params)
        return result