Sample Code for Accessing RDS for MySQL
Sample Code
This sample code queries up to 10 records from the user table of a database on an RDS for MySQL instance. It uses a database connection pool and a retry mechanism so that database operations are performed efficiently and reliably.
The following shows the complete sample code. For details about the code of the connection pool and retry mechanism, see Sample Code Interpretation.
import pymysql
import time
from functools import wraps
from DBUtils.PooledDB import PooledDB
# Module-level handle to the Database wrapper; stays None until initializer()
# assigns it.
db_client = None

# Connection pool and retry settings. 'max_connections' is read when the pool
# is created; 'max_retries' and 'retry_delay' feed the retry decorator.
# NOTE(review): 'keepalive_interval' is not read by any code visible here.
POOL_CONFIG = dict(
    max_connections=5,      # Maximum number of connections
    keepalive_interval=60,  # Connection keepalive interval (s)
    max_retries=3,          # Maximum number of retries
    retry_delay=1,          # Retry interval (s)
)
def initializer(context):
    """
    Build the module-level database client from the function's user data
    (presumably invoked by the runtime before handler is called - confirm)
    :param context: runtime context; getUserData() supplies the settings
                    'username', 'password', 'host', 'port' and 'database'
    """
    global db_client
    read_setting = context.getUserData
    account = read_setting('username')
    secret = read_setting('password')
    address = read_setting('host')
    port_number = int(read_setting('port'))  # user data arrives as text
    schema = read_setting('database')
    # MySQL database configuration, later expanded into PooledDB keyword arguments
    mysql_settings = dict(
        host=address,
        port=port_number,
        user=account,
        password=secret,
        database=schema,
        charset='utf8',
    )
    db_client = Database(context, POOL_CONFIG, mysql_settings)
def handler(event, context):  # Handler
    """
    Function entry point: return up to 10 rows from the user table
    :param event: trigger event (not used by this handler)
    :param context: runtime context; only getLogger() is used here
    :return: query result on success, otherwise a dict with "code" and "errorMsg"
    """
    logger = context.getLogger()
    try:
        return db_client.query("SELECT * FROM user LIMIT 10")
    except Exception as e:
        # Failures are logged at error level, like the other failure paths in
        # this file, so they are not lost among informational messages.
        logger.error("query database error:%s" % e)
        return {"code": 400, "errorMsg": "internal error %s" % e}
class MySQLConnectionPool:
    """
    Thin wrapper around a PooledDB pool of pymysql connections that adds
    a liveness probe, logging, and query/write helpers
    """

    def __init__(self, context, pool_config, db_config):
        """
        Initialize the database connection pool
        :param context: runtime context; supplies the logger via getLogger()
        :param pool_config: connection pool configuration
        :param db_config: database configuration
        """
        self.context = context
        self.logger = context.getLogger()
        self.db_config = db_config
        self.pool_config = pool_config
        self.pool = self._create_pool()
        self.last_keepalive_time = 0  # not updated anywhere in this class

    def _create_pool(self):
        """
        Create a connection pool
        :return: connection pool object
        :raises Exception: whatever PooledDB raises; logged and re-raised
        """
        try:
            pool = PooledDB(
                creator=pymysql,
                maxconnections=self.pool_config['max_connections'],
                mincached=1,
                **self.db_config
            )
            return pool
        except Exception as e:
            self.logger.error(f"Failed to create connection pool: {e}")
            raise

    def _get_connection(self):
        """
        Obtain a connection from the connection pool and ensure that the connection is valid
        :return: database connection object
        """
        conn = self.pool.connection()
        if self._is_connection_alive(conn):
            return conn
        # Release the dead handle before requesting a replacement so it does
        # not keep occupying a pool slot while the new connection is fetched.
        self._close_connection(conn)
        return self.pool.connection()

    def _is_connection_alive(self, conn):
        """
        Check whether the connection is alive by running "SELECT 1"
        :param conn: database connection object
        :return: bool
        """
        try:
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1")
            return True
        except Exception as e:
            self.logger.warning(f"Connection is not alive: {e}")
            return False

    def _close_connection(self, conn):
        """
        Close the connection; a failure to close is logged, not raised
        :param conn: database connection object
        """
        try:
            conn.close()
            self.logger.info("Connection closed")
        except Exception as e:
            self.logger.error(f"Failed to close connection: {e}")

    def _execute_query(self, conn, sql, params=None):
        """
        Query the database
        :param conn: database connection object
        :param sql: SQL statement
        :param params: SQL parameter
        :return: all fetched rows for a statement starting with "select", otherwise None
        :raises Exception: any execution error; logged and re-raised
        """
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                if sql.strip().lower().startswith('select'):
                    return cursor.fetchall()
                return None
        except Exception as e:
            self.logger.error(f"Query failed: {e}")
            raise

    def _execute_write(self, conn, sql, params=None):
        """
        Perform write operations (insert, update, and delete)
        :param conn: database connection object
        :param sql: SQL statement
        :param params: SQL parameter
        :return: number of affected rows
        :raises Exception: the original write error, after a rollback attempt
        """
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                conn.commit()
                return cursor.rowcount
        except Exception as e:
            self.logger.error(f"Write operation failed: {e}")
            try:
                conn.rollback()
            except Exception as rollback_error:
                # A failing rollback (for example on a dropped connection)
                # must not replace the original write error seen by callers.
                self.logger.error(f"Rollback failed: {rollback_error}")
            raise
