Go SDK
This section describes how to access a DMS Kafka queue by using a Linux Go client. To obtain related configuration information, see Preparing the Environment.
DMS is compatible with native Kafka APIs. For details on how to call native Kafka APIs, see the Kafka Documentation.
Preparing the Client Environment
- Download the SDK package.
Decompress the kafka-dms-go.tar.gz file from the SDK package.
- Install the Go and pkg-config tools.
- Run the following command to check whether the Go tool has been installed:
- If not, download the tool at the following address and then install the tool.
https://golang.org/doc/install?download=go1.10.3.linux-amd64.tar.gz
- Run the following command to check whether the pkg-config tool has been installed:
- If not, download the tool at the following address and then install the tool.
http://www.linuxfromscratch.org/blfs/view/7.4/general/pkgconfig.html
- Decompress the kafka-dms-go.tar.gz package.
The following assumes that the package is decompressed to the {DMSPATH} directory.
- Run the following command to copy the kafka folder to the src directory of the Go tool:
cp {DMSPATH}/kafka-dms-go/confluent-kafka-go-0.11.0/kafka {GO_HOME}/src -R
- Run the following command to copy the pkgconfig file to the configuration directory of the pkg-config tool:
cp {DMSPATH}/kafka-dms-go/librdkafka/lib/pkgconfig/* /usr/share/pkgconfig
- Run the following command to copy the Kafka dependency packages to the include directory:
cp {DMSPATH}/kafka-dms-go/librdkafka/include/librdkafka /usr/include/ -R
- Run the following command to copy the Kafka dependency packages to the lib directory:
cp {DMSPATH}/kafka-dms-go/librdkafka/lib/* /usr/lib -R
- Run the following command to install Kafka:
go install kafka
- Modify the parameters in the producer_example.go file.
cd {DMSPATH}/kafka-dms-go/example
vi producer_example.go
Modify the following parameters:
broker := "broker address" topic := "kafka topic" config["sasl.project.id"] = "your projectId" config["sasl.access.key"] = "your ak" config["sasl.security.key"] = "your sk"
Table 1 Parameter description Parameter
Description
How to Obtain
broker
Kafka endpoint
For details, see Preparing the Environment.
topic
Kafka topic ID
sasl.project.id
Project ID
sasl.access.key
Tenant AK
security.key
Tenant SK
- Modify the parameters in the consumer_example.go file.
vi consumer_example.go
Modify the following parameters:
broker := "broker address" group := "group id" var topics = []string{"kafka topic"} config["sasl.project.id"] = "your projectId" config["sasl.access.key"] = "your ak" config["sasl.security.key"] = "your sk"
Table 2 Parameter description Parameter
Description
How to Obtain
broker
Kafka endpoint
For details, see Preparing the Environment.
group
Kafka consumer group ID
topics
Kafka topic ID
sasl.project.id
Project ID
sasl.access.key
Tenant AK
security.key
Tenant SK
Running Example
- Run the following command to produce messages:
go run producer_example.go
After the command is run, one message will be automatically sent to the Kafka queue. To send multiple messages, repeat running this command.
- Run the following command to consume messages:
go run consumer_example.go
Code Example
Producer parameters
// Optional delivery channel, if not specified the Producer object's // .Events channel is used. deliveryChan := make(chan kafka.Event) value := "Hello Go!" err = p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(value)}, deliveryChan) e := <-deliveryChan m := e.(*kafka.Message)
Consumer parameters
for run == true { select { case sig := <-sigchan: fmt.Printf("Caught signal %v: terminating\n", sig) run = false default: ev := c.Poll(100) if ev == nil { continue } switch e := ev.(type) { case *kafka.Message: fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value)) case kafka.PartitionEOF: fmt.Printf("%% Reached %v\n", e) case kafka.Error: fmt.Fprintf(os.Stderr, "%% Error: %v\n", e) run = false default: fmt.Printf("Ignored %v\n", e) } } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot