华为云数据库GeminiDB Redis 100%兼容开源Redis的消息发布(Publish)与订阅(Subscribe)功能,可参考此文档对自身业务进行适配。
使用场景
Pub/Sub模型在许多场景都可以发挥重要作用,以下罗列了一些常见的使用场景:
在即时通讯应用中,用户之间的消息需要快速传递。通过Pub/Sub功能,用户可以订阅各自的聊天频道,发布消息后,所有订阅该频道的用户立即收到消息。这种方式可以确保消息的实时性和高效性。
在电商网站或社交媒体平台中,用户需要实时接收订单状态更新、评论、点赞等通知。利用 GeminiDB Redis的Pub/Sub能力,系统可以在状态变更时立即发布通知,所有相关用户都会及时收到更新。
在微服务架构中,服务之间的状态监控和日志收集可以使用Pub/Sub模型。服务可以将状态信息或日志消息发布到特定频道,监控服务或日志收集服务可以订阅这些频道,从而实现实时监控和数据收集。
在多人在线游戏中,玩家之间的互动需要快速同步。Pub/Sub可以用于玩家之间的消息传递、游戏事件的通知等,确保所有玩家在同一时间收到游戏状态更新。
在数据流应用中,实时数据处理和分析是关键。通过Pub/Sub,数据生产者可以发布数据流,消费者可以订阅这些流以进行实时处理和分析。
基本用法
例如,要订阅频道"channel11"和"ch:00",客户端可以使用以下命令:
SUBSCRIBE channel11 ch:00
已订阅的客户端将收到其他客户端发送到这些频道的消息,并按照消息的发送顺序进行接收。
高级用法
Pub/Sub实现支持模式匹配。客户端可以订阅名称符合特定模式的频道,特定模式通过通配符实现,例如:
PSUBSCRIBE news.*
订阅者将会收到所有发送到news.art.figurative, news.music.jazz等等频道的消息。
- 消息丢失:Pub/Sub 机制不保证消息的持久性,因此在网络故障或订阅者未连接时,可能会丢失消息。
- 性能考虑:在高并发环境下,发布/订阅的性能可能受到限制。需要根据具体场景进行性能测试和优化。
- 如果同时使用SUBSCRIBE和PSUBSCRIBE,可能收到重复消息,请仔细确认是否符合业务逻辑。
Java示例代码(Jedis版)
消息发布者
import redis.clients.jedis.Jedis;
public class GeminiDBPubClient {
private Jedis jedis;
public GeminiDBPubClient(String ip, int port, String password){
jedis = new Jedis(ip, port);
// The instance password for GeminiDB.
String authString = jedis.auth(password);
if (!authString.equals("OK"))
{
System.err.println("AUTH Failed: " + authString);
return;
}
}
public void pub(String channel, String message){
System.out.println(" >>> Publish > Channel: " + channel + " > Sent Message: " + message);
jedis.publish(channel, message);
}
public void close(String channel){
System.out.println(" >>> Publish End > Channel:" + channel + " > Message:quit");
// The message publisher has finished sending, sending a "quit" message.
jedis.publish(channel, "quit");
}
}
消息订阅者
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class GeminiDBSubClient extends Thread {
private Jedis jedis;
private String channel;
private JedisPubSub listener;
public GeminiDBSubClient(String ip, int port, String password){
jedis = new Jedis(host,port);
// The instance password for GeminiDB.
String authString = jedis.auth(password); //password
if (!authString.equals("OK"))
{
System.err.println("AUTH Failed: " + authString);
return;
}
}
public void setChannelAndListener(JedisPubSub listener, String channel){
this.listener=listener;
this.channel=channel;
}
private void subscribe(){
if(listener==null || channel==null){
System.err.println("Error:SubClient> listener or channel is null");
}
System.out.println(" >>> Subscribe > Channel:" + channel);
// The receiver will block the process while listening for subscribed messages until it receives a "quit" message (passive mode) or actively cancels the subscription.
jedis.subscribe(listener, channel);
}
public void unsubscribe(String channel){
System.out.println(" >>> Unsubscribe > Channel:" + channel);
listener.unsubscribe(channel);
}
@Override
public void run(){
try {
System.out.println("----------Subscribe Start-------");
subscribe();
System.out.println("----------Subscribe End-------");
} catch(Exception e){
e.printStackTrace();
}
}
}
消息监听者
import redis.clients.jedis.JedisPubSub;
public class GeminiDBListener extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
System.out.println(" <<< Subscribe < Channel:" + channel + " > Receive Message:" + message );
// When the received message is "quit," unsubscribe (passive mode).
if(message.equalsIgnoreCase("quit")){
this.unsubscribe(channel);
}
}
@Override
public void onPMessage(String pattern, String channel, String message) {
// TODO Auto-generated method stub
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
// TODO Auto-generated method stub
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
// TODO Auto-generated method stub
}
}