def retry(max_retries=3, retry_delay=1):
    """
    Retry decorator
    :param max_retries: Maximum number of attempts (values below 1 still run the function once)
    :param retry_delay: Retry interval (s)
    :return: decorator that re-invokes the wrapped function after a failure
    :raises Exception: the exception raised by the last failed attempt
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts = max(1, max_retries)
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    print(f"Attempt {attempt} failed: {e}")
                    if attempt == attempts:
                        print(f"Failed after {attempts} attempts")
                        # Re-raise while the exception is still active; a bare
                        # raise after the except block has ended would surface
                        # as "RuntimeError: No active exception to reraise".
                        raise
                    time.sleep(retry_delay)
        return wrapper
    return decorator
class Database:
    """
    Database facade: runs queries and writes through a MySQLConnectionPool,
    retrying failed operations with the settings from POOL_CONFIG
    """

    def __init__(self, context, pool_config, db_config):
        """
        Create the underlying connection pool
        :param context: runtime context passed on to the connection pool
        :param pool_config: connection pool configuration
        :param db_config: database configuration
        """
        self.pool_config = pool_config
        self.db_config = db_config
        self.pool = MySQLConnectionPool(context, pool_config, db_config)

    @retry(max_retries=POOL_CONFIG['max_retries'], retry_delay=POOL_CONFIG['retry_delay'])
    def query(self, sql, params=None):
        """
        Perform the query operation
        :param sql: SQL statement
        :param params: SQL parameter
        :return: query result
        """
        conn = self.pool._get_connection()
        try:
            return self.pool._execute_query(conn, sql, params)
        finally:
            # Release the connection explicitly instead of relying on garbage
            # collection, so repeated or retried calls cannot exhaust the pool.
            self.pool._close_connection(conn)

    @retry(max_retries=POOL_CONFIG['max_retries'], retry_delay=POOL_CONFIG['retry_delay'])
    def execute(self, sql, params=None):
        """
        Perform write operations (insert, update, and delete)
        :param sql: SQL statement
        :param params: SQL parameter
        :return: number of affected rows
        """
        conn = self.pool._get_connection()
        try:
            return self.pool._execute_write(conn, sql, params)
        finally:
            # Release the connection explicitly instead of relying on garbage
            # collection, so repeated or retried calls cannot exhaust the pool.
            self.pool._close_connection(conn)
Feedback
Was this page helpful?
Provide feedback
Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot