Sample Code for Accessing RDS for MySQL
Sample Code
This sample code queries the first 10 records from the user table in the RDS for MySQL instance database. The code can efficiently and reliably perform database operations by using the database connection pool and retry mechanism.
The following shows the complete sample code. For details about the code of the connection pool and retry mechanism, see Sample Code Interpretation.
"""Query an RDS for MySQL instance through a connection pool with retries."""
import time
from functools import wraps

import pymysql

try:
    # DBUtils 1.x module layout (the one this sample was written against).
    from DBUtils.PooledDB import PooledDB
except ImportError:
    # DBUtils 2.x renamed the package and its modules to lowercase.
    from dbutils.pooled_db import PooledDB

# Set once by initializer() and reused by every handler() invocation.
db_client = None

POOL_CONFIG = {  # Connection pool configuration
    'max_connections': 5,      # Maximum number of connections
    'keepalive_interval': 60,  # Connection keepalive interval (s)
    'max_retries': 3,          # Maximum number of retries
    'retry_delay': 1           # Retry interval (s)
}


def initializer(context):
    """
    Build the module-level database client from the function's user data

    :param context: runtime context; must provide getUserData() for the
        keys username, password, host, port and database, and getLogger()
    """
    global db_client
    user = context.getUserData('username')
    password = context.getUserData('password')
    host = context.getUserData('host')
    port = int(context.getUserData('port'))
    database = context.getUserData('database')
    dbConfig = {  # MySQL database configuration
        'host': host,
        'port': port,
        'user': user,
        'password': password,
        'database': database,
        'charset': 'utf8',
    }
    db_client = Database(context, POOL_CONFIG, dbConfig)


def handler(event, context):
    """
    Handler: return up to 10 rows of the user table

    :param event: invocation event (not used)
    :param context: runtime context providing getLogger()
    :return: query result, or a dict with "code" and "errorMsg" on failure
    """
    logger = context.getLogger()
    try:
        result = db_client.query("SELECT * FROM user LIMIT 10")
    except Exception as e:
        # This is a failure path, so log it at error level.
        logger.error("query database error:%s" % e)
        return {"code": 400, "errorMsg": "internal error %s" % e}
    return result


class MySQLConnectionPool:
    def __init__(self, context, pool_config, db_config):
        """
        Initialize the database connection pool

        :param context: runtime context providing getLogger()
        :param pool_config: connection pool configuration
        :param db_config: database configuration
        """
        self.context = context
        self.logger = context.getLogger()
        self.db_config = db_config
        self.pool_config = pool_config
        self.pool = self._create_pool()
        self.last_keepalive_time = 0

    def _create_pool(self):
        """
        Create a connection pool

        :return: connection pool object
        :raises Exception: re-raises whatever PooledDB raised, after logging
        """
        try:
            pool = PooledDB(
                creator=pymysql,
                maxconnections=self.pool_config['max_connections'],
                mincached=1,
                **self.db_config
            )
            return pool
        except Exception as e:
            self.logger.error(f"Failed to create connection pool: {e}")
            raise

    def _get_connection(self):
        """
        Obtain a connection from the connection pool and ensure that the
        connection is valid

        :return: database connection object
        """
        conn = self.pool.connection()
        if not self._is_connection_alive(conn):
            # Hand the failed connection back before asking for another one,
            # so it is not held while the replacement is checked out (which
            # would count against max_connections).
            self._close_connection(conn)
            conn = self.pool.connection()
        return conn

    def _is_connection_alive(self, conn):
        """
        Check whether the connection is alive

        :param conn: database connection object
        :return: bool
        """
        try:
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1")
            return True
        except Exception as e:
            self.logger.warning(f"Connection is not alive: {e}")
            return False

    def _close_connection(self, conn):
        """
        Closes the connection; failures are logged and never raised, so
        this is safe to call from a finally block

        :param conn: database connection object
        """
        try:
            conn.close()
            self.logger.info("Connection closed")
        except Exception as e:
            self.logger.error(f"Failed to close connection: {e}")

    def _execute_query(self, conn, sql, params=None):
        """
        Query the database

        :param conn: database connection object
        :param sql: SQL statement
        :param params: SQL parameter
        :return: query result for SELECT statements, otherwise None
        :raises Exception: re-raises the database error after logging
        """
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                if sql.strip().lower().startswith('select'):
                    return cursor.fetchall()
                return None
        except Exception as e:
            self.logger.error(f"Query failed: {e}")
            raise

    def _execute_write(self, conn, sql, params=None):
        """
        Perform write operations (insert, update, and delete)

        :param conn: database connection object
        :param sql: SQL statement
        :param params: SQL parameter
        :return: number of affected rows
        :raises Exception: re-raises the database error after a rollback
        """
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                conn.commit()
                return cursor.rowcount
        except Exception as e:
            self.logger.error(f"Write operation failed: {e}")
            try:
                conn.rollback()
            except Exception as rollback_error:
                # A failed rollback must not hide the error that caused it.
                self.logger.error(f"Rollback failed: {rollback_error}")
            raise


def retry(max_retries=3, retry_delay=1):
    """
    Retry decorator

    :param max_retries: Maximum number of attempts (at least one is made)
    :param retry_delay: Retry interval (s)
    :return: decorator that re-raises the last error once all attempts fail
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts = max(1, max_retries)
            last_error = None
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    # Keep a reference: "e" is unbound when the clause ends.
                    last_error = e
                    print(f"Attempt {attempt} failed: {e}")
                    if attempt < attempts:
                        time.sleep(retry_delay)
            print(f"Failed after {attempts} attempts")
            # A bare "raise" here would fail with "No active exception to
            # re-raise"; raise the saved error so callers see the real cause.
            raise last_error
        return wrapper
    return decorator


class Database:
    def __init__(self, context, pool_config, db_config):
        """
        Create the database client and its connection pool

        :param context: runtime context providing getLogger()
        :param pool_config: connection pool configuration
        :param db_config: database configuration
        """
        self.pool_config = pool_config
        self.db_config = db_config
        self.pool = MySQLConnectionPool(context, pool_config, db_config)

    @retry(max_retries=POOL_CONFIG['max_retries'],
           retry_delay=POOL_CONFIG['retry_delay'])
    def query(self, sql, params=None):
        """
        Perform the query operation

        :param sql: SQL statement
        :param params: SQL parameter
        :return: query result
        """
        conn = self.pool._get_connection()
        try:
            return self.pool._execute_query(conn, sql, params)
        finally:
            # Always give the connection back, on success and on failure.
            self.pool._close_connection(conn)

    @retry(max_retries=POOL_CONFIG['max_retries'],
           retry_delay=POOL_CONFIG['retry_delay'])
    def execute(self, sql, params=None):
        """
        Perform write operations (insert, update, and delete)

        :param sql: SQL statement
        :param params: SQL parameter
        :return: number of affected rows
        """
        conn = self.pool._get_connection()
        try:
            return self.pool._execute_write(conn, sql, params)
        finally:
            # Always give the connection back, on success and on failure.
            self.pool._close_connection(conn)
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot