文档首页/ 云数据库 GeminiDB/ GeminiDB Redis接口/ 开发参考/ GeminiDB Redis 消息发布与订阅
更新时间:2024-12-02 GMT+08:00

GeminiDB Redis 消息发布与订阅

华为云数据库GeminiDB Redis 100%兼容开源Redis的消息发布(Publish)与订阅(Subscribe)功能,可参考此文档对自身业务进行适配。

Pub/Sub介绍

SUBSCRIBEUNSUBSCRIBEPUBLISH 实现了发布/订阅消息传递范式。在该范式中,消息发送者(发布者)并不直接将消息发送给特定的接收者(订阅者),而是将消息发布到一个主题(channel)上。所有对该主题感兴趣的订阅者都能够接收到消息。这种模式的优势在于它的解耦性,发布者和订阅者之间不需要相互了解,降低了系统的耦合度。

使用场景

Pub/Sub模型在许多场景都可以发挥重要作用,以下罗列了一些常见的使用场景:

  • 实时聊天应用

在即时通讯应用中,用户之间的消息需要快速传递。通过Pub/Sub功能,用户可以订阅各自的聊天频道,发布消息后,所有订阅该频道的用户立即收到消息。这种方式可以确保消息的实时性和高效性。

  • 实时通知系统

在电商网站或社交媒体平台中,用户需要实时接收订单状态更新、评论、点赞等通知。利用 GeminiDB Redis的Pub/Sub能力,系统可以在状态变更时立即发布通知,所有相关用户都会及时收到更新。

  • 监控和日志系统

在微服务架构中,服务之间的状态监控和日志收集可以使用Pub/Sub模型。服务可以将状态信息或日志消息发布到特定频道,监控服务或日志收集服务可以订阅这些频道,从而实现实时监控和数据收集。

  • 游戏实时消息传递

在多人在线游戏中,玩家之间的互动需要快速同步。Pub/Sub可以用于玩家之间的消息传递、游戏事件的通知等,确保所有玩家在同一时间收到游戏状态更新。

  • 数据流处理

在数据流应用中,实时数据处理和分析是关键。通过Pub/Sub,数据生产者可以发布数据流,消费者可以订阅这些流以进行实时处理和分析。

基本用法

例如,要订阅频道"channel11"和"ch:00",客户端可以使用以下命令:

SUBSCRIBE channel11 ch:00

已订阅的客户端将收到其他客户端发送到这些频道的消息,并按照消息的发送顺序进行接收。

高级用法

Pub/Sub实现支持模式匹配。客户端可以订阅名称符合特定模式的频道,特定模式通过通配符实现,例如:

PSUBSCRIBE news.*

订阅者将会收到所有发送到news.art.figurative, news.music.jazz等等频道的消息。

  • 消息丢失:Pub/Sub 机制不保证消息的持久性,因此在网络故障或订阅者未连接时,可能会丢失消息。
  • 性能考虑:在高并发环境下,发布/订阅的性能可能受到限制。需要根据具体场景进行性能测试和优化。
  • 如果同时使用SUBSCRIBE和PSUBSCRIBE,可能收到重复消息,请仔细确认是否符合业务逻辑。

Java示例代码(Jedis版)

消息发布者

import redis.clients.jedis.Jedis;
public class GeminiDBPubClient {
    private Jedis jedis;
 
    public GeminiDBPubClient(String ip, int port, String password){
        jedis = new Jedis(ip, port);
        // The instance password for GeminiDB.
        String authString = jedis.auth(password);
        if (!authString.equals("OK"))
        {
            System.err.println("AUTH Failed: " + authString);
            return;
        }
    }
 
    public void pub(String channel, String message){
        System.out.println("  >>> Publish > Channel: " + channel + " > Sent Message: " + message);
        jedis.publish(channel, message);
    }
 
    public void close(String channel){
        System.out.println("  >>> Publish End > Channel:" + channel + " > Message:quit");
        // The message publisher has finished sending, sending a "quit" message.
        jedis.publish(channel, "quit");
    }
}

消息订阅者

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class GeminiDBSubClient extends Thread {
    private Jedis jedis;
 
    private String channel;
    
    private JedisPubSub listener;
    
    public GeminiDBSubClient(String ip, int port, String password){
        jedis = new Jedis(host,port);
        // The instance password for GeminiDB.
        String authString = jedis.auth(password); //password
        if (!authString.equals("OK"))
        {
            System.err.println("AUTH Failed: " + authString);
            return;
        }
    }
    
    public void setChannelAndListener(JedisPubSub listener, String channel){
        this.listener=listener;
        this.channel=channel;
    }
 
    private void subscribe(){
        if(listener==null || channel==null){
            System.err.println("Error:SubClient> listener or channel is null");
        }
        System.out.println("  >>> Subscribe > Channel:" + channel);
        // The receiver will block the process while listening for subscribed messages until it receives a "quit" message (passive mode) or actively cancels the subscription.
        jedis.subscribe(listener, channel);
    }
 
    public void unsubscribe(String channel){
        System.out.println("  >>> Unsubscribe > Channel:" + channel);
        listener.unsubscribe(channel);
    }
 
    @Override
    public void run(){
        try {
            System.out.println("----------Subscribe Start-------");
            subscribe();
            System.out.println("----------Subscribe End-------");
        } catch(Exception e){
            e.printStackTrace();
        }
    }
}

消息监听者

import redis.clients.jedis.JedisPubSub;
public class GeminiDBListener extends JedisPubSub {
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("  <<< Subscribe < Channel:" + channel + " > Receive Message:" + message );
        // When the received message is "quit," unsubscribe (passive mode).
        if(message.equalsIgnoreCase("quit")){
            this.unsubscribe(channel);
        }
    }
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
}