函数访问RDS MySQL示例代码
函数访问RDS MySQL示例代码
这段示例代码实现了从RDS for MySQL实例数据库的user表中查询前10条记录的功能。通过使用数据库连接池和重试机制,代码能够高效且可靠地执行数据库操作。
以下为完整的函数示例代码。其中关于连接池和重试部分的代码解读请参考示例代码解读。
import pymysql import time from functools import wraps from DBUtils.PooledDB import PooledDB db_client = None POOL_CONFIG = { # 连接池配置 'max_connections': 5, # 最大连接数 'keepalive_interval': 60, # 连接保活间隔(秒) 'max_retries': 3, # 最大重试次数 'retry_delay': 1 # 重试间隔(秒) } def initializer(context): global db_client user = context.getUserData('username') password = context.getUserData('password') host = context.getUserData('host') port = int(context.getUserData('port')) database = context.getUserData('database') dbConfig = { # mysql数据库配置 'host': host, 'port': port, 'user': user, 'password': password, 'database': database, 'charset': 'utf8', } db_client = Database(context, POOL_CONFIG, dbConfig) def handler(event, context): # 执行入口 logger = context.getLogger() try: result= db_client.query("SELECT * FROM user LIMIT 10") except Exception as e: logger.info("query database error:%s" % e) return {"code": 400, "errorMsg": "internal error %s" % e} return result class MySQLConnectionPool: def __init__(self, context, pool_config, db_config): """ 初始化数据库连接池 :param db_config: 数据库配置 :param pool_config: 连接池配置 """ self.context = context self.logger = context.getLogger(); self.db_config = db_config self.pool_config = pool_config self.pool = self._create_pool() self.last_keepalive_time = 0 def _create_pool(self): """ 创建数据库连接池 :return: 连接池对象 """ try: pool = PooledDB( creator=pymysql, maxconnections=self.pool_config['max_connections'], mincached=1, **self.db_config ) return pool except Exception as e: self.logger.error(f"Failed to create connection pool: {e}") raise def _get_connection(self): """ 从连接池获取连接,并确保连接有效 :return: 数据库连接对象 """ conn = self.pool.connection() if not self._is_connection_alive(conn): conn = self.pool.connection() return conn def _is_connection_alive(self, conn): """ 检查连接是否存活 :param conn: 数据库连接对象 :return: bool """ try: with conn.cursor() as cursor: cursor.execute("SELECT 1") return True except Exception as e: self.logger.warning(f"Connection is not alive: {e}") return False def _close_connection(self, conn): """ 关闭连接 :param conn: 数据库连接对象 """ try: conn.close() self.logger.info("Connection closed") except Exception as e: self.logger.error(f"Failed to close connection: {e}") def _execute_query(self, conn, sql, params=None): """ 执行数据库查询 :param conn: 数据库连接对象 :param sql: SQL 语句 :param params: SQL 参数 :return: 查询结果 """ try: with conn.cursor() as cursor: cursor.execute(sql, params) if sql.strip().lower().startswith('select'): return cursor.fetchall() return None except Exception as e: self.logger.error(f"Query failed: {e}") raise def _execute_write(self, conn, sql, params=None): """ 执行写操作(插入、更新、删除) :param conn: 数据库连接对象 :param sql: SQL 语句 :param params: SQL 参数 :return: 受影响的行数 """ try: with conn.cursor() as cursor: cursor.execute(sql, params) conn.commit() return cursor.rowcount except Exception as e: self.logger.error(f"Write operation failed: {e}") conn.rollback() raise def retry(max_retries=3, retry_delay=1): """ 重试装饰器 :param max_retries: 最大重试次数 :param retry_delay: 重试间隔(秒) """ def decorator(func): @wraps(func) def wrapper(*args, **kwargs): retries = 0 while retries < max_retries: try: return func(*args, **kwargs) except Exception as e: print(f"Attempt {retries + 1} failed: {e}") if retries < max_retries - 1: time.sleep(retry_delay) retries += 1 print(f"Failed after {max_retries} attempts") raise return wrapper return decorator class Database: def __init__(self, context, pool_config, db_config): self.pool_config = pool_config self.db_config = db_config self.pool = MySQLConnectionPool(context, pool_config, db_config) @retry(max_retries=POOL_CONFIG['max_retries'], retry_delay=POOL_CONFIG['retry_delay']) def query(self, sql, params=None): """ 执行查询操作 :param sql: SQL 语句 :param params: SQL 参数 :return: 查询结果 """ conn = self.pool._get_connection() result = self.pool._execute_query(conn, sql, params) return result @retry(max_retries=POOL_CONFIG['max_retries'], retry_delay=POOL_CONFIG['retry_delay']) def execute(self, sql, params=None): """ 执行写操作(插入、更新、删除) :param sql: SQL 语句 :param params: SQL 参数 :return: 受影响的行数 """ conn = self.pool._get_connection() result = self.pool._execute_write(conn, sql, params) return result