文档首页 > > 开发指南> 消息集成开发指导> 使用RESTful API连接MQS>

Java Demo使用说明

Java Demo使用说明

分享
更新时间:2021/04/08 GMT+08:00

除了前面章节介绍的使用原生Kafka客户端,MQS(Kafka)实例还可以通过HTTP RESTful方式访问,包括向指定Topic发送消息、消费消息以及确认消费。

这种方式主要用于适配原有业务系统架构,方便统一使用HTTP协议接入。

如何使用

  1. 收集连接信息

    包括MQS连接地址与端口、Topic名称、SASL用户名与密码。具体请参考收集连接信息

    • 若ROMA Connect实例的消息集成在开启SASL_SSL的同时,也开启了VPC内网明文访问,则VPC内无法使用SASL方式连接消息集成的Topic。
    • 使用SASL方式连接消息集成的Topic时,建议在客户端所在主机的“/etc/hosts”文件中配置host和IP的映射关系,否则会引入时延。

      其中,IP地址必须为消息集成的连接地址,host为每个实例主机的名称,可以自定义,但不能重复。例如:

      10.10.10.11 host01

      10.10.10.12 host02

      10.10.10.13 host03

  2. 参考示例代码,组装API请求,包括对API请求的签名。

    对API请求签名,指使用SASL的用户名与密码作为密钥对,将请求URL、消息头时间戳等内容进行签名,供后端服务进行校验。点此了解签名流程

  3. 使用Demo向指定Topic生产消息、消费消息和确认消息时,返回的响应消息结构请参考生产消息接口说明消费消息接口说明消费确认接口说明

示例工程搭建

本指南提供了Java语言版本的RESTful API请求发送示例。

示例为一个在IntelliJ IDEA工具中开发的Maven工程,因此,您如果在本地环境使用,请先安装并配置以下环境(以Windows 10系统为例):

  • Maven:

    Apache Maven 3.0.3及以上版本,可至Maven官方下载页面下载。

  • JDK:

    Java Development Kit 1.8.111及以上版本,可至Oracle官方下载页面下载。

    安装后注意配置JAVA的环境变量。

  • IntelliJ IDEA工具:

    IntelliJ IDEA 2018.3.5及以上版本,可至IntelliJ IDEA官方网站下载。

  • Demo:

    在ROMA Connect实例控制台的“消息集成 MQS > Topic管理”页面,单击右上角的“用户指南 > 下载RESTful API Java Demo包”下载Demo

  1. 打开IntelliJ IDEA,在菜单栏选择“Import Project”。

    弹出“Select File or Directory to Import”对话框。

  2. 在弹出的对话框中选择解压后的RESTful API Java Demo路径,单击“OK”。

  3. “Import project from external model”选择“Eclipse”,单击“Next”,进入下一页后保持默认连续单击“Next”,直到“Please select project SDK”页面。

    图1 Import Project

  4. 单击“Finish”,完成工程导入。

    图2 Finish

  5. 编辑rest-config.properties

    文件在src/main/resources目录下。将获取到的Kafka实例连接地址、Topic名称,以及SASL信息填写到下述配置中。其中参数kafka.rest.group为消费组ID,可在客户端指定。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    # Kafka rest endpoint.
    kafka.rest.endpoint=https://{MQS_Instance_IP_Addr}:9292
    # Kafka topic name.
    kafka.rest.topic=topic_name_demo
    # Kafka consume group.
    kafka.rest.group=group_id_demo
    # Kafka sasl username.
    kafka.rest.username=sasl_username_demo
    # Kafka sasl password.
    kafka.rest.password=sasl_user_passwd_demo 
    

  6. 编辑log4j.properties

    修改日志存储目录:

    1
    log.directory=D://workspace/logs
    

  7. 运行示例工程,查看消息生产与消费样例。

    消息生成与消费的Main方法在RestMain.java中,以Java Application的方式运行即可。

示例代码解读

  • 工程入口:

    工程入口在RestMain.java文件中。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    public class RestMain
    {
        private static final Logger LOGGER = LoggerFactory.getLogger(RestMain.class);
    
        public static void main(String[] args) throws InterruptedException
        {
            //初始化请求对象。在RestServiceImpl类文件中还包含RESTful API组装,以及对请求签名
            IRestService restService = new RestServiceImpl();
            Base64.Decoder decoder = Base64.getDecoder();
            //以下分别为生产消息、消费消息与消费确认
            // Produce message
            ProduceReq messages = new ProduceReq();
            messages.addMessage("{[{'id': '00001', 'name': 'John'}, {'id': '00002', 'name': 'Mike'}]}").addMessage("Kafka rest client demo!");
            LOGGER.debug("produce message: {}", JsonUtils.convertObject2Str(messages));
            restService.produce(messages);
    
            // Consume message
            List<ConsumeResp> consumeResps = restService.consume();
            CommitReq commitReq = new CommitReq();
            consumeResps.forEach(resp ->
            {
                LOGGER.debug("handler: {}, content: {}", resp.getHandler(), new String(decoder.decode(resp.getMessage().getContent())));
                commitReq.addCommit(resp.getHandler());
            });
    
            // Commit message
            if (commitReq.getMessages().size() != 0)
            {
                CommitResp resp = restService.commit(commitReq);
                LOGGER.info("Commit resp: success: {}, failed: {}", resp.getSuccess(), resp.getFail());
            }
            else
            {
                LOGGER.warn("Commit is empty.");
            }
        }
    }
    
  • 消息组装与发送:

    以生产消息为例,在下述方法中完成消息组装和签名。其中签名方法调用后,会返回两个消息头:Authorization和X-Sdk-Date,Authorization包含了对请求内容的签名信息。消息头的另一个参数Content-Type需要在代码中添加,参考示例的createRequest()方法。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
     public List<ProduceResp> produce(ProduceReq messages)
        {
            List<ProduceResp> prodResp = null;
            try
            {
                Request request = createRequest();
                request.setUrl(produceURI);
                request.setMethod("POST");
                request.setBody(JsonUtils.convertObject2Str(messages));
                //对请求内容签名,签名后,请求头部参数会新增两个参数:Authorization和X-Sdk-Date,Authorization包含了对请求内容的签名信息。
                HttpRequestBase signedRequest = Client.sign(request);
                LOGGER.debug("Request uri: {}, headers: {}", signedRequest.getURI(), signedRequest.getAllHeaders());
                LOGGER.debug("Request body: {}", request.getBody());
    
                HttpResponse response = HttpUtils.execute(signedRequest);
                if (response.getStatusLine().getStatusCode() == HttpStatus.SC_CREATED)
                {
                    String jsonStr = EntityUtils.toString(response.getEntity(), "UTF-8");
                    prodResp = JsonUtils.convertStr2ListObject(jsonStr, new TypeReference<List<ProduceResp>>() { });
                    LOGGER.info("Produce response: {}", jsonStr);
                    return prodResp;
                }
                else
                {
                    LOGGER.error("Produce message failed. statusCode: {}, error msg: {}",
                            response.getStatusLine().getStatusCode(),
                            EntityUtils.toString(response.getEntity(), "UTF-8"));
                }
            }
            catch (Exception e)
            {
                LOGGER.error("Produce message failed.");
            }
            return prodResp;
        }
    
分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!非常感谢您的反馈,我们会继续努力做到更好!
反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区论坛频道来与我们联系探讨

智能客服提问云社区提问