Java Demo使用说明
除了前面章节介绍的使用原生Kafka客户端,MQS(Kafka)实例还可以通过HTTP RESTful方式访问,包括向指定Topic发送消息、消费消息以及确认消费。
这种方式主要用于适配原有业务系统架构,方便统一使用HTTP协议接入。
如何使用
示例工程搭建
本指南提供了Java语言版本的RESTful API请求发送示例。
示例为一个在IntelliJ IDEA工具中开发的Maven工程,因此,您如果在本地环境使用,请先安装并配置以下环境(以Windows 10系统为例):
- Maven:
Apache Maven 3.0.3及以上版本,可至Maven官方下载页面下载。
- JDK:
Java Development Kit 1.8.111及以上版本,可至Oracle官方下载页面下载。
安装后注意配置JAVA的环境变量。
- IntelliJ IDEA工具:
IntelliJ IDEA 2018.3.5及以上版本,可至IntelliJ IDEA官方网站下载。
- Demo:
在ROMA Connect实例控制台的“消息集成 MQS > Topic管理”页面,单击右上角的“用户指南 > 下载RESTful API Java Demo包”下载Demo。
- 打开IntelliJ IDEA,在菜单栏选择“Import Project”。
弹出“Select File or Directory to Import”对话框。
- 在弹出的对话框中选择解压后的RESTful API Java Demo路径,单击“OK”。
- “Import project from external model”选择“Eclipse”,单击“Next”,进入下一页后保持默认连续单击“Next”,直到“Please select project SDK”页面。
图1 Import Project
- 单击“Finish”,完成工程导入。
图2 Finish
- 编辑rest-config.properties
文件在src/main/resources目录下。将获取到的Kafka实例连接地址、Topic名称,以及SASL信息填写到下述配置中。其中参数kafka.rest.group为消费组ID,可在客户端指定。
1 2 3 4 5 6 7 8 9 10
# Kafka rest endpoint. kafka.rest.endpoint=https://{MQS_Instance_IP_Addr}:9292 # Kafka topic name. kafka.rest.topic=topic_name_demo # Kafka consume group. kafka.rest.group=group_id_demo # Kafka sasl username. kafka.rest.username=sasl_username_demo # Kafka sasl password. kafka.rest.password=sasl_user_passwd_demo
- 编辑log4j.properties
修改日志存储目录:
1
log.directory=D://workspace/logs
- 运行示例工程,查看消息生产与消费样例。
消息生成与消费的Main方法在RestMain.java中,以Java Application的方式运行即可。
示例代码解读
- 工程入口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
public class RestMain { private static final Logger LOGGER = LoggerFactory.getLogger(RestMain.class); public static void main(String[] args) throws InterruptedException { //初始化请求对象。在RestServiceImpl类文件中还包含RESTful API组装,以及对请求签名 IRestService restService = new RestServiceImpl(); Base64.Decoder decoder = Base64.getDecoder(); //以下分别为生产消息、消费消息与消费确认 // Produce message ProduceReq messages = new ProduceReq(); messages.addMessage("{[{'id': '00001', 'name': 'John'}, {'id': '00002', 'name': 'Mike'}]}").addMessage("Kafka rest client demo!"); LOGGER.debug("produce message: {}", JsonUtils.convertObject2Str(messages)); restService.produce(messages); // Consume message List<ConsumeResp> consumeResps = restService.consume(); CommitReq commitReq = new CommitReq(); consumeResps.forEach(resp -> { LOGGER.debug("handler: {}, content: {}", resp.getHandler(), new String(decoder.decode(resp.getMessage().getContent()))); commitReq.addCommit(resp.getHandler()); }); // Commit message if (commitReq.getMessages().size() != 0) { CommitResp resp = restService.commit(commitReq); LOGGER.info("Commit resp: success: {}, failed: {}", resp.getSuccess(), resp.getFail()); } else { LOGGER.warn("Commit is empty."); } } }
- 消息组装与发送:
以生产消息为例,在下述方法中完成消息组装和签名。其中签名方法调用后,会返回两个消息头:Authorization和X-Sdk-Date,Authorization包含了对请求内容的签名信息。消息头的另一个参数Content-Type需要在代码中添加,参考示例的createRequest()方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
public List<ProduceResp> produce(ProduceReq messages) { List<ProduceResp> prodResp = null; try { Request request = createRequest(); request.setUrl(produceURI); request.setMethod("POST"); request.setBody(JsonUtils.convertObject2Str(messages)); //对请求内容签名,签名后,请求头部参数会新增两个参数:Authorization和X-Sdk-Date,Authorization包含了对请求内容的签名信息。 HttpRequestBase signedRequest = Client.sign(request); LOGGER.debug("Request uri: {}, headers: {}", signedRequest.getURI(), signedRequest.getAllHeaders()); LOGGER.debug("Request body: {}", request.getBody()); HttpResponse response = HttpUtils.execute(signedRequest); if (response.getStatusLine().getStatusCode() == HttpStatus.SC_CREATED) { String jsonStr = EntityUtils.toString(response.getEntity(), "UTF-8"); prodResp = JsonUtils.convertStr2ListObject(jsonStr, new TypeReference<List<ProduceResp>>() { }); LOGGER.info("Produce response: {}", jsonStr); return prodResp; } else { LOGGER.error("Produce message failed. statusCode: {}, error msg: {}", response.getStatusLine().getStatusCode(), EntityUtils.toString(response.getEntity(), "UTF-8")); } } catch (Exception e) { LOGGER.error("Produce message failed."); } return prodResp; }