更新时间:2025-08-22 GMT+08:00
示例代码解读
连接池
在示例代码中基于DBUtils.PooledDB构建MySQL连接池(MySQLConnectionPool),并提供内置的连接探活机制,配置了最大连接数(max_connections)和连接探活间隔(keepalive_interval),代码片段如下:
POOL_CONFIG = { # 连接池配置
'max_connections': 5, # 最大连接数
'keepalive_interval': 60, # 连接保活间隔(秒)
'max_retries': 3, # 最大重试次数
'retry_delay': 1 # 重试间隔(秒)
}
class MySQLConnectionPool:
def __init__(self, context, pool_config, db_config):
"""
初始化数据库连接池
:param db_config: 数据库配置
:param pool_config: 连接池配置
"""
self.context = context
self.logger = context.getLogger();
self.db_config = db_config
self.pool_config = pool_config
self.pool = self._create_pool()
self.last_keepalive_time = 0
def _create_pool(self):
"""
创建数据库连接池
:return: 连接池对象
"""
try:
pool = PooledDB(
creator=pymysql,
maxconnections=self.pool_config['max_connections'],
mincached=1,
**self.db_config
)
return pool
except Exception as e:
self.logger.error(f"Failed to create connection pool: {e}")
raise
def _get_connection(self):
"""
从连接池获取连接,并确保连接有效
:return: 数据库连接对象
"""
conn = self.pool.connection()
if not self._is_connection_alive(conn):
conn = self.pool.connection()
return conn
def _is_connection_alive(self, conn):
"""
检查连接是否存活
:param conn: 数据库连接对象
:return: bool
"""
try:
with conn.cursor() as cursor:
cursor.execute("SELECT 1")
return True
except Exception as e:
self.logger.warning(f"Connection is not alive: {e}")
return False
def _close_connection(self, conn):
"""
关闭连接
:param conn: 数据库连接对象
"""
try:
conn.close()
self.logger.info("Connection closed")
except Exception as e:
self.logger.error(f"Failed to close connection: {e}")
def _execute_query(self, conn, sql, params=None):
"""
执行数据库查询
:param conn: 数据库连接对象
:param sql: SQL 语句
:param params: SQL 参数
:return: 查询结果
"""
try:
with conn.cursor() as cursor:
cursor.execute(sql, params)
if sql.strip().lower().startswith('select'):
return cursor.fetchall()
return None
except Exception as e:
self.logger.error(f"Query failed: {e}")
raise
def _execute_write(self, conn, sql, params=None):
"""
执行写操作(插入、更新、删除)
:param conn: 数据库连接对象
:param sql: SQL 语句
:param params: SQL 参数
:return: 受影响的行数
"""
try:
with conn.cursor() as cursor:
cursor.execute(sql, params)
conn.commit()
return cursor.rowcount
except Exception as e:
self.logger.error(f"Write operation failed: {e}")
conn.rollback()
raise
使用MySQL连接池复用已创建的连接,能有效提升程序性能。最大连接数配置确保连接资源的使用保持在可控范围内,同时确保线程安全。
相关概念说明:
|
概念 |
说明 |
|---|---|
|
最大连接数配置区间 |
在FunctionGraph函数配置Mysql最大连接数建议在如下区间选取一个值:
例如:某个访问MySQL函数单实例并发度配置为5,每次执行函数访问MySQL并发度为2,函数最大实例数默认400,访问的MySQL实例连接数上限为30000,则计算如下:
按上述结果,建议将最大连接数配置为50。 |
|
连接探活间隔 |
不要超过函数执行超时时间,避免因连接断开导致的问题。 |
重试
通过装饰器构建自动重试机制,确保在执行MySQL操作失败后重试特定次数,这样可以显著降低暂时性故障的影响。例如,在瞬时网络抖动、磁盘问题导致服务暂时不可用或调用超时的情况下,提高MySQL操作的成功率。
代码片段如下:
POOL_CONFIG = { # 连接池配置
'max_connections': 5, # 最大连接数
'keepalive_interval': 60, # 连接保活间隔(秒)
'max_retries': 3, # 最大重试次数
'retry_delay': 1 # 重试间隔(秒)
}
class Database:
def __init__(self, context, pool_config, db_config):
self.pool_config = pool_config
self.db_config = db_config
self.pool = MySQLConnectionPool(context, pool_config, db_config)
@retry(max_retries=POOL_CONFIG['max_retries'], retry_delay=POOL_CONFIG['retry_delay'])
def query(self, sql, params=None):
"""
执行查询操作
:param sql: SQL 语句
:param params: SQL 参数
:return: 查询结果
"""
conn = self.pool._get_connection()
result = self.pool._execute_query(conn, sql, params)
return result
@retry(max_retries=POOL_CONFIG['max_retries'], retry_delay=POOL_CONFIG['retry_delay'])
def execute(self, sql, params=None):
"""
执行写操作(插入、更新、删除)
:param sql: SQL 语句
:param params: SQL 参数
:return: 受影响的行数
"""
conn = self.pool._get_connection()
result = self.pool._execute_write(conn, sql, params)
return result
def retry(max_retries=3, retry_delay=1):
"""
重试装饰器
:param max_retries: 最大重试次数
:param retry_delay: 重试间隔(秒)
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
retries = 0
while retries < max_retries:
try:
return func(*args, **kwargs)
except Exception as e:
print(f"Attempt {retries + 1} failed: {e}")
if retries < max_retries - 1:
time.sleep(retry_delay)
retries += 1
print(f"Failed after {max_retries} attempts")
raise
return wrapper
return decorator