Updated on 2023-10-10 GMT+08:00

Configuring a Kafka Client in Go

Scenarios

This section uses Linux CentOS as an example to describe how to connect a Go Kafka client to MQS (including Kafka client installation), and how to produce and consume messages.

Prerequisites

  • You have obtained MQS connection information. For details, see Preparations.
  • You have installed the development tool and Go development environment. For details, see Preparations.

Installing the Kafka Client

MQS is developed based on Kafka 1.1.0 and 2.7. View the Kafka version information in the MQS Information area on the Instance Information page on the ROMA Connect console. For details about how to use the Go open-source client, see suggested client versions.

Run the following command to install the Go Kafka client of the corresponding version:
go get github.com/confluentinc/confluent-kafka-go/kafka

Producing Messages

  • SASL authentication mode

    Replace the information in bold with the actual values.

    package main
    
    import (
    	"bufio"
    	"fmt"
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    	"log"
    	"os"
    	"os/signal"
    	"syscall"
    )
    
    // Connection settings for the SASL producer example. Every value below is a
    // placeholder and has to be replaced with the actual value.
    var (
    	// brokers holds the MQS connection addresses and ports, comma-separated.
    	brokers = "ip1:port1,ip2:port2,ip3:port3"
    	// topics is the name of the topic the messages are produced to.
    	topics = "topic_name"
    	// user is the username used for SASL_SSL authentication.
    	user = "username"
    	// password is the password used for SASL_SSL authentication.
    	password = "password"
    	// caFile is the certificate file handed to the client as ssl.ca.location.
    	caFile = "phy_ca.crt"
    )
    
    // main connects a producer to MQS over SASL_SSL, sends every line typed on
    // standard input to the configured topic, and shuts down on SIGINT/SIGTERM or
    // when standard input is exhausted.
    func main() {
    	log.Println("Starting a new kafka producer")
    
    	config := &kafka.ConfigMap{
    		"bootstrap.servers": brokers,
    		"security.protocol": "SASL_SSL",
    		"sasl.mechanism":    "PLAIN",
    		"sasl.username":     user,
    		"sasl.password":     password,
    		"ssl.ca.location":   caFile,
    	}
    	producer, err := kafka.NewProducer(config)
    	if err != nil {
    		// log.Panicf does not return, so no explicit return is needed.
    		log.Panicf("producer error, err: %v", err)
    	}
    	defer producer.Close()
    
    	// Report the delivery result of every produced message.
    	go func() {
    		for e := range producer.Events() {
    			switch ev := e.(type) {
    			case *kafka.Message:
    				if ev.TopicPartition.Error != nil {
    					log.Printf("Delivery failed: %v\n", ev.TopicPartition)
    				} else {
    					log.Printf("Delivered message to %v\n", ev.TopicPartition)
    				}
    			}
    		}
    	}()
    
    	// Produce messages to topic (asynchronously)
    	fmt.Println("please enter message:")
    	inputDone := make(chan struct{})
    	go func() {
    		defer close(inputDone)
    		for {
    			value := GetInput()
    			if value == nil {
    				// GetInput yields nil once standard input has nothing more to
    				// offer; stop instead of producing empty messages forever.
    				return
    			}
    			err := producer.Produce(&kafka.Message{
    				TopicPartition: kafka.TopicPartition{Topic: &topics, Partition: kafka.PartitionAny},
    				Value:          value,
    			}, nil)
    			if err != nil {
    				log.Panicf("send message fail, err: %v", err)
    			}
    		}
    	}()
    
    	sigterm := make(chan os.Signal, 1)
    	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    	select {
    	case <-sigterm:
    		log.Println("terminating: via signal")
    	case <-inputDone:
    		log.Println("terminating: end of input")
    	}
    
    	// Wait (up to 15 s) for message deliveries before shutting down and
    	// report anything that is still outstanding afterwards.
    	if remaining := producer.Flush(15 * 1000); remaining > 0 {
    		log.Printf("%d event(s) still outstanding after flush timeout", remaining)
    	}
    }
    
    // stdinReader is the buffered reader shared by all GetInput calls. It is
    // created on first use and then kept, so data that was buffered while reading
    // one line is still available to the next call.
    var stdinReader *bufio.Reader
    
    // GetInput reads the next line from standard input and returns it without its
    // line terminator ("\n" or "\r\n"). Lines of any length are returned in one
    // piece, and the returned slice is a fresh copy owned by the caller.
    //
    // An empty line yields an empty, non-nil slice. When nothing more can be read
    // (end of input or a read error with no pending data) GetInput returns nil.
    // GetInput is not safe for concurrent use.
    func GetInput() []byte {
    	if stdinReader == nil {
    		stdinReader = bufio.NewReader(os.Stdin)
    	}
    
    	line, err := stdinReader.ReadBytes('\n')
    	if err != nil && len(line) == 0 {
    		return nil
    	}
    
    	// Drop the terminator; a final line without "\n" is returned unchanged.
    	if n := len(line); n > 0 && line[n-1] == '\n' {
    		line = line[:n-1]
    		if n := len(line); n > 0 && line[n-1] == '\r' {
    			line = line[:n-1]
    		}
    	}
    	return line
    }

    The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.

    • brokers: MQS connection addresses and ports
    • topics: name of the topic to which messages are produced
    • user and password: username and password used for SASL_SSL authentication
    • caFile: path to the client certificate file used for SASL_SSL authentication (set as ssl.ca.location)
  • Non-SASL authentication mode

    Replace the information in bold with the actual values.

    package main
    
    import (
    	"bufio"
    	"fmt"
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    	"log"
    	"os"
    	"os/signal"
    	"syscall"
    )
    
    // Connection settings for the non-SASL producer example. Both values are
    // placeholders and have to be replaced with the actual values.
    var (
    	// brokers holds the MQS connection addresses and ports, comma-separated.
    	brokers = "ip1:port1,ip2:port2,ip3:port3"
    	// topics is the name of the topic the messages are produced to.
    	topics = "topic_name"
    )
    
    // main connects a producer to MQS without authentication, sends every line
    // typed on standard input to the configured topic, and shuts down on
    // SIGINT/SIGTERM or when standard input is exhausted.
    func main() {
    	log.Println("Starting a new kafka producer")
    
    	config := &kafka.ConfigMap{
    		"bootstrap.servers": brokers,
    	}
    	producer, err := kafka.NewProducer(config)
    	if err != nil {
    		// log.Panicf does not return, so no explicit return is needed.
    		log.Panicf("producer error, err: %v", err)
    	}
    	defer producer.Close()
    
    	// Report the delivery result of every produced message.
    	go func() {
    		for e := range producer.Events() {
    			switch ev := e.(type) {
    			case *kafka.Message:
    				if ev.TopicPartition.Error != nil {
    					log.Printf("Delivery failed: %v\n", ev.TopicPartition)
    				} else {
    					log.Printf("Delivered message to %v\n", ev.TopicPartition)
    				}
    			}
    		}
    	}()
    
    	// Produce messages to topic (asynchronously)
    	fmt.Println("please enter message:")
    	inputDone := make(chan struct{})
    	go func() {
    		defer close(inputDone)
    		for {
    			value := GetInput()
    			if value == nil {
    				// GetInput yields nil once standard input has nothing more to
    				// offer; stop instead of producing empty messages forever.
    				return
    			}
    			err := producer.Produce(&kafka.Message{
    				TopicPartition: kafka.TopicPartition{Topic: &topics, Partition: kafka.PartitionAny},
    				Value:          value,
    			}, nil)
    			if err != nil {
    				log.Panicf("send message fail, err: %v", err)
    			}
    		}
    	}()
    
    	sigterm := make(chan os.Signal, 1)
    	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    	select {
    	case <-sigterm:
    		log.Println("terminating: via signal")
    	case <-inputDone:
    		log.Println("terminating: end of input")
    	}
    
    	// Wait (up to 15 s) for message deliveries before shutting down and
    	// report anything that is still outstanding afterwards.
    	if remaining := producer.Flush(15 * 1000); remaining > 0 {
    		log.Printf("%d event(s) still outstanding after flush timeout", remaining)
    	}
    }
    
    // stdinReader is the buffered reader shared by all GetInput calls. It is
    // created on first use and then kept, so data that was buffered while reading
    // one line is still available to the next call.
    var stdinReader *bufio.Reader
    
    // GetInput reads the next line from standard input and returns it without its
    // line terminator ("\n" or "\r\n"). Lines of any length are returned in one
    // piece, and the returned slice is a fresh copy owned by the caller.
    //
    // An empty line yields an empty, non-nil slice. When nothing more can be read
    // (end of input or a read error with no pending data) GetInput returns nil.
    // GetInput is not safe for concurrent use.
    func GetInput() []byte {
    	if stdinReader == nil {
    		stdinReader = bufio.NewReader(os.Stdin)
    	}
    
    	line, err := stdinReader.ReadBytes('\n')
    	if err != nil && len(line) == 0 {
    		return nil
    	}
    
    	// Drop the terminator; a final line without "\n" is returned unchanged.
    	if n := len(line); n > 0 && line[n-1] == '\n' {
    		line = line[:n-1]
    		if n := len(line); n > 0 && line[n-1] == '\r' {
    			line = line[:n-1]
    		}
    	}
    	return line
    }

    The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.

    • brokers: MQS connection addresses and ports
    • topics: name of the topic to which messages are produced

