更新时间:2022-02-21 GMT+08:00
运行示例工程
本章节以Java语言为示例介绍如何连接DMS Kafka队列并生产消费消息。
操作说明
- 登录弹性云服务器。
用户可以直接在192网段的ECS虚拟机上运行。
- 安装Java JDK或JRE,并配置JAVA_HOME与PATH环境变量,使用执行用户修改~/.bash_profile,添加如下行。
export JAVA_HOME=/opt/java/jdk1.8.0_151 export PATH=$JAVA_HOME/bin:$PATH
执行source .bash_profile命令使修改生效。
ECS虚拟机默认自带的JDK可能不符合要求,例如OpenJDK,需要配置为Oracle的JDK,可至Oracle官方下载页面下载Java Development Kit 1.8.111及以上版本。
- 下载DmsKafkaDemo示例工程代码。
$ wget https://dms-demo.obs.cn-north-1.myhuaweicloud.com/DmsKafkaDemo.zip
- 解压DmsKafkaDemo.zip压缩包。
$ unzip DmsKafkaDemo.zip
- 进入DmsKafkaDemo/dist目录,该目录下包含预编译好的二进制文件和执行脚本。
$ cd DmsKafkaDemo/dist
- 编辑配置文件config/dms_kafka_client_jaas.conf,设置access_key、secret_key和project_id。
$ vim config/dms_kafka_client_jaas.conf
设置内容如下(其中加粗内容需要替换为实际值):
KafkaClient { com.huawei.middleware.kafka.sasl.client.KafkaLoginModule required access_key="********************" secret_key="**********" project_id="bd67aaead60940d688b872c31bdc653b" target_project_id="bd67aaead60940d688b872c31bdc6539"; };
如果需要访问其他租户授权的队列,则需要配置授权者的Project ID,即配置target_project_id为授权者的Project ID。
- 编辑配置文件config/producer.properties,设置topic和bootstrap.servers。
$ vim config/producer.properties
设置内容如下(其中加粗内容需要替换为实际值):
topic=k-bd67aaead60940d688b872c31bdc653b-4df89da6-ede4-4072-93e0-28dc6e866299 bootstrap.servers=dms-kafka.cn-north-1.myhuaweicloud.com:37000 ssl.truststore.password=************ acks=all retries=1 batch.size=16384 buffer.memory=33554432 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer security.protocol=SASL_SSL sasl.mechanism=DMS
参数topic可配置为“队列名称”或者“Kafka Topic”,可参考表2。
- 编辑配置文件config/consumer.properties,设置topic、bootstrap.servers和group.id。
$ vim config/consumer.properties
设置内容如下(其中加粗内容需要替换为实际值):
topic=k-bd67aaead60940d688b872c31bdc653b-4df89da6-ede4-4072-93e0-28dc6e866299 bootstrap.servers=dms-kafka.cn-north-1.myhuaweicloud.com:37000 group.id=g-7ec0caac-01fb-4f91-a4f2-0a9dd48f8af7 ssl.truststore.password=************ security.protocol=SASL_SSL sasl.mechanism=DMS key.deserializer=org.apache.kafka.common.serialization.StringDeserializer value.deserializer=org.apache.kafka.common.serialization.StringDeserializer auto.offset.reset=earliest enable.auto.commit=false
参数topic可配置为“队列名称”或者“Kafka Topic”,可参考表2。
- 运行生产消息示例。
$ bash produce.sh
执行完命令自动发送10条消息到Kafka队列中。
- 运行消费消息示例。
$ bash consume.sh
父主题: Java SDK