更新时间:2023-08-28 GMT+08:00

Go客户端使用说明

操作场景

本文以Linux CentOS环境为例,介绍Go版本的Kafka客户端连接指导,包括Kafka客户端安装,以及生产、消费消息。

前提条件

  • 已获取MQS连接信息,具体请参见开发准备
  • 已安装开发工具和Python开发语言环境,具体请参见开发准备

引入Kafka客户端

MQS基于Kafka社区版本1.1.0、2.7,您可以在ROMA Connect实例控制台的“实例信息”页面,在“MQS基本信息”下查看Kafka版本信息。Go开源客户端的版本使用请参见客户端版本使用建议

执行以下命令,安装对应版本的Go Kafka客户端。
go get github.com/confluentinc/confluent-kafka-go/kafka

生产消息

  • SASL认证方式

    注意,加粗内容需要替换为实例自有信息。

    package main
    
    import (
    	"bufio"
    	"fmt"
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    	"log"
    	"os"
    	"os/signal"
    	"syscall"
    )
    
    var (
        brokers  = "ip1:port1,ip2:port2,ip3:port3"
    	topics   = "topic_name"
    	user     = "username"
    	password = "password"
    	caFile   = "phy_ca.crt"
    )
    
    func main() {
    	log.Println("Starting a new kafka producer")
    
    	config := &kafka.ConfigMap{
    		"bootstrap.servers": brokers,
    		"security.protocol": "SASL_SSL",
    		"sasl.mechanism":    "PLAIN",
    		"sasl.username":     user,
    		"sasl.password":     password,
    		"ssl.ca.location":   caFile,
    	}
    	producer, err := kafka.NewProducer(config)
    	if err != nil {
    		log.Panicf("producer error, err: %v", err)
    		return
    	}
    
    	go func() {
    		for e := range producer.Events() {
    			switch ev := e.(type) {
    			case *kafka.Message:
    				if ev.TopicPartition.Error != nil {
    					log.Printf("Delivery failed: %v\n", ev.TopicPartition)
    				} else {
    					log.Printf("Delivered message to %v\n", ev.TopicPartition)
    				}
    			}
    		}
    	}()
    
    	// Produce messages to topic (asynchronously)
    	fmt.Println("please enter message:")
    	go func() {
    		for {
    			err := producer.Produce(&kafka.Message{
    				TopicPartition: kafka.TopicPartition{Topic: &topics, Partition: kafka.PartitionAny},
    				Value:          GetInput(),
    			}, nil)
    			if err != nil {
    				log.Panicf("send message fail, err: %v", err)
    				return
    			}
    		}
    	}()
    
    	sigterm := make(chan os.Signal, 1)
    	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    	select {
    	case <-sigterm:
    		log.Println("terminating: via signal")
    	}
    	// Wait for message deliveries before shutting down
    	producer.Flush(15 * 1000)
    	producer.Close()
    }
    
    func GetInput() []byte {
    	reader := bufio.NewReader(os.Stdin)
    	data, _, _ := reader.ReadLine()
    	return data
    }

    示例代码中的参数说明,可参考获取MQS连接信息获取参数值。

    • brokers:MQS连接地址和端口。
    • topics:要生产消息的Topic名称。
    • user和password:开启SASL_SSL认证时所使用的用户名和密码。
    • caFile:开启SASL_SSL认证时所使用的客户端证书。
  • 非SASL认证方式

    注意,加粗内容需要替换为实例自有信息。

    package main
    
    import (
    	"bufio"
    	"fmt"
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    	"log"
    	"os"
    	"os/signal"
    	"syscall"
    )
    
    var (
    	brokers  = "ip1:port1,ip2:port2,ip3:port3"
    	topics   = "topic_name"
    )
    
    func main() {
    	log.Println("Starting a new kafka producer")
    
    	config := &kafka.ConfigMap{
    		"bootstrap.servers": brokers,
    	}
    	producer, err := kafka.NewProducer(config)
    	if err != nil {
    		log.Panicf("producer error, err: %v", err)
    		return
    	}
    
    	go func() {
    		for e := range producer.Events() {
    			switch ev := e.(type) {
    			case *kafka.Message:
    				if ev.TopicPartition.Error != nil {
    					log.Printf("Delivery failed: %v\n", ev.TopicPartition)
    				} else {
    					log.Printf("Delivered message to %v\n", ev.TopicPartition)
    				}
    			}
    		}
    	}()
    
    	// Produce messages to topic (asynchronously)
    	fmt.Println("please enter message:")
    	go func() {
    		for {
    			err := producer.Produce(&kafka.Message{
    				TopicPartition: kafka.TopicPartition{Topic: &topics, Partition: kafka.PartitionAny},
    				Value:          GetInput(),
    			}, nil)
    			if err != nil {
    				log.Panicf("send message fail, err: %v", err)
    				return
    			}
    		}
    	}()
    
    	sigterm := make(chan os.Signal, 1)
    	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    	select {
    	case <-sigterm:
    		log.Println("terminating: via signal")
    	}
    	// Wait for message deliveries before shutting down
    	producer.Flush(15 * 1000)
    	producer.Close()
    }
    
    func GetInput() []byte {
    	reader := bufio.NewReader(os.Stdin)
    	data, _, _ := reader.ReadLine()
    	return data
    }

    示例代码中的参数说明,可参考获取MQS连接信息获取参数值。

    • brokers:MQS连接地址和端口。
    • topics:要生产消息的Topic名称。

