Publishing and Subscribing to Messages with DCS
Scenario
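In production, several DCS instances are usually deployed (for example, dcs-prod-01, dcs-prod-02, and dcs-prod-03). The operations team needs real-time visibility into each instance's health, including memory usage, client connections, slow queries, master-replica replication status, and key evictions. When something goes wrong, different terminals need to receive alerts according to severity: the on-call DBA cares only about the most urgent issues, the operations dashboard shows alert trends, the self-healing service triggers automatic remediation for specific faults, and the log archiving service records alerts of every level for later auditing.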
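Redis publish/subscribe (Pub/Sub) fits this scenario well. A monitoring agent periodically collects metrics from each instance and, when it detects an anomaly, publishes an alert to the channel for that severity level (for example, redis:monitor:critical, redis:monitor:warning, or redis:monitor:notice). Each terminal subscribes only to the channels it needs and receives the matching messages in real time. The log archiving service can use pattern subscription (psubscribe) to subscribe to all redis:monitor:* channels at once.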
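To make the routing concrete, the following sketch models which terminal receives an alert published to each channel. It is a plain-Python approximation, not redis-py: the subscription table is a hypothetical mirror of the four subscribers in this section, and `fnmatch.fnmatchcase` stands in for Redis glob-style pattern matching. The two agree for a simple `*` pattern like the one used here but differ in some details (for example, negated character classes and backslash escapes).

```python
from fnmatch import fnmatchcase

# Hypothetical subscription table mirroring this section's subscribers:
# exact channels (SUBSCRIBE) and glob patterns (PSUBSCRIBE).
SUBSCRIPTIONS = {
    "DBA on-call": {"channels": ["redis:monitor:critical"], "patterns": []},
    "Ops dashboard": {
        "channels": ["redis:monitor:critical", "redis:monitor:warning"],
        "patterns": [],
    },
    "Self-healing": {"channels": ["redis:monitor:critical"], "patterns": []},
    "Log archiver": {"channels": [], "patterns": ["redis:monitor:*"]},
}


def recipients(channel, subscriptions=SUBSCRIPTIONS):
    """Return the subscribers that would receive a message published to `channel`.

    Each subscriber is listed once; a real client subscribed to the same
    channel both directly and via a matching pattern would receive it twice.
    """
    result = []
    for name, subs in subscriptions.items():
        exact = channel in subs["channels"]
        # fnmatchcase approximates Redis glob matching for simple patterns
        by_pattern = any(fnmatchcase(channel, p) for p in subs["patterns"])
        if exact or by_pattern:
            result.append(name)
    return result


for level in ("critical", "warning", "notice"):
    ch = f"redis:monitor:{level}"
    print(ch, "->", recipients(ch))
```

Running it shows that critical alerts fan out to all four terminals, warnings reach only the dashboard and the archiver, and notices reach only the archiver.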
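Messages in Redis Pub/Sub are fire-and-forget: they are not persisted, so a subscriber that is not connected and subscribed at the moment a message is published never receives it. In addition, the subscriber's listen() call blocks the current thread, so each subscriber should run as a separate process or in its own thread.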
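If a subscriber must be stopped cleanly (for example, on service shutdown), the blocking `listen()` loop can be replaced with polling. The sketch below assumes redis-py's `PubSub.get_message(ignore_subscribe_messages=..., timeout=...)`, which returns `None` when nothing arrives within the timeout. `consume_until_stopped` and `start_consumer` are hypothetical helper names; they accept any object with a `get_message` method, so they can be exercised without a server.

```python
import threading


def consume_until_stopped(pubsub, handle, stop_event, poll_timeout=1.0):
    """Poll `pubsub` for messages until `stop_event` is set.

    `pubsub` is expected to behave like an already-subscribed redis-py PubSub
    object; `handle` is called with each data message dict.
    Returns the number of messages handled.
    """
    handled = 0
    while not stop_event.is_set():
        # Waits at most poll_timeout seconds, so the stop flag is re-checked regularly
        message = pubsub.get_message(ignore_subscribe_messages=True, timeout=poll_timeout)
        if message is None:
            continue
        # "message" comes from SUBSCRIBE, "pmessage" from PSUBSCRIBE
        if message["type"] in ("message", "pmessage"):
            handle(message)
            handled += 1
    return handled


def start_consumer(pubsub, handle, poll_timeout=1.0):
    """Run consume_until_stopped in a daemon thread; returns (thread, stop_event)."""
    stop_event = threading.Event()
    thread = threading.Thread(
        target=consume_until_stopped,
        args=(pubsub, handle, stop_event, poll_timeout),
        daemon=True,
    )
    thread.start()
    return thread, stop_event
```

With redis-py, a caller would subscribe first (for example `p = r.pubsub(); p.subscribe("redis:monitor:critical")`), call `thread, stop = start_consumer(p, print)`, and on shutdown call `stop.set()`, `thread.join()`, and `p.close()`. The poll timeout bounds how long shutdown can take.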
Code Example
For how to connect to a Redis instance with the Python client redis-py, see Connecting to Redis with the redis-py Client.
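The examples in this section hard-code `127.0.0.1:6379`. When connecting to a DCS instance you would normally supply the instance's address and, if password access is enabled, its password. A minimal sketch, assuming hypothetical environment variable names `DCS_HOST`, `DCS_PORT`, and `DCS_PASSWORD`; the keyword arguments it builds (`host`, `port`, `password`, `decode_responses`) are standard `redis.Redis` parameters.

```python
import os


def redis_kwargs_from_env(env=None):
    """Build keyword arguments for redis.Redis from (hypothetical) DCS_* variables."""
    env = os.environ if env is None else env
    kwargs = {
        "host": env.get("DCS_HOST", "127.0.0.1"),
        "port": int(env.get("DCS_PORT", "6379")),
        # Return str instead of bytes, as the examples in this section expect
        "decode_responses": True,
    }
    password = env.get("DCS_PASSWORD")
    if password:  # omit the password for password-free instances
        kwargs["password"] = password
    return kwargs
```

Usage: `r = redis.Redis(**redis_kwargs_from_env())`. Keeping credentials out of the source also keeps them out of version control.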
Message Publisher
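```python
# The publisher, subscribers, and main() in this section form one Python 3 module.
import json
import threading
import time

import redis


def get_redis_connection():
    # Replace with your DCS instance's address; add password=... if the instance requires one
    return redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)


def monitor_agent():
    r = get_redis_connection()
    # Simulated alert events produced by one collection round
    alerts = [
        ("redis:monitor:notice", {
            "instance": "dcs-prod-01",
            "metric": "connected_clients",
            "value": 128,
            "threshold": 500,
            "msg": "Current connections: 128, running normally"
        }),
        ("redis:monitor:warning", {
            "instance": "dcs-prod-02",
            "metric": "used_memory_mb",
            "value": 7680,
            "threshold": 8192,
            "pct": "93.7%",
            "msg": "Memory usage at 93.7%, approaching maxmemory"
        }),
        ("redis:monitor:warning", {
            "instance": "dcs-prod-01",
            "metric": "slowlog_count",
            "value": 15,
            "threshold": 10,
            "window": "last 5 minutes",
            "msg": "15 slow queries, exceeding the threshold of 10"
        }),
        ("redis:monitor:critical", {
            "instance": "dcs-prod-01",
            "metric": "replication_status",
            "value": "down",
            "slave": "dcs-slave-01",
            "msg": "Master-replica replication link down, slave dcs-slave-01 disconnected"
        }),
        ("redis:monitor:notice", {
            "instance": "dcs-prod-03",
            "metric": "evicted_keys",
            "value": 320,
            "msg": "320 keys evicted in the last hour, maxmemory-policy triggered"
        }),
        ("redis:monitor:critical", {
            "instance": "dcs-prod-02",
            "metric": "used_memory_mb",
            "value": 8190,
            "threshold": 8192,
            "pct": "99.97%",
            "msg": "Memory full, write commands are rejected"
        }),
        ("redis:monitor:warning", {
            "instance": "dcs-prod-03",
            "metric": "connected_clients",
            "value": 460,
            "threshold": 500,
            "pct": "92.0%",
            "msg": "Connections at 92% of the limit"
        }),
    ]
    for channel, data in alerts:
        # Serialize the alert as JSON and publish it to the channel for its severity
        payload = json.dumps(data, ensure_ascii=False)
        r.publish(channel, payload)
        # The severity level is the last segment of the channel name
        level = channel.split(":")[-1].upper()
        print(f"[Monitor Agent] {level:8s} | {data['instance']} | {data['msg']}")
        time.sleep(1)
    print("[Monitor Agent] All alerts for this collection round have been published")
```

Message Subscribers

- Subscriber for the DBA on-call terminal:

```python
def dba_on_call_subscriber():
    r = get_redis_connection()
    pubsub = r.pubsub()
    # Only the most urgent alerts
    pubsub.subscribe("redis:monitor:critical")
    print("[DBA On-Call] Subscribed to the critical channel")
    # listen() blocks and also yields subscription confirmations, so filter on the type
    for message in pubsub.listen():
        if message["type"] == "message":
            data = json.loads(message["data"])
            print(f"[DBA On-Call] !!! CRITICAL ALERT !!! {data['instance']} | {data['msg']}")
```

- Subscriber for the operations dashboard:

```python
def operations_dashboard_subscriber():
    r = get_redis_connection()
    pubsub = r.pubsub()
    # One connection can subscribe to several channels at once
    pubsub.subscribe("redis:monitor:critical", "redis:monitor:warning")
    print("[Ops Dashboard] Subscribed to the critical + warning channels")
    for message in pubsub.listen():
        if message["type"] == "message":
            data = json.loads(message["data"])
            level = message["channel"].split(":")[-1].upper()
            print(f"[Ops Dashboard] [{level}] {data['instance']} | {data['metric']} | {data['msg']}")
```

- Subscriber for the self-healing service:

```python
def auto_repair_subscriber():
    r = get_redis_connection()
    pubsub = r.pubsub()
    pubsub.subscribe("redis:monitor:critical")
    print("[Self-Healing] Subscribed to the critical channel")
    for message in pubsub.listen():
        if message["type"] == "message":
            data = json.loads(message["data"])
            # Choose a (simulated) remediation action based on the failing metric
            if data["metric"] == "used_memory_mb":
                print(f"[Self-Healing] Memory full detected -> triggering automatic cleanup: {data['instance']}")
            elif data["metric"] == "replication_status":
                print(f"[Self-Healing] Replication interrupted -> attempting to reconnect slave: {data.get('slave', 'unknown')}")
            else:
                print(f"[Self-Healing] Alert received: {data['instance']} | {data['msg']}")
```

- Subscriber for the log archiving service:

```python
def log_archiver_subscriber():
    r = get_redis_connection()
    pubsub = r.pubsub()
    # Pattern subscription: receives messages from every redis:monitor:* channel
    pubsub.psubscribe("redis:monitor:*")
    print("[Log Archiver] Pattern-subscribed to redis:monitor:*")
    for message in pubsub.listen():
        # Messages delivered through a pattern subscription have type "pmessage"
        if message["type"] == "pmessage":
            data = json.loads(message["data"])
            # message["channel"] is the concrete channel the alert was published to
            level = message["channel"].split(":")[-1].upper()
            print(f"[Log Archiver] [{level:8s}] {data['instance']} | {data['msg']}")
```

- Main function:

```python
def main():
    # Create one thread per subscriber, because pubsub.listen() blocks its thread
    dba_thread = threading.Thread(target=dba_on_call_subscriber)
    dashboard_thread = threading.Thread(target=operations_dashboard_subscriber)
    log_archiver_thread = threading.Thread(target=log_archiver_subscriber)
    auto_repair_thread = threading.Thread(target=auto_repair_subscriber)
    thread_list = [dba_thread, dashboard_thread, log_archiver_thread, auto_repair_thread]
    for sub_thread in thread_list:
        # Daemon threads do not keep the process alive after main() returns
        sub_thread.daemon = True
    # Start all subscriber threads
    for sub_thread in thread_list:
        sub_thread.start()
    # Messages published before a subscriber has registered are lost,
    # so give the subscribers a moment to subscribe first
    time.sleep(1)
    # Start the monitoring agent
    monitor_agent()
    # The subscriber loops never exit on their own: wait briefly so the last
    # messages are printed, then the daemon threads end with the process
    for sub_thread in thread_list:
        sub_thread.join(timeout=1)


if __name__ == "__main__":
    main()
```

Output

Each subscriber prints from its own thread, so the relative order of lines can vary between runs. A sample run produces:

```
[DBA On-Call] Subscribed to the critical channel
[Ops Dashboard] Subscribed to the critical + warning channels
[Log Archiver] Pattern-subscribed to redis:monitor:*
[Self-Healing] Subscribed to the critical channel
[Monitor Agent] NOTICE   | dcs-prod-01 | Current connections: 128, running normally
[Log Archiver] [NOTICE  ] dcs-prod-01 | Current connections: 128, running normally
[Monitor Agent] WARNING  | dcs-prod-02 | Memory usage at 93.7%, approaching maxmemory
[Ops Dashboard] [WARNING] dcs-prod-02 | used_memory_mb | Memory usage at 93.7%, approaching maxmemory
[Log Archiver] [WARNING ] dcs-prod-02 | Memory usage at 93.7%, approaching maxmemory
[Monitor Agent] WARNING  | dcs-prod-01 | 15 slow queries, exceeding the threshold of 10
[Ops Dashboard] [WARNING] dcs-prod-01 | slowlog_count | 15 slow queries, exceeding the threshold of 10
[Log Archiver] [WARNING ] dcs-prod-01 | 15 slow queries, exceeding the threshold of 10
[Monitor Agent] CRITICAL | dcs-prod-01 | Master-replica replication link down, slave dcs-slave-01 disconnected
[Log Archiver] [CRITICAL] dcs-prod-01 | Master-replica replication link down, slave dcs-slave-01 disconnected
[Ops Dashboard] [CRITICAL] dcs-prod-01 | replication_status | Master-replica replication link down, slave dcs-slave-01 disconnected
[Self-Healing] Replication interrupted -> attempting to reconnect slave: dcs-slave-01
[DBA On-Call] !!! CRITICAL ALERT !!! dcs-prod-01 | Master-replica replication link down, slave dcs-slave-01 disconnected
[Monitor Agent] NOTICE   | dcs-prod-03 | 320 keys evicted in the last hour, maxmemory-policy triggered
[Log Archiver] [NOTICE  ] dcs-prod-03 | 320 keys evicted in the last hour, maxmemory-policy triggered
[Monitor Agent] CRITICAL | dcs-prod-02 | Memory full, write commands are rejected
[Log Archiver] [CRITICAL] dcs-prod-02 | Memory full, write commands are rejected
[Self-Healing] Memory full detected -> triggering automatic cleanup: dcs-prod-02
[Ops Dashboard] [CRITICAL] dcs-prod-02 | used_memory_mb | Memory full, write commands are rejected
[DBA On-Call] !!! CRITICAL ALERT !!! dcs-prod-02 | Memory full, write commands are rejected
[Monitor Agent] WARNING  | dcs-prod-03 | Connections at 92% of the limit
[Log Archiver] [WARNING ] dcs-prod-03 | Connections at 92% of the limit
[Ops Dashboard] [WARNING] dcs-prod-03 | connected_clients | Connections at 92% of the limit
[Monitor Agent] All alerts for this collection round have been published
```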
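Because Pub/Sub keeps nothing for late subscribers, a publisher that starts at the same moment as its subscriber threads can lose its first alerts if it publishes before the SUBSCRIBE commands take effect. Instead of relying only on timing, the publisher can poll `PUBSUB NUMSUB`, exposed in redis-py as `Redis.pubsub_numsub(*channels)`, which returns `(channel, count)` pairs. A sketch under that assumption; `wait_for_subscribers` and the expected counts are hypothetical. Note that NUMSUB does not count pattern subscriptions such as the log archiver's psubscribe, and on a Redis Cluster it reports only the node you are connected to.

```python
import time


def wait_for_subscribers(client, expected, timeout=5.0, interval=0.1):
    """Poll PUBSUB NUMSUB until each channel has at least the expected subscriber count.

    `client` is expected to provide redis-py's pubsub_numsub(*channels);
    `expected` maps channel name -> minimum number of channel subscribers.
    Returns True if all counts were reached before `timeout`, else False.
    """
    deadline = time.monotonic() + timeout
    while True:
        counts = {}
        for channel, count in client.pubsub_numsub(*expected):
            # Channel names come back as bytes unless decode_responses=True
            if isinstance(channel, bytes):
                channel = channel.decode()
            counts[channel] = int(count)
        if all(counts.get(ch, 0) >= n for ch, n in expected.items()):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

For the four subscribers in this section, the publisher could wait for `{"redis:monitor:critical": 3, "redis:monitor:warning": 1}` (DBA on-call, dashboard, and self-healing on critical; the dashboard on warning) before publishing.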
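The example above shows a monitoring agent working with four subscribers in different roles. Severity-based channels, multi-channel subscriptions, and pattern matching let each alert reach exactly the roles that need it and trigger automated responses. Redis Pub/Sub suits real-time scenarios such as monitoring alerts, event broadcasting, and state synchronization. Because messages are neither persisted nor acknowledged, however, it is not suitable as a reliable message queue: a subscriber that is offline or disconnected when a message is published never receives it. If delivery must be guaranteed, consider Redis Streams or a dedicated message queue middleware.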
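Since there is no acknowledgment, the only delivery signal Pub/Sub gives the publisher is the return value of `PUBLISH` (`Redis.publish()` in redis-py): the number of clients that received the message, including pattern subscribers. A reply of 0 means the alert went nowhere. A minimal sketch that surfaces this; `publish_alert` is a hypothetical helper, and on a Redis Cluster the count covers only clients connected to the same node as the publisher.

```python
import json
import logging

logger = logging.getLogger("monitor_agent")


def publish_alert(client, channel, data):
    """Publish one alert as JSON and warn when no subscriber received it.

    `client` is expected to provide redis-py's publish(channel, message),
    which returns the number of clients that received the message.
    """
    receivers = client.publish(channel, json.dumps(data, ensure_ascii=False))
    if receivers == 0:
        # Pub/Sub does not store messages: this alert is already lost
        logger.warning("alert on %s had no subscribers: %s", channel, data.get("msg"))
    return receivers
```

In `monitor_agent()`, the `r.publish(channel, payload)` call could be replaced with `publish_alert(r, channel, data)`. This only detects that nobody was listening; it does not make delivery reliable.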