Configuring a Kafka Client in Go
Scenarios
This section uses CentOS (Linux) as an example to describe how to connect a Go Kafka client to MQS, including installing the Kafka client, and how to produce and consume messages.
Prerequisites
- You have obtained MQS connection information. For details, see Preparations.
- You have installed the development tool and Go development environment. For details, see Preparations.
Installing the Kafka Client
MQS is developed based on Kafka 1.1.0 and 2.7. You can view the Kafka version in the MQS Information area on the Instance Information page of the ROMA Connect console. For the recommended versions of the open-source Go client, see the suggested client versions. Run the following command to install the Go Kafka client:
go get github.com/confluentinc/confluent-kafka-go/kafka
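The command above pulls the latest release of the client. If your project uses Go modules and you want to match one of the suggested client versions, you can pin a specific version instead. The version below is only an illustration and may not be the one recommended for your instance; replace it with the suggested version.
go get github.com/confluentinc/confluent-kafka-go/kafka@v1.9.2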
Producing Messages
- SASL authentication mode
In the following example, replace the values of brokers, topics, user, password, and caFile with the actual values.
package main import ( "bufio" "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "log" "os" "os/signal" "syscall" ) var ( brokers = "ip1:port1,ip2:port2,ip3:port3" topics = "topic_name" user = "username" password = "password" caFile = "phy_ca.crt" ) func main() { log.Println("Starting a new kafka producer") config := &kafka.ConfigMap{ "bootstrap.servers": brokers, "security.protocol": "SASL_SSL", "sasl.mechanism": "PLAIN", "sasl.username": user, "sasl.password": password, "ssl.ca.location": caFile, } producer, err := kafka.NewProducer(config) if err != nil { log.Panicf("producer error, err: %v", err) return } go func() { for e := range producer.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { log.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { log.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() // Produce messages to topic (asynchronously) fmt.Println("please enter message:") go func() { for { err := producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topics, Partition: kafka.PartitionAny}, Value: GetInput(), }, nil) if err != nil { log.Panicf("send message fail, err: %v", err) return } } }() sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select { case <-sigterm: log.Println("terminating: via signal") } // Wait for message deliveries before shutting down producer.Flush(15 * 1000) producer.Close() } func GetInput() []byte { reader := bufio.NewReader(os.Stdin) data, _, _ := reader.ReadLine() return data }
The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.
- brokers: MQS connection addresses and ports
- topics: names of the topics to which messages are produced
- user and password: username and password used for SASL_SSL authentication
- caFile: CA certificate used by the client for SASL_SSL authentication (a sketch of reading these connection values from environment variables follows this list)
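The example above hard-codes the connection values for simplicity. A common alternative is to read them from environment variables so that credentials are not stored in the source file. The following is a minimal sketch of that approach, not part of the MQS sample; the variable names MQS_BROKERS, MQS_USER, MQS_PASSWORD, and MQS_CA_FILE are illustrative only.

package main

import (
    "log"
    "os"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

// mustEnv returns the value of a required environment variable,
// or aborts (panics) if it is not set.
func mustEnv(key string) string {
    v := os.Getenv(key)
    if v == "" {
        log.Panicf("environment variable %s is not set", key)
    }
    return v
}

func main() {
    // Build the same SASL_SSL configuration as in the example above,
    // but from environment variables. The variable names are illustrative.
    config := &kafka.ConfigMap{
        "bootstrap.servers": mustEnv("MQS_BROKERS"),
        "security.protocol": "SASL_SSL",
        "sasl.mechanism":    "PLAIN",
        "sasl.username":     mustEnv("MQS_USER"),
        "sasl.password":     mustEnv("MQS_PASSWORD"),
        "ssl.ca.location":   mustEnv("MQS_CA_FILE"),
    }
    producer, err := kafka.NewProducer(config)
    if err != nil {
        log.Panicf("producer error, err: %v", err)
    }
    defer producer.Close()
    log.Println("producer created with configuration from environment variables")
}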
- Non-SASL authentication mode
In the following example, replace the values of brokers and topics with the actual values.
package main import ( "bufio" "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "log" "os" "os/signal" "syscall" ) var ( brokers = "ip1:port1,ip2:port2,ip3:port3" topics = "topic_name" ) func main() { log.Println("Starting a new kafka producer") config := &kafka.ConfigMap{ "bootstrap.servers": brokers, } producer, err := kafka.NewProducer(config) if err != nil { log.Panicf("producer error, err: %v", err) return } go func() { for e := range producer.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { log.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { log.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() // Produce messages to topic (asynchronously) fmt.Println("please enter message:") go func() { for { err := producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topics, Partition: kafka.PartitionAny}, Value: GetInput(), }, nil) if err != nil { log.Panicf("send message fail, err: %v", err) return } } }() sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select { case <-sigterm: log.Println("terminating: via signal") } // Wait for message deliveries before shutting down producer.Flush(15 * 1000) producer.Close() } func GetInput() []byte { reader := bufio.NewReader(os.Stdin) data, _, _ := reader.ReadLine() return data }
The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information. A variant that waits for the delivery report of each message is sketched after this list.
- brokers: MQS connection addresses and ports
- topics: names of the topics to which messages are produced
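The producer examples above send messages asynchronously and read delivery reports from producer.Events(). If you need to confirm delivery of an individual message before continuing, confluent-kafka-go also accepts a per-call delivery channel. The following minimal sketch shows that pattern; the broker addresses, topic name, and message payload are placeholders.

package main

import (
    "log"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    topic := "topic_name" // placeholder; replace with the actual topic name
    producer, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "ip1:port1,ip2:port2,ip3:port3", // placeholder addresses
    })
    if err != nil {
        log.Panicf("producer error, err: %v", err)
    }
    defer producer.Close()

    // Use a per-call delivery channel so this goroutine can wait for the report.
    deliveryChan := make(chan kafka.Event, 1)
    err = producer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          []byte("hello MQS"),
    }, deliveryChan)
    if err != nil {
        log.Panicf("send message fail, err: %v", err)
    }

    // Block until the delivery report for this message arrives.
    e := <-deliveryChan
    m := e.(*kafka.Message)
    if m.TopicPartition.Error != nil {
        log.Printf("Delivery failed: %v", m.TopicPartition.Error)
    } else {
        log.Printf("Delivered message to %v", m.TopicPartition)
    }
}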
Consuming Messages
- SASL authentication mode
In the following example, replace the values of brokers, group, topics, user, password, and caFile with the actual values.
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "log" "os" "os/signal" "syscall" ) var ( brokers = "ip1:port1,ip2:port2,ip3:port3" group = "group_id" topics = "topic_name" user = "username" password = "password" caFile = "phy_ca.crt" ) func main() { log.Println("Starting a new kafka consumer") config := &kafka.ConfigMap{ "bootstrap.servers": brokers, "group.id": group, "auto.offset.reset": "earliest", "security.protocol": "SASL_SSL", "sasl.mechanism": "PLAIN", "sasl.username": user, "sasl.password": password, "ssl.ca.location": caFile, } consumer, err := kafka.NewConsumer(config) if err != nil { log.Panicf("Error creating consumer: %v", err) return } err = consumer.SubscribeTopics([]string{topics}, nil) if err != nil { log.Panicf("Error subscribe consumer: %v", err) return } go func() { for { msg, err := consumer.ReadMessage(-1) if err != nil { log.Printf("Consumer error: %v (%v)", err, msg) } else { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } } }() sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select { case <-sigterm: log.Println("terminating: via signal") } if err = consumer.Close(); err != nil { log.Panicf("Error closing consumer: %v", err) } }
The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information. A sketch of committing offsets manually after each message is processed follows this list.
- brokers: MQS connection addresses and ports
- group: consumer group name. If the specified consumer group does not exist, the system automatically creates one.
- topics: names of the topics from which messages are consumed
- user and password: username and password used for SASL_SSL authentication
- caFile: CA certificate used by the client for SASL_SSL authentication
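The consumer example above relies on automatic offset commits. If you want at-least-once processing, you can disable automatic commits and commit each offset only after the message has been handled. The following is a minimal sketch of that approach, not part of the MQS sample; the broker addresses, consumer group, and topic name are placeholders.

package main

import (
    "fmt"
    "log"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":  "ip1:port1,ip2:port2,ip3:port3", // placeholder addresses
        "group.id":           "group_id",                      // placeholder consumer group
        "auto.offset.reset":  "earliest",
        "enable.auto.commit": false, // commit offsets explicitly after processing
    })
    if err != nil {
        log.Panicf("Error creating consumer: %v", err)
    }
    defer consumer.Close()

    if err = consumer.SubscribeTopics([]string{"topic_name"}, nil); err != nil {
        log.Panicf("Error subscribe consumer: %v", err)
    }

    for {
        msg, err := consumer.ReadMessage(-1)
        if err != nil {
            log.Printf("Consumer error: %v (%v)", err, msg)
            continue
        }
        fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))

        // Commit the offset only after the message has been processed.
        if _, err = consumer.CommitMessage(msg); err != nil {
            log.Printf("Commit failed: %v", err)
        }
    }
}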
- Non-SASL authentication mode
In the following example, replace the values of brokers, group, and topics with the actual values.
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "log" "os" "os/signal" "syscall" ) var ( brokers = "ip1:port1,ip2:port2,ip3:port3" group = "group_id" topics = "topic_name" ) func main() { log.Println("Starting a new kafka consumer") config := &kafka.ConfigMap{ "bootstrap.servers": brokers, "group.id": group, "auto.offset.reset": "earliest", } consumer, err := kafka.NewConsumer(config) if err != nil { log.Panicf("Error creating consumer: %v", err) return } err = consumer.SubscribeTopics([]string{topics}, nil) if err != nil { log.Panicf("Error subscribe consumer: %v", err) return } go func() { for { msg, err := consumer.ReadMessage(-1) if err != nil { log.Printf("Consumer error: %v (%v)", err, msg) } else { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } } }() sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select { case <-sigterm: log.Println("terminating: via signal") } if err = consumer.Close(); err != nil { log.Panicf("Error closing consumer: %v", err) } }
The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information. A Poll-based consumption loop that also surfaces client errors is sketched after this list.
- brokers: MQS connection addresses and ports
- group: consumer group name. If the specified consumer group does not exist, the system automatically creates one.
- topics: names of the topics from which messages are consumed
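The consumer examples above use ReadMessage, which returns only messages and read errors. The client also provides Poll, which additionally surfaces other event types (for example client-level errors), so the application can decide how to react to them. The following minimal sketch shows a Poll-based loop; the connection values are placeholders.

package main

import (
    "fmt"
    "log"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "ip1:port1,ip2:port2,ip3:port3", // placeholder addresses
        "group.id":          "group_id",                      // placeholder consumer group
        "auto.offset.reset": "earliest",
    })
    if err != nil {
        log.Panicf("Error creating consumer: %v", err)
    }
    defer consumer.Close()

    if err = consumer.SubscribeTopics([]string{"topic_name"}, nil); err != nil {
        log.Panicf("Error subscribe consumer: %v", err)
    }

    run := true
    for run {
        // Poll returns nil when the timeout (in milliseconds) expires without an event.
        switch ev := consumer.Poll(1000).(type) {
        case *kafka.Message:
            fmt.Printf("Message on %s: %s\n", ev.TopicPartition, string(ev.Value))
        case kafka.Error:
            log.Printf("Consumer error: %v", ev)
            if ev.IsFatal() {
                run = false
            }
        case nil:
            // No event within the poll interval; keep polling.
        }
    }
}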