Updated: 2026-06-17 GMT+08:00

Publishing and Subscribing to Messages with DCS

Scenario

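In production, multiple DCS instances are usually deployed (for example, dcs-prod-01, dcs-prod-02, and dcs-prod-03). The O&M team needs real-time visibility into the health of each instance, including memory usage, connection count, slow queries, master/replica replication status, and key eviction. When an anomaly occurs, different consumers need to receive alerts according to severity: the on-call DBA only cares about the most urgent problems, the O&M dashboard displays alert trends, the auto-repair service triggers self-healing actions for specific faults, and the log archiving service records alerts of every level for later auditing.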

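Redis publish/subscribe (Pub/Sub) fits this scenario well. A monitoring agent periodically collects metrics from each instance and, when it detects an anomaly, publishes an alert to the channel for that severity level (for example, redis:monitor:critical, redis:monitor:warning, or redis:monitor:notice). Each consumer subscribes only to the channels it needs and receives alerts in real time. The log archiving service can also use a pattern subscription (psubscribe) to subscribe to all redis:monitor:* channels at once.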
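To see which consumer receives which alert, the routing can be modeled offline. The following is a minimal sketch in plain Python with no Redis connection: the role names and the SUBSCRIPTIONS table are hypothetical and mirror the subscribers in the code examples, and fnmatch.fnmatchcase stands in for Redis's glob-style pattern matching. The two agree for patterns that only use *, but Redis also supports backslash escapes and writes negated sets as [^...], which fnmatch does not.

```python
from fnmatch import fnmatchcase

# Hypothetical model of the four roles: exact channels plus glob patterns
SUBSCRIPTIONS = {
    "DBA On-Call": {"channels": {"redis:monitor:critical"}, "patterns": set()},
    "Ops Dashboard": {"channels": {"redis:monitor:critical", "redis:monitor:warning"}, "patterns": set()},
    "Auto-Repair": {"channels": {"redis:monitor:critical"}, "patterns": set()},
    "Log Archiver": {"channels": set(), "patterns": {"redis:monitor:*"}},
}


def recipients(channel):
    """Return, sorted by name, the roles that would receive a message on channel."""
    matched = []
    for role, sub in SUBSCRIPTIONS.items():
        # A role receives the message through an exact channel or any matching pattern
        if channel in sub["channels"] or any(fnmatchcase(channel, p) for p in sub["patterns"]):
            matched.append(role)
    return sorted(matched)


if __name__ == "__main__":
    for level in ("notice", "warning", "critical"):
        print(level, recipients("redis:monitor:" + level))
    # notice ['Log Archiver']
    # warning ['Log Archiver', 'Ops Dashboard']
    # critical ['Auto-Repair', 'DBA On-Call', 'Log Archiver', 'Ops Dashboard']
```

A client subscribed both to a channel and to a pattern that matches it receives each message twice, once as a message event and once as a pmessage event; the log archiver avoids this by relying on the pattern alone.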

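Messages in Redis Pub/Sub are fire-and-forget: they are not persisted, and each message is delivered only to the clients that are subscribed at the moment it is published, so a subscriber that is offline or has not yet subscribed misses it. In addition, the subscriber's listen() call blocks the current thread, so each subscriber should run as a separate process or in its own thread.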
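Because listen() blocks indefinitely, a subscriber thread built on it cannot be stopped cleanly. One alternative is to poll with a timeout. A minimal sketch, assuming redis-py's PubSub.get_message(ignore_subscribe_messages=..., timeout=...); poll_messages is a hypothetical helper, and pubsub can be any object with that method, which also keeps the loop easy to test:

```python
import threading


def poll_messages(pubsub, handler, stop_event: threading.Event, poll_timeout: float = 1.0) -> None:
    """Dispatch Pub/Sub messages to handler until stop_event is set.

    pubsub is expected to be a redis-py PubSub object that has already subscribed.
    """
    while not stop_event.is_set():
        # Waits at most poll_timeout seconds and returns None if nothing arrived;
        # subscribe/unsubscribe confirmations are also reported as None here
        message = pubsub.get_message(ignore_subscribe_messages=True, timeout=poll_timeout)
        if message is not None:
            handler(message)
```

In practice, pubsub would come from get_redis_connection().pubsub() after subscribe(...) has been called. Another thread calls stop.set() to end the loop within about poll_timeout seconds, after which pubsub.close() releases the connection. redis-py also provides PubSub.run_in_thread() for subscriptions registered with handler callbacks; it returns a worker thread with a stop() method.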

Code Examples

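For details about connecting to a Redis instance with the Python client redis-py, see the DCS documentation on connecting to Redis with redis-py. The snippets below form a single Python 3.6+ script: the subscriber functions and main() reuse the imports and get_redis_connection() defined with the publisher.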

Message Publisher

import json
import threading
import time

import redis


def get_redis_connection():
    # Replace host/port with your DCS instance address (add password=... if the instance requires one).
    # decode_responses=True makes redis-py return str instead of bytes.
    return redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)


def monitor_agent():
    r = get_redis_connection()
    # Simulate the alert events produced by one collection round
    alerts = [
        ("redis:monitor:notice", {
            "instance": "dcs-prod-01",
            "metric": "connected_clients",
            "value": 128,
            "threshold": 500,
            "msg": "Current connections: 128, running normally"
        }),
        ("redis:monitor:warning", {
            "instance": "dcs-prod-02",
            "metric": "used_memory_mb",
            "value": 7680,
            "threshold": 8192,
            "pct": "93.7%",
            "msg": "Memory usage has reached 93.7%, approaching maxmemory"
        }),
        ("redis:monitor:warning", {
            "instance": "dcs-prod-01",
            "metric": "slowlog_count",
            "value": 15,
            "threshold": 10,
            "window": "last 5 minutes",
            "msg": "15 slow queries, exceeding the threshold of 10"
        }),
        ("redis:monitor:critical", {
            "instance": "dcs-prod-01",
            "metric": "replication_status",
            "value": "down",
            "slave": "dcs-slave-01",
            "msg": "Replication link down, slave dcs-slave-01 disconnected"
        }),
        ("redis:monitor:notice", {
            "instance": "dcs-prod-03",
            "metric": "evicted_keys",
            "value": 320,
            "msg": "320 keys evicted in the last hour, maxmemory-policy triggered"
        }),
        ("redis:monitor:critical", {
            "instance": "dcs-prod-02",
            "metric": "used_memory_mb",
            "value": 8190,
            "threshold": 8192,
            "pct": "99.97%",
            "msg": "Memory full, write commands rejected"
        }),
        ("redis:monitor:warning", {
            "instance": "dcs-prod-03",
            "metric": "connected_clients",
            "value": 460,
            "threshold": 500,
            "pct": "92.0%",
            "msg": "Connections have reached 92% of the limit"
        }),
    ]
    for channel, data in alerts:
        # Serialize the alert as JSON; ensure_ascii=False keeps non-ASCII text readable
        payload = json.dumps(data, ensure_ascii=False)
        r.publish(channel, payload)
        # The severity level is the last segment of the channel name
        level = channel.split(":")[-1].upper()
        print("[Monitor Agent] {level:8s} | {instance} | {msg}".format(
            level=level, instance=data["instance"], msg=data["msg"]))
        # Publish one alert per second
        time.sleep(1)
    print("[Monitor Agent] All alerts for this collection round have been published")
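
Because Pub/Sub keeps no copy of a message, the publisher cannot otherwise tell whether anyone received an alert. The return value of PUBLISH, the number of clients that received the message, gives a cheap check. A minimal sketch, assuming a client such as the one returned by get_redis_connection(); publish_and_count is a hypothetical helper name:

```python
import json


def publish_and_count(client, channel, data):
    """Publish one alert as JSON and return how many subscribers received it.

    client is any object with a redis-py style publish(channel, message) method.
    """
    payload = json.dumps(data, ensure_ascii=False)
    # PUBLISH replies with the number of clients that received the message
    receivers = client.publish(channel, payload)
    if receivers == 0:
        # Nobody was subscribed to this channel or a matching pattern, and
        # Pub/Sub keeps no copy, so this alert is lost
        print("[Monitor Agent] no subscriber on {}, alert dropped".format(channel))
    return receivers
```

In Redis Cluster, the count only includes clients connected to the same node as the publishing client, so treat it as a hint rather than an exact audience size.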

Message Subscribers

  1. DBA on-call terminal subscriber: subscribes only to the critical channel.
    def dba_on_call_subscriber():
        r = get_redis_connection()
        pubsub = r.pubsub()
        pubsub.subscribe("redis:monitor:critical")
        print("[DBA On-Call] Subscribed to the critical channel")
        # listen() blocks and also yields subscription confirmations; keep only real messages
        for message in pubsub.listen():
            if message["type"] == "message":
                data = json.loads(message["data"])
                print("[DBA On-Call] !!! CRITICAL ALERT !!! {instance} | {msg}".format(
                    instance=data["instance"], msg=data["msg"]))
  2. O&M dashboard subscriber: subscribes to both the critical and warning channels.
    def operations_dashboard_subscriber():
        r = get_redis_connection()
        pubsub = r.pubsub()
        # One subscribe() call can cover several channels
        pubsub.subscribe("redis:monitor:critical", "redis:monitor:warning")
        print("[Ops Dashboard] Subscribed to the critical + warning channels")
        for message in pubsub.listen():
            if message["type"] == "message":
                data = json.loads(message["data"])
                # message["channel"] identifies the channel, and therefore the level, of the alert
                level = message["channel"].split(":")[-1].upper()
                print("[Ops Dashboard] [{level}] {instance} | {metric} | {msg}".format(
                    level=level, instance=data["instance"],
                    metric=data["metric"], msg=data["msg"]))
  3. 自动修复服务订阅者。
    def auto_repair_subscriber():
        r = get_redis_connection()
        pubsub = r.pubsub()
        pubsub.subscribe("redis:monitor:critical")
        six.print_("[自愈服务] 已订阅 critical 频道")
        for message in pubsub.listen():
            if message["type"] == "message":
                data = json.loads(message["data"])
                if data["metric"] == "used_memory_mb":
                    print(f"[自愈服务] 检测到内存满 -> 触发自动清理: {data['instance']}")
                elif data["metric"] == "replication_status":
                    print(f"[自愈服务] 检测到复制中断 -> 尝试重连 slave: {data.get('slave', 'unknown')}")
                else:
                    print(f"[自愈服务] 收到告警: {data['instance']} | {data['msg']}")
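  4. Log archiving service subscriber: uses a pattern subscription to receive alerts of every level.
    def log_archiver_subscriber():
        r = get_redis_connection()
        pubsub = r.pubsub()
        pubsub.psubscribe("redis:monitor:*")
        print("[Log Archiver] Pattern-subscribed to redis:monitor:*")
        for message in pubsub.listen():
            # Messages received through a pattern subscription have type "pmessage"
            if message["type"] == "pmessage":
                data = json.loads(message["data"])
                # "channel" is the actual channel; the matched pattern is in message["pattern"]
                level = message["channel"].split(":")[-1].upper()
                print("[Log Archiver] [{level:8s}] {instance} | {msg}".format(
                    level=level, instance=data["instance"], msg=data["msg"]))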
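  5. Example main function.
    def main():
        # Create one thread per subscriber
        dba_thread = threading.Thread(target=dba_on_call_subscriber)
        dashboard_thread = threading.Thread(target=operations_dashboard_subscriber)
        log_archiver_thread = threading.Thread(target=log_archiver_subscriber)
        auto_repair_thread = threading.Thread(target=auto_repair_subscriber)
    
        thread_list = [dba_thread, dashboard_thread, log_archiver_thread, auto_repair_thread]
    
        # listen() never returns on its own, so run the subscribers as daemon threads;
        # they are stopped automatically when the main thread exits
        for sub_thread in thread_list:
            sub_thread.daemon = True
        # Start all subscriber threads
        for sub_thread in thread_list:
            sub_thread.start()
        # Wait briefly so that every subscription takes effect first:
        # a message published before a client subscribes is never delivered to it
        time.sleep(1)
        # Start the monitoring agent
        monitor_agent()
        # Give the subscribers a moment to print the last messages before the program exits
        for sub_thread in thread_list:
            sub_thread.join(timeout=1)

    if __name__ == "__main__":
        main()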
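
Starting the subscriber threads does not guarantee that their subscriptions have taken effect, and a message published before a client subscribes is never delivered to it. An explicit readiness handshake removes this race. A minimal sketch using only the standard library; start_and_wait_ready is a hypothetical helper, and each worker is assumed to be a subscriber function changed to accept a threading.Event argument:

```python
import threading
import time


def start_and_wait_ready(workers, timeout=5.0):
    """Start each worker in a daemon thread and wait until every one signals ready.

    Each worker is a callable taking one threading.Event, which it sets once its
    subscription is confirmed. Returns (threads, all_ready).
    """
    threads, events = [], []
    for worker in workers:
        ready = threading.Event()
        thread = threading.Thread(target=worker, args=(ready,), daemon=True)
        thread.start()
        threads.append(thread)
        events.append(ready)
    # One shared deadline, so the total wait never exceeds timeout seconds
    deadline = time.monotonic() + timeout
    all_ready = True
    for ready in events:
        remaining = max(0.0, deadline - time.monotonic())
        if not ready.wait(remaining):
            all_ready = False
            break
    return threads, all_ready
```

Each subscriber would call ready.set() once listen() has yielded the confirmation for its last channel: a message of type subscribe (psubscribe for the log archiver) whose data field equals the number of channels it subscribed to, for example 2 for the O&M dashboard. main() would then call monitor_agent() only when all_ready is True.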

Sample Output

Lines printed by different threads may appear in a different order on each run.

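[DBA On-Call] Subscribed to the critical channel
[Ops Dashboard] Subscribed to the critical + warning channels
[Log Archiver] Pattern-subscribed to redis:monitor:*
[Auto-Repair] Subscribed to the critical channel
[Monitor Agent] NOTICE   | dcs-prod-01 | Current connections: 128, running normally
[Log Archiver] [NOTICE  ] dcs-prod-01 | Current connections: 128, running normally
[Monitor Agent] WARNING  | dcs-prod-02 | Memory usage has reached 93.7%, approaching maxmemory
[Ops Dashboard] [WARNING] dcs-prod-02 | used_memory_mb | Memory usage has reached 93.7%, approaching maxmemory
[Log Archiver] [WARNING ] dcs-prod-02 | Memory usage has reached 93.7%, approaching maxmemory
[Monitor Agent] WARNING  | dcs-prod-01 | 15 slow queries, exceeding the threshold of 10
[Ops Dashboard] [WARNING] dcs-prod-01 | slowlog_count | 15 slow queries, exceeding the threshold of 10
[Log Archiver] [WARNING ] dcs-prod-01 | 15 slow queries, exceeding the threshold of 10
[Monitor Agent] CRITICAL | dcs-prod-01 | Replication link down, slave dcs-slave-01 disconnected
[Log Archiver] [CRITICAL] dcs-prod-01 | Replication link down, slave dcs-slave-01 disconnected
[Ops Dashboard] [CRITICAL] dcs-prod-01 | replication_status | Replication link down, slave dcs-slave-01 disconnected
[Auto-Repair] Replication interruption detected -> trying to reconnect slave: dcs-slave-01
[DBA On-Call] !!! CRITICAL ALERT !!! dcs-prod-01 | Replication link down, slave dcs-slave-01 disconnected
[Monitor Agent] NOTICE   | dcs-prod-03 | 320 keys evicted in the last hour, maxmemory-policy triggered
[Log Archiver] [NOTICE  ] dcs-prod-03 | 320 keys evicted in the last hour, maxmemory-policy triggered
[Monitor Agent] CRITICAL | dcs-prod-02 | Memory full, write commands rejected
[Log Archiver] [CRITICAL] dcs-prod-02 | Memory full, write commands rejected
[Auto-Repair] Memory full detected -> triggering automatic cleanup: dcs-prod-02
[Ops Dashboard] [CRITICAL] dcs-prod-02 | used_memory_mb | Memory full, write commands rejected
[DBA On-Call] !!! CRITICAL ALERT !!! dcs-prod-02 | Memory full, write commands rejected
[Monitor Agent] WARNING  | dcs-prod-03 | Connections have reached 92% of the limit
[Log Archiver] [WARNING ] dcs-prod-03 | Connections have reached 92% of the limit
[Ops Dashboard] [WARNING] dcs-prod-03 | connected_clients | Connections have reached 92% of the limit
[Monitor Agent] All alerts for this collection round have been published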

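The example above shows a monitoring agent working with four subscribers in different roles. Severity-based channels, multi-channel subscriptions, and pattern subscriptions make it easy to route each alert to exactly the consumers that need it and to automate the response. Redis Pub/Sub suits real-time scenarios such as monitoring alerts, event broadcasting, and state synchronization. However, because messages are not persisted and there is no acknowledgment mechanism, it should not be used as a reliable message queue. If you need delivery guarantees, consider Redis Streams or a dedicated message queue middleware.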
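
If delivery guarantees are required, the same alert flow can be moved to a Redis Stream. The following is a minimal sketch, assuming redis-py's xgroup_create, xadd, xreadgroup, and xack methods, an instance running Redis 5.0 or later (Streams were introduced in 5.0), and a client created with decode_responses=True using the default RESP2 protocol (RESP3 changes the reply shape of xreadgroup). The stream name, group name, and helper functions are hypothetical. Entries remain in the stream after being read, and an entry that is read but not acknowledged stays in the group's pending entries list, where it can be inspected and reclaimed later.

```python
import json

# Hypothetical names for this sketch
STREAM = "redis:monitor:stream"
GROUP = "log-archiver"


def ensure_group(r, stream=STREAM, group=GROUP):
    """Create the consumer group, and the stream itself if needed."""
    try:
        # id="0": the group also sees entries added before it was created
        r.xgroup_create(stream, group, id="0", mkstream=True)
    except Exception as exc:  # redis-py raises redis.exceptions.ResponseError
        # BUSYGROUP means the group already exists, which is fine; re-raise anything else
        if "BUSYGROUP" not in str(exc):
            raise


def append_alert(r, level, data, stream=STREAM):
    """Append an alert; unlike PUBLISH, the entry stays in the stream after delivery."""
    fields = {"level": level, "payload": json.dumps(data, ensure_ascii=False)}
    return r.xadd(stream, fields)


def consume_once(r, consumer, handler, stream=STREAM, group=GROUP, block_ms=5000):
    """Read new entries for this consumer, handle each one, then acknowledge it.

    Returns the number of entries handled. If handler raises, the entry is not
    acknowledged and stays in the group's pending entries list.
    """
    # ">" asks for entries never delivered to any consumer in this group
    response = r.xreadgroup(group, consumer, {stream: ">"}, count=10, block=block_ms)
    handled = 0
    # RESP2 reply shape: [[stream_name, [(entry_id, {field: value}), ...]], ...]
    for _stream_name, entries in response or []:
        for entry_id, fields in entries:
            handler(fields["level"], json.loads(fields["payload"]))
            r.xack(stream, group, entry_id)
            handled += 1
    return handled
```

Consumers in the same group share the entries between them, while every group receives all entries, so roles that each need every alert (as in this scenario) would each use their own group.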
