Updated on 2023-11-30 GMT+08:00

Go SDK

This section describes how to access a DMS Kafka queue using a Go client on Linux. For the required configuration information, see Preparing the Environment.

DMS is compatible with native Kafka APIs. For details on how to call native Kafka APIs, see the Kafka Documentation.

Preparing the Client Environment

  1. Download the SDK package.

    Decompress the kafka-dms-go.tar.gz file from the SDK package.

  2. Install the Go and pkg-config tools.

    1. Run the following command to check whether the Go tool has been installed:

      go version

    2. If not, download it from the following address and install it:

      https://golang.org/doc/install?download=go1.10.3.linux-amd64.tar.gz

    3. Run the following command to check whether the pkg-config tool has been installed:

      pkg-config --version

    4. If not, download it from the following address and install it:

      http://www.linuxfromscratch.org/blfs/view/7.4/general/pkgconfig.html

  3. Decompress the kafka-dms-go.tar.gz package.

    The following assumes that the package is decompressed to the {DMSPATH} directory.

  4. Run the following command to copy the kafka folder to the src directory of the Go tool:

    cp {DMSPATH}/kafka-dms-go/confluent-kafka-go-0.11.0/kafka {GO_HOME}/src -R

  5. Run the following command to copy the pkgconfig file to the configuration directory of the pkg-config tool:

    cp {DMSPATH}/kafka-dms-go/librdkafka/lib/pkgconfig/* /usr/share/pkgconfig

  6. Run the following command to copy the librdkafka header files to the include directory:

    cp {DMSPATH}/kafka-dms-go/librdkafka/include/librdkafka /usr/include/ -R

  7. Run the following command to copy the librdkafka library files to the lib directory:

    cp {DMSPATH}/kafka-dms-go/librdkafka/lib/* /usr/lib -R

  8. Run the following command to build and install the kafka package:

    go install kafka

  9. Modify the parameters in the producer_example.go file.

    cd {DMSPATH}/kafka-dms-go/example

    vi producer_example.go

    Modify the following parameters:

    broker := "broker address"
    topic := "kafka topic"
    
    config["sasl.project.id"] = "your projectId"
    config["sasl.access.key"] = "your ak"
    config["sasl.security.key"] = "your sk"
    Table 1 Parameter description

    Parameter            Description
    -------------------  -----------------------
    broker               Kafka endpoint
    topic                Kafka topic ID
    sasl.project.id      Project ID
    sasl.access.key      Tenant AK
    sasl.security.key    Tenant SK

    For details on how to obtain these values, see Preparing the Environment.

  10. Modify the parameters in the consumer_example.go file.

    vi consumer_example.go

    Modify the following parameters:

    broker := "broker address"
    group := "group id"
    var topics = []string{"kafka topic"}
    
    config["sasl.project.id"] = "your projectId"
    config["sasl.access.key"] = "your ak"
    config["sasl.security.key"] = "your sk"
    Table 2 Parameter description

    Parameter            Description
    -------------------  -----------------------
    broker               Kafka endpoint
    group                Kafka consumer group ID
    topics               Kafka topic ID
    sasl.project.id      Project ID
    sasl.access.key      Tenant AK
    sasl.security.key    Tenant SK

    For details on how to obtain these values, see Preparing the Environment.
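
The values edited in producer_example.go (step 9) feed a confluent-kafka-go producer. The following is a sketch of how they fit together, not the shipped file verbatim: `bootstrap.servers` is the standard librdkafka setting for the broker list, the `sasl.*` keys are specific to the DMS-modified client, and the `"kafka"` import path assumes the package copied into `{GO_HOME}/src` in step 4.

```go
// Sketch: wiring the step-9 values into a producer. Assumes the DMS
// confluent-kafka-go package installed in the steps above.
package main

import (
	"fmt"
	"os"

	"kafka" // the package installed with "go install kafka"
)

func main() {
	broker := "broker address" // Kafka endpoint
	topic := "kafka topic"     // Kafka topic ID
	_ = topic                  // used by the produce code in the Code Example section

	config := kafka.ConfigMap{"bootstrap.servers": broker}
	config["sasl.project.id"] = "your projectId"
	config["sasl.access.key"] = "your ak"
	config["sasl.security.key"] = "your sk"

	p, err := kafka.NewProducer(&config)
	if err != nil {
		fmt.Printf("Failed to create producer: %s\n", err)
		os.Exit(1)
	}
	defer p.Close()
}
```

This is a configuration sketch only; keep any additional settings present in the shipped producer_example.go.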

Running the Example

  1. Run the following command to produce messages:

    go run producer_example.go

    After the command runs, one message is sent to the Kafka queue. To send more messages, run the command again.

  2. Run the following command to consume messages:

    go run consumer_example.go
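
Internally, consumer_example.go builds its configuration from the values edited in step 10. A sketch of that wiring, under the same assumptions as the producer sketch above (`bootstrap.servers` and `group.id` are standard librdkafka settings; the `sasl.*` keys and the `"kafka"` import path are specific to this SDK layout):

```go
// Sketch: wiring the step-10 values into a consumer. Assumes the DMS
// confluent-kafka-go package installed in the steps above.
package main

import (
	"fmt"
	"os"

	"kafka" // the package installed with "go install kafka"
)

func main() {
	broker := "broker address"
	group := "group id"
	var topics = []string{"kafka topic"}

	config := kafka.ConfigMap{
		"bootstrap.servers": broker,
		"group.id":          group,
	}
	config["sasl.project.id"] = "your projectId"
	config["sasl.access.key"] = "your ak"
	config["sasl.security.key"] = "your sk"

	c, err := kafka.NewConsumer(&config)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
		os.Exit(1)
	}
	defer c.Close()

	if err := c.SubscribeTopics(topics, nil); err != nil {
		fmt.Fprintf(os.Stderr, "Subscribe failed: %s\n", err)
		os.Exit(1)
	}
	// ...poll loop as shipped in consumer_example.go...
}
```

This is a configuration sketch only; keep any additional settings present in the shipped consumer_example.go.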

Code Example

Producing messages

    // p is the *kafka.Producer created earlier in producer_example.go.
    // Optional delivery channel; if not specified, the Producer object's
    // .Events channel is used.
    deliveryChan := make(chan kafka.Event)

    value := "Hello Go!"
    err = p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(value)}, deliveryChan)

    // Block until the delivery report arrives, then check whether the
    // broker accepted the message.
    e := <-deliveryChan
    m := e.(*kafka.Message)

    if m.TopicPartition.Error != nil {
        fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
    } else {
        fmt.Printf("Delivered message to %v\n", m.TopicPartition)
    }
    close(deliveryChan)

Consuming messages

    // c is the *kafka.Consumer created earlier in consumer_example.go;
    // sigchan receives SIGINT/SIGTERM so the loop can stop cleanly.
    for run {
        select {
        case sig := <-sigchan:
            fmt.Printf("Caught signal %v: terminating\n", sig)
            run = false
        default:
            // Wait up to 100 ms for an event, then loop again so that
            // signals stay responsive.
            ev := c.Poll(100)
            if ev == nil {
                continue
            }

            switch e := ev.(type) {
            case *kafka.Message:
                fmt.Printf("%% Message on %s:\n%s\n",
                    e.TopicPartition, string(e.Value))
            case kafka.PartitionEOF:
                // End of a partition reached; informational, not an error.
                fmt.Printf("%% Reached %v\n", e)
            case kafka.Error:
                fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
                run = false
            default:
                fmt.Printf("Ignored %v\n", e)
            }
        }
    }