消费消息

  • SASL认证方式

    注意,加粗内容需要替换为实例自有信息。

    package main
    
    import (
    	"fmt"
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    	"log"
    	"os"
    	"os/signal"
    	"syscall"
    )
    
    var (
    	brokers  = "ip1:port1,ip2:port2,ip3:port3"
    	group    = "group_id"
    	topics   = "topic_name"
    	user     = "username"
    	password = "password"
    	caFile   = "phy_ca.crt"
    )
    
    func main() {
    	log.Println("Starting a new kafka consumer")
    
    	config := &kafka.ConfigMap{
    		"bootstrap.servers": brokers,
    		"group.id":          group,
    		"auto.offset.reset": "earliest",
    		"security.protocol": "SASL_SSL",
    		"sasl.mechanism":    "PLAIN",
    		"sasl.username":     user,
    		"sasl.password":     password,
    		"ssl.ca.location":   caFile,
    	}
    
    	consumer, err := kafka.NewConsumer(config)
    	if err != nil {
    		log.Panicf("Error creating consumer: %v", err)
    		return
    	}
    
    	err = consumer.SubscribeTopics([]string{topics}, nil)
    	if err != nil {
    		log.Panicf("Error subscribe consumer: %v", err)
    		return
    	}
    
    	go func() {
    		for {
    			msg, err := consumer.ReadMessage(-1)
    			if err != nil {
    				log.Printf("Consumer error: %v (%v)", err, msg)
    			} else {
    				fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
    			}
    		}
    	}()
    
    	sigterm := make(chan os.Signal, 1)
    	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    	select {
    	case <-sigterm:
    		log.Println("terminating: via signal")
    	}
    	if err = consumer.Close(); err != nil {
    		log.Panicf("Error closing consumer: %v", err)
    	}
    }

    示例代码中的参数说明,可参考获取MQS连接信息获取参数值。

    • brokers:MQS连接地址和端口。
    • group:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。
    • topics:要消费消息的Topic名称。
    • user和password:开启SASL_SSL认证时所使用的用户名和密码。
    • caFile:开启SASL_SSL认证时所使用的客户端证书。
  • 非SASL认证方式

    注意,加粗内容需要替换为实例自有信息。

    package main
    
    import (
    	"fmt"
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    	"log"
    	"os"
    	"os/signal"
    	"syscall"
    )
    
    var (
    	brokers  = "ip1:port1,ip2:port2,ip3:port3"
    	group    = "group_id"
    	topics   = "topic_name"
    )
    
    func main() {
    	log.Println("Starting a new kafka consumer")
    
    	config := &kafka.ConfigMap{
    		"bootstrap.servers": brokers,
    		"group.id":          group,
    		"auto.offset.reset": "earliest",
    	}
    
    	consumer, err := kafka.NewConsumer(config)
    	if err != nil {
    		log.Panicf("Error creating consumer: %v", err)
    		return
    	}
    
    	err = consumer.SubscribeTopics([]string{topics}, nil)
    	if err != nil {
    		log.Panicf("Error subscribe consumer: %v", err)
    		return
    	}
    
    	go func() {
    		for {
    			msg, err := consumer.ReadMessage(-1)
    			if err != nil {
    				log.Printf("Consumer error: %v (%v)", err, msg)
    			} else {
    				fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
    			}
    		}
    	}()
    
    	sigterm := make(chan os.Signal, 1)
    	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    	select {
    	case <-sigterm:
    		log.Println("terminating: via signal")
    	}
    	if err = consumer.Close(); err != nil {
    		log.Panicf("Error closing consumer: %v", err)
    	}
    }

    示例代码中的参数说明,可参考获取MQS连接信息获取参数值。

    • brokers:MQS连接地址和端口。
    • group:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。
    • topics:要消费消息的Topic名称。