Go SDK
本节以Linux系统为例介绍Go客户端如何连接DMS服务的Kafka队列,获取相关配置信息请参考准备环境章节。
DMS兼容Kafka原生API,相关调用方法请参考Kafka官网说明。
客户端环境搭建
- 下载SDK包。
解压后获取kafka-dms-go.tar.gz文件。
- 安装go和pkg-config工具。
- 执行如下命令检查是否有go工具。
- 如果没有go工具,则参考如下地址进行下载和安装。
https://golang.org/doc/install?download=go1.10.3.linux-amd64.tar.gz
- 执行如下命令检查是否已安装pkg-config。
- 如果没有pkg-config,则参考如下地址进行下载和安装。
http://www.linuxfromscratch.org/blfs/view/7.4/general/pkgconfig.html
- 解压缩kafka-dms-go.tar.gz。
以下假设解压目录为{DMSPATH}。
- 拷贝kafka文件夹到go的src目录下。
cp {DMSPATH}/kafka-dms-go/confluent-kafka-go-0.11.0/kafka {GO_HOME}/src -R
- 拷贝kafka的pkg配置文件到pkg-config的配置目录。
cp {DMSPATH}/kafka-dms-go/librdkafka/lib/pkgconfig/* /usr/share/pkgconfig
- 拷贝kafka依赖到include目录下。
cp {DMSPATH}/kafka-dms-go/librdkafka/include/librdkafka /usr/include/ -R
- 拷贝kafka依赖到lib库中。
cp {DMSPATH}/kafka-dms-go/librdkafka/lib/* /usr/lib -R
- 安装Kafka。
go install kafka
- 修改producer_example.go中的配置参数:
cd {DMSPATH}/kafka-dms-go/example
vi producer_example.go
修改以下配置项:
broker := "broker address" topic := "kafka topic" config["sasl.project.id"] = "your projectId" config["sasl.access.key"] = "your ak" config["sasl.security.key"] = "your sk"
表1 参数说明 参数
说明
获取方式
broker
配置为Kafka的endpoint地址。
请参考准备环境
topic
配置为Kafka Topic的id。
sasl.project.id
项目ID。
sasl.access.key
租户AK。
security.key
租户SK。
- 修改consumer_example.go中的配置参数。
vi consumer_example.go
修改以下配置项:
broker := "broker address" group := "group id" var topics = []string{"kafka topic"} config["sasl.project.id"] = "your projectId" config["sasl.access.key"] = "your ak" config["sasl.security.key"] = "your sk"
表2 参数说明 参数
说明
获取方式
broker
配置为Kafka的endpoint地址。
请参考准备环境
group
配置为Kafka队列的消费组id
topics
配置为Kafka Topic的id。
sasl.project.id
项目ID。
sasl.access.key
租户AK。
security.key
租户SK。
运行示例
- 执行如下命令进行生产消息。
go run producer_example.go
说明:该示例命令将会发送1条消息到Kafka队列中,如需发送多条消息,则重复执行该条命令。
- 执行如下命令进行消费消息。
go run consumer_example.go
示例代码
生产消息
// Optional delivery channel, if not specified the Producer object's // .Events channel is used. deliveryChan := make(chan kafka.Event) value := "Hello Go!" err = p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(value)}, deliveryChan) e := <-deliveryChan m := e.(*kafka.Message)
消费消息
for run == true { select { case sig := <-sigchan: fmt.Printf("Caught signal %v: terminating\n", sig) run = false default: ev := c.Poll(100) if ev == nil { continue } switch e := ev.(type) { case *kafka.Message: fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value)) case kafka.PartitionEOF: fmt.Printf("%% Reached %v\n", e) case kafka.Error: fmt.Fprintf(os.Stderr, "%% Error: %v\n", e) run = false default: fmt.Printf("Ignored %v\n", e) } } }