GeminiDB Redis 消息发布与订阅
华为云数据库GeminiDB Redis 100%兼容开源Redis的消息发布(Publish)与订阅(Subscribe)功能,可参考此文档对自身业务进行适配。
Pub/Sub介绍
SUBSCRIBE、UNSUBSCRIBE 和 PUBLISH 实现了发布/订阅消息传递范式。在该范式中,消息发送者(发布者)并不直接将消息发送给特定的接收者(订阅者),而是将消息发布到一个主题(channel)上。所有对该主题感兴趣的订阅者都能够接收到消息。这种模式的优势在于它的解耦性,发布者和订阅者之间不需要相互了解,降低了系统的耦合度。
使用场景
Pub/Sub模型在许多场景都可以发挥重要作用,以下罗列了一些常见的使用场景:
- 实时聊天应用
在即时通讯应用中,用户之间的消息需要快速传递。通过Pub/Sub功能,用户可以订阅各自的聊天频道,发布消息后,所有订阅该频道的用户立即收到消息。这种方式可以确保消息的实时性和高效性。
- 实时通知系统
在电商网站或社交媒体平台中,用户需要实时接收订单状态更新、评论、点赞等通知。利用 GeminiDB Redis的Pub/Sub能力,系统可以在状态变更时立即发布通知,所有相关用户都会及时收到更新。
- 监控和日志系统
在微服务架构中,服务之间的状态监控和日志收集可以使用Pub/Sub模型。服务可以将状态信息或日志消息发布到特定频道,监控服务或日志收集服务可以订阅这些频道,从而实现实时监控和数据收集。
- 游戏实时消息传递
在多人在线游戏中,玩家之间的互动需要快速同步。Pub/Sub可以用于玩家之间的消息传递、游戏事件的通知等,确保所有玩家在同一时间收到游戏状态更新。
- 数据流处理
在数据流应用中,实时数据处理和分析是关键。通过Pub/Sub,数据生产者可以发布数据流,消费者可以订阅这些流以进行实时处理和分析。
基本用法
例如,要订阅频道"channel11"和"ch:00",客户端可以使用以下命令:
SUBSCRIBE channel11 ch:00
已订阅的客户端将收到其他客户端发送到这些频道的消息,并按照消息的发送顺序进行接收。
高级用法
Pub/Sub实现支持模式匹配。客户端可以订阅名称符合特定模式的频道,特定模式通过通配符实现,例如:
PSUBSCRIBE news.*
订阅者将会收到所有发送到news.art.figurative, news.music.jazz等等频道的消息。
- 消息丢失:Pub/Sub 机制不保证消息的持久性,因此在网络故障或订阅者未连接时,可能会丢失消息。
- 性能考虑:在高并发环境下,发布/订阅的性能可能受到限制。需要根据具体场景进行性能测试和优化。
- 如果同时使用SUBSCRIBE和PSUBSCRIBE,可能收到重复消息,请仔细确认是否符合业务逻辑。
Java示例代码(Jedis版)
消息发布者
import redis.clients.jedis.Jedis; public class GeminiDBPubClient { private Jedis jedis; public GeminiDBPubClient(String ip, int port, String password){ jedis = new Jedis(ip, port); // The instance password for GeminiDB. String authString = jedis.auth(password); if (!authString.equals("OK")) { System.err.println("AUTH Failed: " + authString); return; } } public void pub(String channel, String message){ System.out.println(" >>> Publish > Channel: " + channel + " > Sent Message: " + message); jedis.publish(channel, message); } public void close(String channel){ System.out.println(" >>> Publish End > Channel:" + channel + " > Message:quit"); // The message publisher has finished sending, sending a "quit" message. jedis.publish(channel, "quit"); } }
消息订阅者
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; public class GeminiDBSubClient extends Thread { private Jedis jedis; private String channel; private JedisPubSub listener; public GeminiDBSubClient(String ip, int port, String password){ jedis = new Jedis(host,port); // The instance password for GeminiDB. String authString = jedis.auth(password); //password if (!authString.equals("OK")) { System.err.println("AUTH Failed: " + authString); return; } } public void setChannelAndListener(JedisPubSub listener, String channel){ this.listener=listener; this.channel=channel; } private void subscribe(){ if(listener==null || channel==null){ System.err.println("Error:SubClient> listener or channel is null"); } System.out.println(" >>> Subscribe > Channel:" + channel); // The receiver will block the process while listening for subscribed messages until it receives a "quit" message (passive mode) or actively cancels the subscription. jedis.subscribe(listener, channel); } public void unsubscribe(String channel){ System.out.println(" >>> Unsubscribe > Channel:" + channel); listener.unsubscribe(channel); } @Override public void run(){ try { System.out.println("----------Subscribe Start-------"); subscribe(); System.out.println("----------Subscribe End-------"); } catch(Exception e){ e.printStackTrace(); } } }
消息监听者
import redis.clients.jedis.JedisPubSub; public class GeminiDBListener extends JedisPubSub { @Override public void onMessage(String channel, String message) { System.out.println(" <<< Subscribe < Channel:" + channel + " > Receive Message:" + message ); // When the received message is "quit," unsubscribe (passive mode). if(message.equalsIgnoreCase("quit")){ this.unsubscribe(channel); } } @Override public void onPMessage(String pattern, String channel, String message) { // TODO Auto-generated method stub } @Override public void onSubscribe(String channel, int subscribedChannels) { // TODO Auto-generated method stub } @Override public void onUnsubscribe(String channel, int subscribedChannels) { // TODO Auto-generated method stub } @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { // TODO Auto-generated method stub } @Override public void onPSubscribe(String pattern, int subscribedChannels) { // TODO Auto-generated method stub } }