Using DCS to Publish and Subscribe to Messages
Scenarios
Production environments typically run multiple DCS instances (such as dcs-prod-01, dcs-prod-02, and dcs-prod-03). The O&M team needs to monitor the running status of each instance in real time, including memory usage, number of connections, slow queries, master/replica replication status, and key eviction. When an exception occurs, subscribers in different roles need to receive alarms by severity: the on-duty DBA focuses only on critical issues, the O&M dashboard displays alarm trends, the self-healing service triggers self-healing operations for specific faults, and the log archiving service records alarms of all severities for post-event audit.
The Redis Pub/Sub mechanism meets this requirement. A monitoring agent periodically collects performance metrics from each instance. Upon detecting an exception, the agent publishes an alarm to a severity-specific channel (such as redis:monitor:critical, redis:monitor:warning, or redis:monitor:notice). Each subscriber receives alarms in real time from only the channels it has subscribed to. The log archiving service can subscribe to all redis:monitor:* channels at once via pattern matching (PSUBSCRIBE).
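The routing described above can be sketched without a Redis server. The following is a minimal illustration, not part of the sample itself: the subscriber names and the receivers() helper are hypothetical, and Python's fnmatch is used only as an approximation of Redis glob-style pattern matching for these simple channel names.

```python
from fnmatch import fnmatchcase

# Subscriptions from the scenario: exact channel names, or a glob pattern
# for the log archiving service. The keys are illustrative names only.
SUBSCRIPTIONS = {
    "dba": ["redis:monitor:critical"],
    "dashboard": ["redis:monitor:critical", "redis:monitor:warning"],
    "self_healing": ["redis:monitor:critical"],
    "log_archiver": ["redis:monitor:*"],
}


def receivers(channel):
    """Return the sorted names of subscribers that would receive a message on channel."""
    return sorted(
        name
        for name, patterns in SUBSCRIPTIONS.items()
        if any(fnmatchcase(channel, pattern) for pattern in patterns)
    )


for level in ("critical", "warning", "notice"):
    print(level, receivers("redis:monitor:" + level))
```

Under these assumptions, a notice alarm reaches only the log archiving service, while a critical alarm reaches all four subscribers.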
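Redis Pub/Sub uses a fire-and-forget messaging model: messages are not persisted, and a message is delivered only to subscribers that are connected when it is published. A subscriber that is offline or has not yet subscribed never receives it. Because the subscriber's call to listen() blocks the current thread, each subscriber should run as an independent process or in a separate thread.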
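The threading point can be illustrated with a minimal sketch. FakePubSub and start_listener() are hypothetical names introduced here so that the example runs without a Redis server; with redis-py, the object passed in would be the result of r.pubsub() after calling subscribe().

```python
import threading


class FakePubSub:
    """Hypothetical stand-in for a redis-py PubSub object; yields canned messages."""

    def __init__(self, messages):
        self._messages = messages

    def listen(self):
        # A real listen() blocks and yields messages indefinitely.
        for message in self._messages:
            yield message


def start_listener(pubsub, handler):
    """Run the blocking listen() loop in a daemon thread and return the thread."""

    def _run():
        for message in pubsub.listen():
            # Skip subscribe/unsubscribe confirmations; handle published messages only.
            if message["type"] == "message":
                handler(message["data"])

    thread = threading.Thread(target=_run, daemon=True)
    thread.start()
    return thread


received = []
fake = FakePubSub([
    {"type": "subscribe", "channel": "redis:monitor:critical", "data": 1},
    {"type": "message", "channel": "redis:monitor:critical", "data": "alarm-1"},
])
listener = start_listener(fake, received.append)
listener.join(timeout=2)
print(received)
```

The main thread stays free to do other work while the listener thread waits for messages. redis-py also provides PubSub.run_in_thread() for subscriptions registered with handler callbacks, which may be a simpler option in some cases.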
Code Sample
For details about how to use the Python Redis client redis-py to connect to a Redis instance, see Using redis-py to Access an Instance.
Message Publisher
import json
import threading
import time

import redis
import six


def get_redis_connection():
    # Replace the host and port with the address of your DCS instance.
    return redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)


def monitor_agent():
    r = get_redis_connection()
    # Simulate alarms generated during a collection cycle.
    alerts = [
        ("redis:monitor:notice", {
            "instance": "dcs-prod-01",
            "metric": "connected_clients",
            "value": 128,
            "threshold": 500,
            "msg": "128 connections are currently active and running properly."
        }),
        ("redis:monitor:warning", {
            "instance": "dcs-prod-02",
            "metric": "used_memory_mb",
            "value": 7680,
            "threshold": 8192,
            "pct": "93.7%",
            "msg": "The memory usage has reached 93.7% and is about to reach the maxmemory limit."
        }),
        ("redis:monitor:warning", {
            "instance": "dcs-prod-01",
            "metric": "slowlog_count",
            "value": 15,
            "threshold": 10,
            "window": "Last 5 minutes",
            "msg": "The number of slow queries is 15, exceeding the threshold 10."
        }),
        ("redis:monitor:critical", {
            "instance": "dcs-prod-01",
            "metric": "replication_status",
            "value": "down",
            "slave": "dcs-slave-01",
            "msg": "The master/replica replication connection is interrupted. The dcs-slave-01 node has been disconnected."
        }),
        ("redis:monitor:notice", {
            "instance": "dcs-prod-03",
            "metric": "evicted_keys",
            "value": 320,
            "msg": "320 keys were evicted in the last hour, and maxmemory-policy has been triggered."
        }),
        ("redis:monitor:critical", {
            "instance": "dcs-prod-02",
            "metric": "used_memory_mb",
            "value": 8190,
            "threshold": 8192,
            "pct": "99.97%",
            "msg": "The memory is full, and write commands are rejected."
        }),
        ("redis:monitor:warning", {
            "instance": "dcs-prod-03",
            "metric": "connected_clients",
            "value": 460,
            "threshold": 500,
            "pct": "92.0%",
            "msg": "The number of connections has reached 92% of the upper limit."
        }),
    ]
    for channel, data in alerts:
        # Serialize the alarm as JSON and publish it to the severity-specific channel.
        payload = json.dumps(data, ensure_ascii=False)
        r.publish(channel, payload)
        # The last segment of the channel name is the severity level.
        level = channel.split(":")[-1].upper()
        six.print_("[Monitoring agent] {level:8s} | {instance} | {msg}".format(
            level=level, instance=data['instance'], msg=data['msg']))
        time.sleep(1)
    six.print_("[Monitoring agent] All alarms in this collection cycle have been pushed.")
Message Subscriber
- Subscriber for the on-duty DBA
def dba_on_call_subscriber():
    r = get_redis_connection()
    pubsub = r.pubsub()
    pubsub.subscribe("redis:monitor:critical")
    six.print_("[On-duty DBA] The critical channel has been subscribed to.")
    # listen() blocks and yields messages as they arrive.
    for message in pubsub.listen():
        if message["type"] == "message":
            data = json.loads(message["data"])
            six.print_("[On-duty DBA] !!! Critical alarm !!! {instance} | {msg}".format(
                instance=data['instance'], msg=data['msg']))
- Subscriber for the O&M dashboard
def operations_dashboard_subscriber():
    r = get_redis_connection()
    pubsub = r.pubsub()
    # Subscribe to multiple channels in a single call.
    pubsub.subscribe("redis:monitor:critical", "redis:monitor:warning")
    six.print_("[O&M dashboard] The critical and warning channels have been subscribed to.")
    for message in pubsub.listen():
        if message["type"] == "message":
            data = json.loads(message["data"])
            level = message["channel"].split(":")[-1].upper()
            six.print_("[O&M dashboard] [{level}] {instance} | {metric} | {msg}".format(
                level=level, instance=data['instance'], metric=data['metric'], msg=data['msg']))
- Subscriber for the self-healing service
def auto_repair_subscriber():
    r = get_redis_connection()
    pubsub = r.pubsub()
    pubsub.subscribe("redis:monitor:critical")
    six.print_("[Self-healing service] The critical channel has been subscribed to.")
    for message in pubsub.listen():
        if message["type"] == "message":
            data = json.loads(message["data"])
            # Choose a self-healing action based on the metric that raised the alarm.
            if data["metric"] == "used_memory_mb":
                print(f"[Self-healing service] Full memory detected -> Automatic cleanup triggered: {data['instance']}")
            elif data["metric"] == "replication_status":
                print(f"[Self-healing service] Replication interruption detected -> Attempting to reconnect to the replica: {data.get('slave', 'unknown')}")
            else:
                print(f"[Self-healing service] Alarms received: {data['instance']} | {data['msg']}")
- Subscriber for the log archiving service
def log_archiver_subscriber():
    r = get_redis_connection()
    pubsub = r.pubsub()
    # Pattern subscription: receive messages from every redis:monitor:* channel.
    pubsub.psubscribe("redis:monitor:*")
    six.print_("[Log archiving] The redis:monitor:* channel has been subscribed to via pattern matching.")
    for message in pubsub.listen():
        # Messages delivered through a pattern subscription have the type "pmessage".
        if message["type"] == "pmessage":
            data = json.loads(message["data"])
            level = message["channel"].split(":")[-1].upper()
            six.print_("[Log archiving] [{level:8s}] {instance} | {msg}".format(
                level=level, instance=data['instance'], msg=data['msg']))
- Example of the main function
def main():
    # Create subscriber threads.
    dba_thread = threading.Thread(target=dba_on_call_subscriber)
    dashboard_thread = threading.Thread(target=operations_dashboard_subscriber)
    log_archiver_thread = threading.Thread(target=log_archiver_subscriber)
    auto_repair_thread = threading.Thread(target=auto_repair_subscriber)
    thread_list = [dba_thread, dashboard_thread, log_archiver_thread, auto_repair_thread]
    # Daemon threads stop automatically when the main thread exits.
    for sub_thread in thread_list:
        sub_thread.daemon = True
    # Start all subscriber threads.
    for sub_thread in thread_list:
        sub_thread.start()
    # Pub/Sub does not replay earlier messages, so give the subscribers a moment to finish subscribing.
    time.sleep(1)
    # Start the monitoring agent.
    monitor_agent()
    # The listen() loops never return, so wait at most 1 second per thread
    # for the last messages to be handled before exiting.
    for sub_thread in thread_list:
        sub_thread.join(timeout=1)


if __name__ == "__main__":
    main()
Execution Result
[On-duty DBA] The critical channel has been subscribed to.
[O&M dashboard] The critical and warning channels have been subscribed to.
[Log archiving] The redis:monitor:* channel has been subscribed to via pattern matching.
[Self-healing service] The critical channel has been subscribed to.
[Monitoring agent] NOTICE   | dcs-prod-01 | 128 connections are currently active and running properly.
[Log archiving] [NOTICE  ] dcs-prod-01 | 128 connections are currently active and running properly.
[Monitoring agent] WARNING  | dcs-prod-02 | The memory usage has reached 93.7% and is about to reach the maxmemory limit.
[O&M dashboard] [WARNING] dcs-prod-02 | used_memory_mb | The memory usage has reached 93.7% and is about to reach the maxmemory limit.
[Log archiving] [WARNING ] dcs-prod-02 | The memory usage has reached 93.7% and is about to reach the maxmemory limit.
[Monitoring agent] WARNING  | dcs-prod-01 | The number of slow queries is 15, exceeding the threshold 10.
[O&M dashboard] [WARNING] dcs-prod-01 | slowlog_count | The number of slow queries is 15, exceeding the threshold 10.
[Log archiving] [WARNING ] dcs-prod-01 | The number of slow queries is 15, exceeding the threshold 10.
[Monitoring agent] CRITICAL | dcs-prod-01 | The master/replica replication connection is interrupted. The dcs-slave-01 node has been disconnected.
[Log archiving] [CRITICAL] dcs-prod-01 | The master/replica replication connection is interrupted. The dcs-slave-01 node has been disconnected.
[O&M dashboard] [CRITICAL] dcs-prod-01 | replication_status | The master/replica replication connection is interrupted. The dcs-slave-01 node has been disconnected.
[Self-healing service] Replication interruption detected -> Attempting to reconnect to the replica: dcs-slave-01
[On-duty DBA] !!! Critical alarm !!! dcs-prod-01 | The master/replica replication connection is interrupted. The dcs-slave-01 node has been disconnected.
[Monitoring agent] NOTICE   | dcs-prod-03 | 320 keys were evicted in the last hour, and maxmemory-policy has been triggered.
[Log archiving] [NOTICE  ] dcs-prod-03 | 320 keys were evicted in the last hour, and maxmemory-policy has been triggered.
[Monitoring agent] CRITICAL | dcs-prod-02 | The memory is full, and write commands are rejected.
[Log archiving] [CRITICAL] dcs-prod-02 | The memory is full, and write commands are rejected.
[Self-healing service] Full memory detected -> Automatic cleanup triggered: dcs-prod-02
[O&M dashboard] [CRITICAL] dcs-prod-02 | used_memory_mb | The memory is full, and write commands are rejected.
[On-duty DBA] !!! Critical alarm !!! dcs-prod-02 | The memory is full, and write commands are rejected.
[Monitoring agent] WARNING  | dcs-prod-03 | The number of connections has reached 92% of the upper limit.
[Log archiving] [WARNING ] dcs-prod-03 | The number of connections has reached 92% of the upper limit.
[O&M dashboard] [WARNING] dcs-prod-03 | connected_clients | The number of connections has reached 92% of the upper limit.
[Monitoring agent] All alarms in this collection cycle have been pushed.
The code sample above illustrates the collaboration between a monitoring agent and four distinct subscriber roles. Severity-specific channels, multi-channel subscription, and pattern matching enable flexible, precise alarm routing and automated response. Because the subscribers run in separate threads, the order in which they print a given alarm may vary between runs. Redis Pub/Sub is suitable for real-time scenarios such as monitoring and alerting, event broadcasting, and status synchronization. However, it cannot be used as a reliable message queue because it provides neither message persistence nor an acknowledgment mechanism. To guarantee message reliability, consider using Redis Streams or dedicated message queue middleware.
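For comparison, the following is a minimal sketch of the read-then-acknowledge pattern that Redis Streams consumer groups offer. It is an illustration under stated assumptions, not part of the sample above: the stream name, group name, and consumer name are hypothetical, and InMemoryStreamClient is a simplified stand-in that mimics only the three redis-py calls used (xadd, xreadgroup, and xack) so that the sketch runs without a server. With a real instance, a redis.Redis client would be passed instead, and the consumer group would need to be created first (for example with xgroup_create).

```python
class InMemoryStreamClient:
    """Hypothetical stand-in mimicking the redis-py stream calls used below."""

    def __init__(self):
        self.entries = []     # (entry_id, fields) in insertion order
        self.delivered = 0    # index of the next entry not yet delivered
        self.pending = set()  # delivered but not yet acknowledged entry IDs

    def xadd(self, name, fields):
        entry_id = "{}-0".format(len(self.entries) + 1)
        self.entries.append((entry_id, dict(fields)))
        return entry_id

    def xreadgroup(self, groupname, consumername, streams, count=None):
        name = next(iter(streams))
        end = len(self.entries) if count is None else self.delivered + count
        batch = self.entries[self.delivered:end]
        self.delivered += len(batch)
        self.pending.update(entry_id for entry_id, _ in batch)
        return [[name, batch]] if batch else []

    def xack(self, name, groupname, *ids):
        acked = self.pending & set(ids)
        self.pending -= acked
        return len(acked)


def publish_alarm(client, stream, alarm):
    """Append an alarm to the stream; unlike PUBLISH, the entry is stored."""
    return client.xadd(stream, alarm)


def consume_alarms(client, stream, group, consumer, handler):
    """Read new entries for this consumer group and acknowledge each one after handling."""
    handled = 0
    # ">" requests entries that have not been delivered to this group yet.
    for _stream_name, messages in client.xreadgroup(group, consumer, {stream: ">"}, count=10):
        for entry_id, fields in messages:
            handler(fields)
            # Acknowledge only after the handler succeeds, so a failed entry stays pending.
            client.xack(stream, group, entry_id)
            handled += 1
    return handled


client = InMemoryStreamClient()
publish_alarm(client, "redis:monitor:stream", {"instance": "dcs-prod-02", "level": "critical"})
seen = []
handled = consume_alarms(client, "redis:monitor:stream", "archivers", "archiver-1", seen.append)
print(handled, seen)
```

The key difference from Pub/Sub is that an entry remains in the stream until it is read and stays pending until it is acknowledged, so a consumer that starts late or fails midway can still process it.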