Consuming Messages

  • SASL authentication mode

    Replace the information in bold with the actual values.

    package main
    
    import (
    	"fmt"
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    	"log"
    	"os"
    	"os/signal"
    	"syscall"
    )
    
    // Connection settings for the SASL consumer example. Every value below is a
    // placeholder and has to be replaced with the actual value.
    var (
    	// brokers holds the MQS connection addresses and ports, comma-separated.
    	brokers = "ip1:port1,ip2:port2,ip3:port3"
    	// group is the consumer group name, passed to the client as group.id.
    	group = "group_id"
    	// topics is the name of the topic the consumer subscribes to.
    	topics = "topic_name"
    	// user is the username used for SASL_SSL authentication.
    	user = "username"
    	// password is the password used for SASL_SSL authentication.
    	password = "password"
    	// caFile is the certificate file handed to the client as ssl.ca.location.
    	caFile = "phy_ca.crt"
    )
    
    // main connects a consumer to MQS over SASL_SSL, subscribes to the configured
    // topic, and prints every received message until SIGINT or SIGTERM arrives.
    func main() {
    	log.Println("Starting a new kafka consumer")
    
    	config := &kafka.ConfigMap{
    		"bootstrap.servers": brokers,
    		"group.id":          group,
    		"auto.offset.reset": "earliest",
    		"security.protocol": "SASL_SSL",
    		"sasl.mechanism":    "PLAIN",
    		"sasl.username":     user,
    		"sasl.password":     password,
    		"ssl.ca.location":   caFile,
    	}
    
    	consumer, err := kafka.NewConsumer(config)
    	if err != nil {
    		// log.Panicf does not return, so no explicit return is needed.
    		log.Panicf("Error creating consumer: %v", err)
    	}
    
    	err = consumer.SubscribeTopics([]string{topics}, nil)
    	if err != nil {
    		log.Panicf("Error subscribe consumer: %v", err)
    	}
    
    	sigterm := make(chan os.Signal, 1)
    	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    
    	// Polling and Close both run on this goroutine, so Close is never invoked
    	// while a read on the same consumer is still in flight.
    	running := true
    	for running {
    		select {
    		case <-sigterm:
    			log.Println("terminating: via signal")
    			running = false
    		default:
    			// Bounded poll (100 ms) so the signal channel is re-checked
    			// regularly; a timeout yields a nil event, which matches no case.
    			switch ev := consumer.Poll(100).(type) {
    			case *kafka.Message:
    				if ev.TopicPartition.Error != nil {
    					log.Printf("Consumer error: %v (%v)", ev.TopicPartition.Error, ev)
    				} else {
    					fmt.Printf("Message on %s: %s\n", ev.TopicPartition, string(ev.Value))
    				}
    			case kafka.Error:
    				log.Printf("Consumer error: %v", ev)
    			}
    		}
    	}
    
    	if err = consumer.Close(); err != nil {
    		log.Panicf("Error closing consumer: %v", err)
    	}
    }

    The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.

    • brokers: MQS connection addresses and ports
    • group: consumer group name. If the specified consumer group does not exist, the system automatically creates one.
    • topics: name of the topic from which messages are consumed
    • user and password: username and password used for SASL_SSL authentication
    • caFile: path to the client certificate file used for SASL_SSL authentication (set as ssl.ca.location)
  • Non-SASL authentication mode

    Replace the information in bold with the actual values.

    package main
    
    import (
    	"fmt"
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    	"log"
    	"os"
    	"os/signal"
    	"syscall"
    )
    
    // Connection settings for the non-SASL consumer example. Every value below is
    // a placeholder and has to be replaced with the actual value.
    var (
    	// brokers holds the MQS connection addresses and ports, comma-separated.
    	brokers = "ip1:port1,ip2:port2,ip3:port3"
    	// group is the consumer group name, passed to the client as group.id.
    	group = "group_id"
    	// topics is the name of the topic the consumer subscribes to.
    	topics = "topic_name"
    )
    
    // main connects a consumer to MQS without authentication, subscribes to the
    // configured topic, and prints every received message until SIGINT or SIGTERM
    // arrives.
    func main() {
    	log.Println("Starting a new kafka consumer")
    
    	config := &kafka.ConfigMap{
    		"bootstrap.servers": brokers,
    		"group.id":          group,
    		"auto.offset.reset": "earliest",
    	}
    
    	consumer, err := kafka.NewConsumer(config)
    	if err != nil {
    		// log.Panicf does not return, so no explicit return is needed.
    		log.Panicf("Error creating consumer: %v", err)
    	}
    
    	err = consumer.SubscribeTopics([]string{topics}, nil)
    	if err != nil {
    		log.Panicf("Error subscribe consumer: %v", err)
    	}
    
    	sigterm := make(chan os.Signal, 1)
    	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    
    	// Polling and Close both run on this goroutine, so Close is never invoked
    	// while a read on the same consumer is still in flight.
    	running := true
    	for running {
    		select {
    		case <-sigterm:
    			log.Println("terminating: via signal")
    			running = false
    		default:
    			// Bounded poll (100 ms) so the signal channel is re-checked
    			// regularly; a timeout yields a nil event, which matches no case.
    			switch ev := consumer.Poll(100).(type) {
    			case *kafka.Message:
    				if ev.TopicPartition.Error != nil {
    					log.Printf("Consumer error: %v (%v)", ev.TopicPartition.Error, ev)
    				} else {
    					fmt.Printf("Message on %s: %s\n", ev.TopicPartition, string(ev.Value))
    				}
    			case kafka.Error:
    				log.Printf("Consumer error: %v", ev)
    			}
    		}
    	}
    
    	if err = consumer.Close(); err != nil {
    		log.Panicf("Error closing consumer: %v", err)
    	}
    }

    The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.

    • brokers: MQS connection addresses and ports
    • group: consumer group name. If the specified consumer group does not exist, the system automatically creates one.
    • topics: name of the topic from which messages are consumed