Updated on 2026-06-17 GMT+08:00

Using DCS to Publish and Subscribe to Messages

Scenarios

In a production environment, multiple DCS instances (such as dcs-prod-01, dcs-prod-02, and dcs-prod-03) are usually deployed. The O&M team needs to monitor the running status of each instance in real time, including memory usage, number of connections, slow queries, master/replica replication status, and key eviction. When an exception occurs, alarms must reach different subscribers according to their severity: the on-duty DBA focuses only on critical issues, the O&M dashboard displays alarm trends, the self-healing service triggers repair operations for specific faults, and the log archiving service records alarms of all severities for post-event audit.

Redis Pub/Sub fits this scenario well. The monitoring agent periodically collects performance metrics from each instance. When it detects an exception, it publishes an alarm to the channel for that severity (such as redis:monitor:critical, redis:monitor:warning, or redis:monitor:notice). Each subscriber subscribes only to the channels it needs and receives alarms in real time. The log archiving service subscribes to all redis:monitor:* channels at once through pattern matching (PSUBSCRIBE).
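The agent's choice of channel can be sketched as a threshold check. This is a minimal sketch under assumed rules: the classify() helper and the 90% and 99% ratios are hypothetical, chosen so that the memory and connection samples in the code sample land on the same channels the sample uses. It covers only usage-versus-limit metrics such as used_memory_mb and connected_clients; slow queries, replication status, and evictions would need their own rules.

```python
# Hypothetical threshold-based channel selection for the monitoring agent.
# The ratios are illustrative assumptions, not DCS defaults.
CRITICAL_RATIO = 0.99  # assumed: at or above 99% of the limit is critical
WARNING_RATIO = 0.90   # assumed: at or above 90% of the limit is a warning
CHANNEL_PREFIX = "redis:monitor:"


def classify(value: float, limit: float) -> str:
    """Return the severity-specific channel for a usage-versus-limit sample."""
    ratio = value / limit
    if ratio >= CRITICAL_RATIO:
        level = "critical"
    elif ratio >= WARNING_RATIO:
        level = "warning"
    else:
        level = "notice"
    return CHANNEL_PREFIX + level


if __name__ == "__main__":
    # Values taken from the alarm list in the code sample.
    samples = [
        ("dcs-prod-01", "connected_clients", 128, 500),
        ("dcs-prod-02", "used_memory_mb", 7680, 8192),
        ("dcs-prod-02", "used_memory_mb", 8190, 8192),
        ("dcs-prod-03", "connected_clients", 460, 500),
    ]
    for instance, metric, value, limit in samples:
        print(instance, metric, classify(value, limit))
```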

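Redis Pub/Sub uses a fire-and-forget model: messages are not persisted, and each message is delivered only to the clients subscribed to the channel at the moment it is published. A subscriber that is offline or has not yet subscribed misses the message permanently. In addition, the subscriber's call to listen() blocks the current thread, so each subscriber should run in a separate process or thread.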
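To make the fire-and-forget behavior concrete, the following toy model (plain Python, not Redis or redis-py) delivers each message only to the subscribers registered when it is published and returns the number of receivers, as the Redis PUBLISH command does. ToyBroker is a hypothetical name used for illustration only.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

Handler = Callable[[str, str], None]


class ToyBroker:
    """In-memory model of Pub/Sub delivery: no storage, no acknowledgment."""

    def __init__(self) -> None:
        self._subs: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, channel: str, handler: Handler) -> None:
        self._subs[channel].append(handler)

    def publish(self, channel: str, message: str) -> int:
        # Deliver only to handlers registered at this moment. Nothing is queued,
        # so a subscriber that arrives later never sees this message.
        handlers = self._subs.get(channel, [])
        for handler in handlers:
            handler(channel, message)
        # Like Redis PUBLISH, report how many subscribers received the message.
        return len(handlers)


if __name__ == "__main__":
    broker = ToyBroker()
    early: List[str] = []
    late: List[str] = []
    broker.subscribe("redis:monitor:critical", lambda ch, m: early.append(m))
    first = broker.publish("redis:monitor:critical", "replication down")
    broker.subscribe("redis:monitor:critical", lambda ch, m: late.append(m))
    second = broker.publish("redis:monitor:critical", "memory full")
    print(first, second)  # 1 2
    print(early)          # ['replication down', 'memory full']
    print(late)           # ['memory full']
```

The late subscriber never receives "replication down", which is why the subscribers in the code sample must be listening before the monitoring agent starts publishing.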

Code Sample

For details about how to use the Python Redis client redis-py to connect to a Redis instance, see Using redis-py to Access an Instance.

Message Publisher

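# Imports used by the publisher, the subscribers, and main() below.
import json
import threading
import time

import redis


def get_redis_connection():
    # Replace the host and port with the connection address of your DCS instance.
    # If the instance is password-protected, also pass password="<password>".
    return redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)

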
def monitor_agent():
    r = get_redis_connection()
    # Simulate alarms generated during a collection cycle.
    alerts = [
        ("redis:monitor:notice", {
            "instance": "dcs-prod-01",
            "metric": "connected_clients",
            "value": 128,
            "threshold": 500,
            "msg": "128 connections are currently active and running properly."
        }),
        ("redis:monitor:warning", {
            "instance": "dcs-prod-02",
            "metric": "used_memory_mb",
            "value": 7680,
            "threshold": 8192,
            "pct": "93.7%",
            "msg": "The memory usage has reached 93.7% and is about to reach the maxmemory limit."
        }),
        ("redis:monitor:warning", {
            "instance": "dcs-prod-01",
            "metric": "slowlog_count",
            "value": 15,
            "threshold": 10,
            "window": "Last 5 minutes",
            "msg": "The number of slow queries is 15, exceeding the threshold 10."
        }),
        ("redis:monitor:critical", {
            "instance": "dcs-prod-01",
            "metric": "replication_status",
            "value": "down",
            "slave": "dcs-slave-01",
            "msg": "The master/replica replication connection is interrupted. The dcs-slave-01 node has been disconnected."
        }),
        ("redis:monitor:notice", {
            "instance": "dcs-prod-03",
            "metric": "evicted_keys",
            "value": 320,
            "msg": "320 keys were evicted in the last hour, and maxmemory-policy has been triggered."
        }),
        ("redis:monitor:critical", {
            "instance": "dcs-prod-02",
            "metric": "used_memory_mb",
            "value": 8190,
            "threshold": 8192,
            "pct": "99.97%",
            "msg": "The memory is full, and write commands are rejected."
        }),
        ("redis:monitor:warning", {
            "instance": "dcs-prod-03",
            "metric": "connected_clients",
            "value": 460,
            "threshold": 500,
            "pct": "92.0%",
            "msg": "The number of connections has reached 92% of the upper limit."
        }),
    ]
    for channel, data in alerts:
        payload = json.dumps(data, ensure_ascii=False)
        r.publish(channel, payload)
        level = channel.split(":")[-1].upper()
        print("[Monitoring agent] {level:8s} | {instance} | {msg}".format(
            level=level, instance=data['instance'], msg=data['msg']))
        time.sleep(1)
    print("[Monitoring agent] All alarms in this collection cycle have been pushed.")

Message Subscriber

  1. Subscriber for the on-duty DBA
    def dba_on_call_subscriber():
        r = get_redis_connection()
        pubsub = r.pubsub()
        # Exact-channel subscription: only critical alarms reach the DBA.
        pubsub.subscribe("redis:monitor:critical")
        print("[On-duty DBA] The critical channel has been subscribed to.")
        # listen() blocks and yields every reply, including the subscribe confirmation,
        # so only entries of type "message" are processed.
        for message in pubsub.listen():
            if message["type"] == "message":
                data = json.loads(message["data"])
                print("[On-duty DBA] !!! Critical alarm !!! {instance} | {msg}".format(
                    instance=data['instance'], msg=data['msg']))
  2. Subscriber for the O&M dashboard
    def operations_dashboard_subscriber():
        r = get_redis_connection()
        pubsub = r.pubsub()
        # One connection can subscribe to several channels in a single call.
        pubsub.subscribe("redis:monitor:critical", "redis:monitor:warning")
        print("[O&M dashboard] The critical and warning channels have been subscribed to.")
        for message in pubsub.listen():
            if message["type"] == "message":
                data = json.loads(message["data"])
                # The severity is the last segment of the channel name.
                level = message["channel"].split(":")[-1].upper()
                print("[O&M dashboard] [{level}] {instance} | {metric} | {msg}".format(
                    level=level, instance=data['instance'],
                    metric=data['metric'], msg=data['msg']))
  3. Subscriber for the self-healing service
    def auto_repair_subscriber():
        r = get_redis_connection()
        pubsub = r.pubsub()
        pubsub.subscribe("redis:monitor:critical")
        print("[Self-healing service] The critical channel has been subscribed to.")
        for message in pubsub.listen():
            if message["type"] == "message":
                data = json.loads(message["data"])
                # Dispatch a (simulated) repair action based on the faulty metric.
                if data["metric"] == "used_memory_mb":
                    print(f"[Self-healing service] Full memory detected -> Automatic cleanup triggered: {data['instance']}")
                elif data["metric"] == "replication_status":
                    print(f"[Self-healing service] Replication interruption detected -> Attempting to reconnect to the replica: {data.get('slave', 'unknown')}")
                else:
                    print(f"[Self-healing service] Alarm received: {data['instance']} | {data['msg']}")
  4. Subscriber for the log archiving service
    def log_archiver_subscriber():
        r = get_redis_connection()
        pubsub = r.pubsub()
        # Pattern subscription (PSUBSCRIBE): one call covers every redis:monitor:* channel.
        pubsub.psubscribe("redis:monitor:*")
        print("[Log archiving] The redis:monitor:* channel has been subscribed to via pattern matching.")
        for message in pubsub.listen():
            # Pattern deliveries have type "pmessage"; message["pattern"] holds the
            # pattern and message["channel"] holds the channel the alarm was sent to.
            if message["type"] == "pmessage":
                data = json.loads(message["data"])
                level = message["channel"].split(":")[-1].upper()
                print("[Log archiving] [{level:8s}] {instance} | {msg}".format(
                    level=level, instance=data['instance'], msg=data['msg']))
  5. Example of the main function
    def main():
        # Create the subscriber threads. They are daemon threads because listen()
        # never returns, so they must not keep the process alive after main() exits.
        thread_list = [
            threading.Thread(target=dba_on_call_subscriber, daemon=True),
            threading.Thread(target=operations_dashboard_subscriber, daemon=True),
            threading.Thread(target=log_archiver_subscriber, daemon=True),
            threading.Thread(target=auto_repair_subscriber, daemon=True),
        ]
        # Start all subscriber threads.
        for sub_thread in thread_list:
            sub_thread.start()
        # Give the subscribers time to register. Pub/Sub does not store messages,
        # so anything published before SUBSCRIBE/PSUBSCRIBE completes is lost.
        time.sleep(1)
        # Start the monitoring agent.
        monitor_agent()
        # The subscriber threads never finish on their own. Joining with a timeout
        # only gives them a moment to print the last alarms before the process exits.
        for sub_thread in thread_list:
            sub_thread.join(timeout=1)

    if __name__ == "__main__":
        main()

Execution Result

[On-duty DBA] The critical channel has been subscribed to.
[O&M dashboard] The critical and warning channels have been subscribed to.
[Log archiving] The redis:monitor:* channel has been subscribed to via pattern matching.
[Self-healing service] The critical channel has been subscribed to.
[Monitoring agent] NOTICE   | dcs-prod-01 | 128 connections are currently active and running properly.
[Log archiving] [NOTICE  ] dcs-prod-01 | 128 connections are currently active and running properly.
[Monitoring agent] WARNING  | dcs-prod-02 | The memory usage has reached 93.7% and is about to reach the maxmemory limit.
[O&M dashboard] [WARNING] dcs-prod-02 | used_memory_mb | The memory usage has reached 93.7% and is about to reach the maxmemory limit.
[Log archiving] [WARNING ] dcs-prod-02 | The memory usage has reached 93.7% and is about to reach the maxmemory limit.
[Monitoring agent] WARNING  | dcs-prod-01 | The number of slow queries is 15, exceeding the threshold 10.
[O&M dashboard] [WARNING] dcs-prod-01 | slowlog_count | The number of slow queries is 15, exceeding the threshold 10.
[Log archiving] [WARNING ] dcs-prod-01 | The number of slow queries is 15, exceeding the threshold 10.
[Monitoring agent] CRITICAL | dcs-prod-01 | The master/replica replication connection is interrupted. The dcs-slave-01 node has been disconnected.
[Log archiving] [CRITICAL] dcs-prod-01 | The master/replica replication connection is interrupted. The dcs-slave-01 node has been disconnected.
[O&M dashboard] [CRITICAL] dcs-prod-01 | replication_status | The master/replica replication connection is interrupted. The dcs-slave-01 node has been disconnected.
[Self-healing service] Replication interruption detected -> Attempting to reconnect to the replica: dcs-slave-01
[On-duty DBA] !!! Critical alarm !!! dcs-prod-01 | The master/replica replication connection is interrupted. The dcs-slave-01 node has been disconnected.
[Monitoring agent] NOTICE   | dcs-prod-03 | 320 keys were evicted in the last hour, and maxmemory-policy has been triggered.
[Log archiving] [NOTICE  ] dcs-prod-03 | 320 keys were evicted in the last hour, and maxmemory-policy has been triggered.
[Monitoring agent] CRITICAL | dcs-prod-02 | The memory is full, and write commands are rejected.
[Log archiving] [CRITICAL] dcs-prod-02 | The memory is full, and write commands are rejected.
[Self-healing service] Full memory detected -> Automatic cleanup triggered: dcs-prod-02
[O&M dashboard] [CRITICAL] dcs-prod-02 | used_memory_mb | The memory is full, and write commands are rejected.
[On-duty DBA] !!! Critical alarm !!! dcs-prod-02 | The memory is full, and write commands are rejected.
[Monitoring agent] WARNING  | dcs-prod-03 | The number of connections has reached 92% of the upper limit.
[Log archiving] [WARNING ] dcs-prod-03 | The number of connections has reached 92% of the upper limit.
[O&M dashboard] [WARNING] dcs-prod-03 | connected_clients | The number of connections has reached 92% of the upper limit.
[Monitoring agent] All alarms in this collection cycle have been pushed.
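The execution result can be cross-checked offline by modelling which role receives each alarm. This is a minimal sketch: SUBSCRIPTIONS, ALARM_CHANNELS, receivers(), and count_deliveries() are hypothetical names that restate the subscriptions and the seven published alarms from the code sample, and Python's fnmatchcase() stands in for Redis glob-style pattern matching, which is adequate for the plain * used here but does not reproduce every Redis pattern rule.

```python
from collections import Counter
from fnmatch import fnmatchcase
from typing import Dict, Iterable, List

# Subscriptions of the four roles in the code sample: exact channels (SUBSCRIBE)
# and glob-style patterns (PSUBSCRIBE).
SUBSCRIPTIONS: Dict[str, Dict[str, List[str]]] = {
    "On-duty DBA": {"channels": ["redis:monitor:critical"], "patterns": []},
    "O&M dashboard": {"channels": ["redis:monitor:critical", "redis:monitor:warning"],
                      "patterns": []},
    "Self-healing service": {"channels": ["redis:monitor:critical"], "patterns": []},
    "Log archiving": {"channels": [], "patterns": ["redis:monitor:*"]},
}

# Channels of the seven alarms published by monitor_agent(), in order.
ALARM_CHANNELS = [
    "redis:monitor:notice", "redis:monitor:warning", "redis:monitor:warning",
    "redis:monitor:critical", "redis:monitor:notice", "redis:monitor:critical",
    "redis:monitor:warning",
]


def receivers(channel: str) -> List[str]:
    """Return the roles that would receive a message published to `channel`."""
    result = []
    for role, sub in SUBSCRIPTIONS.items():
        # fnmatchcase approximates Redis glob matching for simple patterns like "*".
        if channel in sub["channels"] or any(
                fnmatchcase(channel, p) for p in sub["patterns"]):
            result.append(role)
    return result


def count_deliveries(channels: Iterable[str]) -> Counter:
    """Count how many alarms each role receives for a sequence of publishes."""
    return Counter(role for ch in channels for role in receivers(ch))


if __name__ == "__main__":
    for role, n in count_deliveries(ALARM_CHANNELS).items():
        print(f"{role}: {n}")
```

The counts come out as 7 for Log archiving, 5 for O&M dashboard, and 2 each for On-duty DBA and Self-healing service, which matches the number of lines each role prints in the execution result above.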

The code sample above shows how a monitoring agent collaborates with four distinct subscriber roles. Severity-based channel names, multi-channel subscription, and pattern subscription route each alarm precisely to the roles that need it and enable automated responses. Because each subscriber runs in its own thread, the order in which different subscribers print the same alarm may vary between runs. Redis Pub/Sub is suitable for real-time scenarios such as monitoring and alerting, event broadcasting, and status synchronization. However, it lacks message persistence and acknowledgment, so it cannot serve as a reliable message queue. If message reliability is required, consider Redis Streams or dedicated message queue middleware.