更新时间:2024-10-29 GMT+08:00

使用客户端连接RabbitMQ(关闭SSL)

本章节以分布式消息服务RabbitMQ版提供的demo为例,介绍RabbitMQ客户端如何连接未开启SSL的RabbitMQ实例,并生产和消费消息。

前提条件

  • 购买RabbitMQ实例,并记录创建时输入的用户名和密码,实例未开启SSL。
  • 在实例详情中查看并记录“内网连接地址/公网连接地址”。
  • 客户端所在服务器和RabbitMQ实例之间网络已互通,具体网络要求参见连接RabbitMQ网络要求
  • 客户端所在服务器已安装Java Development Kit 1.8.111或以上版本,并配置JAVA_HOME与PATH环境变量,环境变量配置方法如下:

    使用执行用户在用户家目录下修改“.bash_profile”,添加如下行。其中“/opt/java/jdk1.8.0_151”为JDK的安装路径,请根据实际情况修改。

    export JAVA_HOME=/opt/java/jdk1.8.0_151 
    export PATH=$JAVA_HOME/bin:$PATH

    执行source .bash_profile命令使修改生效。

  • RabbitMQ实例中已创建VhostExchangeQueue,且配置Exchange和Queue的绑定

命令行模式连接实例

  1. 登录客户端所在服务器。
  2. 下载RabbitMQ-Tutorial.zip示例工程代码。

    wget https://dms-demo.obs.cn-north-1.myhuaweicloud.com/RabbitMQ-Tutorial.zip

  3. 解压RabbitMQ-Tutorial.zip压缩包。

    unzip RabbitMQ-Tutorial.zip

  4. 进入RabbitMQ-Tutorial目录,该目录下包含预编译好的jar文件。

    cd RabbitMQ-Tutorial

  5. 运行生产消息示例。

    java -cp .:rabbitmq-tutorial.jar Send {host} {port} {user} {password}

    参数说明如下:

    • {host}:从前提条件中获取的连接地址。
    • {port}:RabbitMQ实例的连接端口,输入5672。
    • {user}:从前提条件中获取的用户名。
    • {password}:从前提条件中获取的密码。

    生产消息示例如下:

    [root@ecs-test RabbitMQ-Tutorial]# java -cp .:rabbitmq-tutorial.jar Send 192.168.xx.40 5672 test Zxxxxxxs
     [x] Sent 'Hello World!'
    [root@ecs-test RabbitMQ-Tutorial]# java -cp .:rabbitmq-tutorial.jar Send 192.168.xx.40 5672 test Zxxxxxxs
     [x] Sent 'Hello World!'

  6. 运行消费消息示例。

    java -cp .:rabbitmq-tutorial.jar Recv {host} {port} {user} {password}

    参数说明如下:

    • {host}:从前提条件中获取的连接地址。
    • {port}:RabbitMQ实例的连接端口,输入5672。
    • {user}:从前提条件中获取的用户名。
    • {password}:从前提条件中获取的密码。

    消费消息示例如下:

    [root@ecs-test RabbitMQ-Tutorial]# java -cp .:rabbitmq-tutorial.jar Recv 192.168.xx.40 5672 test Zxxxxxxs
     [*] Waiting for messages. To exit press CTRL+C
     [x] Received 'Hello World!'
     [x] Received 'Hello World!'

    如需停止消费使用Ctrl+C命令退出。

示例代码(Java)

  • 连接实例并生产消息示例代码:
    • VHOST_NAME:消息要发送的Queue所在的Vhost名称。
    • QUEUE_NAME:消息要发送的Queue名称。
    • Hello World!:要发送的消息,根据实际需要修改。
    ConnectionFactory factory = new ConnectionFactory();
     factory.setHost(host);
     factory.setPort(port);
     factory.setVirtualHost("VHOST_NAME");
    
     factory.setUsername(user);
     factory.setPassword(password);
     Connection connection = factory.newConnection();
     Channel channel = connection.createChannel();
    
     channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
     String message = "Hello World!";
     channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
     System.out.println(" [x] Sent '" + message + "'");
    
     channel.close();
     connection.close();
  • 连接实例并消费消息示例代码:
    • VHOST_NAME:要消费消息的Queue所在的Vhost名称。
    • QUEUE_NAME:要消费消息的Queue名称。
    ConnectionFactory factory = new ConnectionFactory();
     factory.setHost(host);
     factory.setPort(port);
     factory.setVirtualHost("VHOST_NAME");
     factory.setUsername(user);
     factory.setPassword(password);
     Connection connection = factory.newConnection();
     Channel channel = connection.createChannel();
    
     channel.queueDeclare(QUEUE_NAME, false, false, false, null);
     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
     Consumer consumer = new DefaultConsumer(channel)
     {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                 byte[] body)
                 throws IOException
         {
             String message = new String(body, "UTF-8");
             System.out.println(" [x] Received '" + message + "'");
         }
     };
     channel.basicConsume(QUEUE_NAME, true, consumer);