使用客户端连接RabbitMQ(关闭SSL)
本章节以分布式消息服务RabbitMQ版提供的demo为例,介绍RabbitMQ客户端如何连接未开启SSL的RabbitMQ实例,并生产和消费消息。
前提条件
- 已购买RabbitMQ实例,并记录创建时输入的用户名和密码,实例未开启SSL。
- 在实例详情中查看并记录“内网连接地址/公网连接地址”。
- 客户端所在服务器和RabbitMQ实例之间网络已互通,具体网络要求参见连接RabbitMQ网络要求。
- 客户端所在服务器已安装Java Development Kit 1.8.111或以上版本,并配置JAVA_HOME与PATH环境变量,环境变量配置方法如下:
使用执行用户在用户家目录下修改“.bash_profile”,添加如下行。其中“/opt/java/jdk1.8.0_151”为JDK的安装路径,请根据实际情况修改。
export JAVA_HOME=/opt/java/jdk1.8.0_151 export PATH=$JAVA_HOME/bin:$PATH
执行source .bash_profile命令使修改生效。
- RabbitMQ实例中已创建Vhost、Exchange和Queue,且配置Exchange和Queue的绑定。
命令行模式连接实例
- 登录客户端所在服务器。
- 下载RabbitMQ-Tutorial.zip示例工程代码。
wget https://dms-demo.obs.cn-north-1.myhuaweicloud.com/RabbitMQ-Tutorial.zip
- 解压RabbitMQ-Tutorial.zip压缩包。
unzip RabbitMQ-Tutorial.zip
- 进入RabbitMQ-Tutorial目录,该目录下包含预编译好的jar文件。
cd RabbitMQ-Tutorial
- 运行生产消息示例。
java -cp .:rabbitmq-tutorial.jar Send {host} {port} {user} {password}
参数说明如下:
生产消息示例如下:
[root@ecs-test RabbitMQ-Tutorial]# java -cp .:rabbitmq-tutorial.jar Send 192.168.xx.40 5672 test Zxxxxxxs [x] Sent 'Hello World!' [root@ecs-test RabbitMQ-Tutorial]# java -cp .:rabbitmq-tutorial.jar Send 192.168.xx.40 5672 test Zxxxxxxs [x] Sent 'Hello World!'
- 运行消费消息示例。
java -cp .:rabbitmq-tutorial.jar Recv {host} {port} {user} {password}
参数说明如下:
消费消息示例如下:
[root@ecs-test RabbitMQ-Tutorial]# java -cp .:rabbitmq-tutorial.jar Recv 192.168.xx.40 5672 test Zxxxxxxs [*] Waiting for messages. To exit press CTRL+C [x] Received 'Hello World!' [x] Received 'Hello World!'
如需停止消费使用Ctrl+C命令退出。
示例代码(Java)
- 连接实例并生产消息示例代码:
- VHOST_NAME:消息要发送的Queue所在的Vhost名称。
- QUEUE_NAME:消息要发送的Queue名称。
- Hello World!:要发送的消息,根据实际需要修改。
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setPort(port); factory.setVirtualHost("VHOST_NAME"); factory.setUsername(user); factory.setPassword(password); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close();
- 连接实例并消费消息示例代码:
- VHOST_NAME:要消费消息的Queue所在的Vhost名称。
- QUEUE_NAME:要消费消息的Queue名称。
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setPort(port); factory.setVirtualHost("VHOST_NAME"); factory.setUsername(user); factory.setPassword(password); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer);