更新时间:2025-08-12 GMT+08:00
分享

函数访问RDS MySQL示例代码

函数访问RDS MySQL示例代码

这段示例代码实现了从RDS for MySQL实例数据库的user表中查询前10条记录的功能。通过使用数据库连接池和重试机制,代码能够高效且可靠地执行数据库操作。

以下为完整的函数示例代码。其中关于连接池和重试部分的代码解读请参考示例代码解读

import pymysql
import time
from functools import wraps
from DBUtils.PooledDB import PooledDB

db_client = None
POOL_CONFIG = { # 连接池配置
        'max_connections': 5,           # 最大连接数
        'keepalive_interval': 60,       # 连接保活间隔(秒)
        'max_retries': 3,               # 最大重试次数
        'retry_delay': 1                # 重试间隔(秒)
}

def initializer(context):
    global db_client
    user = context.getUserData('username') 
    password = context.getUserData('password') 
    host = context.getUserData('host') 
    port = int(context.getUserData('port')) 
    database = context.getUserData('database') 

    dbConfig = { # mysql数据库配置
        'host': host,
        'port': port,
        'user': user,
        'password': password,
        'database': database,
        'charset': 'utf8',
    }
    db_client = Database(context, POOL_CONFIG, dbConfig)

def handler(event, context):  # 执行入口
    logger = context.getLogger()

    try:
        result= db_client.query("SELECT * FROM user LIMIT 10")
    except Exception as e:
        logger.info("query database error:%s" % e)
        return {"code": 400, "errorMsg": "internal error %s" % e}

    return result

class MySQLConnectionPool:
    def __init__(self, context, pool_config, db_config):
        """
        初始化数据库连接池
        :param db_config: 数据库配置
        :param pool_config: 连接池配置
        """
        self.context = context
        self.logger = context.getLogger();

        self.db_config = db_config
        self.pool_config = pool_config
        self.pool = self._create_pool()
        self.last_keepalive_time = 0

    def _create_pool(self):
        """
        创建数据库连接池
        :return: 连接池对象
        """
        try:
            pool = PooledDB(
                creator=pymysql,
                maxconnections=self.pool_config['max_connections'],
                mincached=1,
                **self.db_config
            )
            return pool
        except Exception as e:
            self.logger.error(f"Failed to create connection pool: {e}")
            raise

    def _get_connection(self):
        """
        从连接池获取连接,并确保连接有效
        :return: 数据库连接对象
        """
        conn = self.pool.connection()
        if not self._is_connection_alive(conn):
            conn = self.pool.connection()
        return conn

    def _is_connection_alive(self, conn):
        """
        检查连接是否存活
        :param conn: 数据库连接对象
        :return: bool
        """
        try:
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1")
                return True
        except Exception as e:
            self.logger.warning(f"Connection is not alive: {e}")
            return False

    def _close_connection(self, conn):
        """
        关闭连接
        :param conn: 数据库连接对象
        """
        try:
            conn.close()
            self.logger.info("Connection closed")
        except Exception as e:
            self.logger.error(f"Failed to close connection: {e}")

    def _execute_query(self, conn, sql, params=None):
        """
        执行数据库查询
        :param conn: 数据库连接对象
        :param sql: SQL 语句
        :param params: SQL 参数
        :return: 查询结果
        """
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                if sql.strip().lower().startswith('select'):
                    return cursor.fetchall()
                return None
        except Exception as e:
            self.logger.error(f"Query failed: {e}")
            raise

    def _execute_write(self, conn, sql, params=None):
        """
        执行写操作(插入、更新、删除)
        :param conn: 数据库连接对象
        :param sql: SQL 语句
        :param params: SQL 参数
        :return: 受影响的行数
        """
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                conn.commit()
                return cursor.rowcount
        except Exception as e:
            self.logger.error(f"Write operation failed: {e}")
            conn.rollback()
            raise

def retry(max_retries=3, retry_delay=1):
    """
    重试装饰器
    :param max_retries: 最大重试次数
    :param retry_delay: 重试间隔(秒)
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    print(f"Attempt {retries + 1} failed: {e}")
                    if retries < max_retries - 1:
                        time.sleep(retry_delay)
                    retries += 1
            print(f"Failed after {max_retries} attempts")
            raise
        return wrapper
    return decorator

class Database:
    def __init__(self, context, pool_config, db_config):
        self.pool_config = pool_config
        self.db_config = db_config
        self.pool = MySQLConnectionPool(context, pool_config, db_config)

    @retry(max_retries=POOL_CONFIG['max_retries'], retry_delay=POOL_CONFIG['retry_delay'])
    def query(self, sql, params=None):
        """
        执行查询操作
        :param sql: SQL 语句
        :param params: SQL 参数
        :return: 查询结果
        """
        conn = self.pool._get_connection()
        result = self.pool._execute_query(conn, sql, params)
        return result

    @retry(max_retries=POOL_CONFIG['max_retries'], retry_delay=POOL_CONFIG['retry_delay'])
    def execute(self, sql, params=None):
        """
        执行写操作(插入、更新、删除)
        :param sql: SQL 语句
        :param params: SQL 参数
        :return: 受影响的行数
        """
        conn = self.pool._get_connection()
        result = self.pool._execute_write(conn, sql, params)
        return result

相关文档