文档首页> > 开发指南> Kafka开发指南> Go SDK

Go SDK

更新时间: 2018-11-21 21:10

本节以Linux系统为例介绍Go客户端如何连接DMS服务的Kafka队列,获取相关配置信息请参考准备环境章节。

DMS兼容Kafka原生API,相关调用方法请参考Kafka 官网说明

客户端环境搭建

  1. 下载SDK包

    解压后获取kafka-dms-go.tar.gz文件。

  2. 安装go和pkg-config工具。

    1. 执行如下命令检查是否有go工具。

      go version

    2. 如果没有go工具,则参考如下地址进行下载和安装。

      https://golang.org/doc/install?download=go1.10.3.linux-amd64.tar.gz

    3. 执行如下命令检查是否已安装pkg-config。

      pkg-config --version

    4. 如果没有pkg-config,则参考如下地址进行下载和安装。

      http://www.linuxfromscratch.org/blfs/view/7.4/general/pkgconfig.html

  3. 解压缩kafka-dms-go.tar.gz。

    以下假设解压目录为{DMSPATH}。

  4. 拷贝kafka文件夹到go的src目录下。

    cp {DMSPATH}/kafka-dms-go/confluent-kafka-go-0.11.0/kafka {GO_HOME}/src -R

  5. 拷贝kafka的pkg配置文件到pkg-config的配置目录。

    cp {DMSPATH}/kafka-dms-go/librdkafka/lib/pkgconfig/* /usr/share/pkgconfig

  6. 拷贝kafka依赖到include目录下。

    cp {DMSPATH}/kafka-dms-go/librdkafka/include/librdkafka /usr/include/ -R

  7. 拷贝kafka依赖到lib库中。

    cp {DMSPATH}/kafka-dms-go/librdkafka/lib/* /usr/lib -R

  8. 安装Kafka。

    go install kafka

  9. 修改producer_example.go中的配置参数:

    cd {DMSPATH}/kafka-dms-go/example

    vi producer_example.go

    修改以下配置项:

    broker := "broker address"
    topic := "kafka topic"
    
    config["sasl.project.id"] = "your projectId"
    config["sasl.access.key"] = "your ak"
    config["sasl.security.key"] = "your sk"
    表1 参数说明

    参数

    说明

    获取方式

    broker

    配置为Kafka的endpoint地址。

    请参考准备环境

    topic

    配置为Kafka Topic的id。

    sasl.project.id

    项目 ID。

    sasl.access.key

    租户AK。

    security.key

    租户SK。

  10. 修改consumer_example.go中的配置参数。

    vi consumer_example.go

    修改以下配置项:

    broker := "broker address"
    group := "group id"
    var topics = []string{"kafka topic"}
    
    config["sasl.project.id"] = "your projectId"
    config["sasl.access.key"] = "your ak"
    config["sasl.security.key"] = "your sk"
    表2 参数说明

    参数

    说明

    获取方式

    broker

    配置为Kafka的endpoint地址。

    请参考准备环境

    group

    配置为Kafka队列的消费组id

    topics

    配置为Kafka Topic的id。

    sasl.project.id

    项目 ID。

    sasl.access.key

    租户AK。

    security.key

    租户SK。

运行示例

  1. 执行如下命令进行生产消息。

    go run producer_example.go

    说明:该示例命令将会发送1条消息到Kafka队列中,如需发送多条消息,则重复执行该条命令。

  2. 执行如下命令进行消费消息。

    go run consumer_example.go

示例代码

生产消息

    // Optional delivery channel, if not specified the Producer object's
    // .Events channel is used.
    deliveryChan := make(chan kafka.Event)

    value := "Hello Go!"
    err = p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(value)}, deliveryChan)

    e := <-deliveryChan
    m := e.(*kafka.Message)

消费消息

    for run == true {
        select {
        case sig := <-sigchan:
            fmt.Printf("Caught signal %v: terminating\n", sig)
            run = false
        default:
            ev := c.Poll(100)
            if ev == nil {
                continue
            }

            switch e := ev.(type) {
            case *kafka.Message:
                fmt.Printf("%% Message on %s:\n%s\n",
                    e.TopicPartition, string(e.Value))
            case kafka.PartitionEOF:
                fmt.Printf("%% Reached %v\n", e)
            case kafka.Error:
                fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
                run = false
            default:
                fmt.Printf("Ignored %v\n", e)
            }
        }
    }

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

在文档使用中是否遇到以下问题







请至少选择或填写一项反馈信息

字符长度不能超过100

反馈内容不能为空!

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

跳转到云社区
点我,12·12惊喜等着你哦~