更新时间:2024-08-13 GMT+08:00
Go
本文以Linux CentOS环境为例,介绍Go 1.16.5版本的Kafka客户端连接指导,包括demo代码库的获取,以及生产、消费消息。
使用前请参考收集连接信息收集Kafka所需的连接信息。
准备环境
- 执行以下命令,检查是否已安装Go。
go version
返回如下回显时,说明Go已经安装。
[root@ecs-test confluent-kafka-go]# go version go version go1.16.5 linux/amd64
如果未安装Go,参考如下步骤安装。
#下载Go安装包。 wget https://go.dev/dl/go1.16.5.linux-amd64.tar.gz #解压安装包到“/usr/local”目录,“/usr/local”目录可以根据实际情况修改。 sudo tar -C /usr/local -xzf go1.16.5.linux-amd64.tar.gz #设置环境变量。 echo 'export PATH=$PATH:/usr/local/go/bin' >> ~/.profile source ~/.profile
- 执行以下命令,获取demo需要的代码库。
go get github.com/confluentinc/confluent-kafka-go/kafka
生产消息
- SASL认证方式
package main import ( "bufio" "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "log" "os" "os/signal" "syscall" ) var ( brokers = "ip1:port1,ip2:port2,ip3:port3" topics = "topic_name" user = "username" password = "password" caFile = "phy_ca.crt" //SSL证书参考“收集连接信息”章节获取。如果Kafka安全协议设置为“SASL_PLAINTEXT”,请删除此参数。 ) func main() { log.Println("Starting a new kafka producer") config := &kafka.ConfigMap{ "bootstrap.servers": brokers, "security.protocol": "SASL_SSL", "sasl.mechanism": "PLAIN", "sasl.username": user, "sasl.password": password, "ssl.ca.location": caFile, //如果Kafka安全协议设置为“SASL_PLAINTEXT”,请删除此参数。 "ssl.endpoint.identification.algorithm": "none", } producer, err := kafka.NewProducer(config) if err != nil { log.Panicf("producer error, err: %v", err) return } go func() { for e := range producer.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { log.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { log.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() // Produce messages to topic (asynchronously) fmt.Println("please enter message:") go func() { for { err := producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topics, Partition: kafka.PartitionAny}, Value: GetInput(), }, nil) if err != nil { log.Panicf("send message fail, err: %v", err) return } } }() sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select { case <-sigterm: log.Println("terminating: via signal") } // Wait for message deliveries before shutting down producer.Flush(15 * 1000) producer.Close() } func GetInput() []byte { reader := bufio.NewReader(os.Stdin) data, _, _ := reader.ReadLine() return data }
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- brokers:实例连接地址与端口。
- topics:Topic名称。
- user/password:首次开启密文接入时设置的用户名与密码,或者创建用户时设置的用户名和密码。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。
- caFile:证书文件。如果Kafka安全协议设置为“SASL_SSL”,需要设置此参数。
- security.protocol:Kafka的安全协议。在Kafka实例控制台的基本信息页面中获取。很久前创建的Kafka实例在详情页如果未显示“启用的安全协议”,默认使用SASL_SSL协议。
- 安全协议设置为“SASL_SSL”时,采用SASL方式进行认证,数据通过SSL证书进行加密传输,安全性更高。此时需要配置连接实例的用户名和密码,以及证书文件。
- 安全协议设置为“SASL_PLAINTEXT”时,采用SASL方式进行认证,数据通过明文传输,性能更好。此时需要配置连接实例的用户名和密码,无需配置证书文件。
- sasl.mechanism:SASL认证机制。在Kafka实例控制台的基本信息页面中获取。如果SCRAM-SHA-512和PLAIN都开启了,根据实际情况选择其中任意一种配置连接。很久前创建的Kafka实例在详情页如果未显示“开启的SASL认证机制”,默认使用PLAIN机制。
- 非SASL认证方式
package main import ( "bufio" "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "log" "os" "os/signal" "syscall" ) var ( brokers = "ip1:port1,ip2:port2,ip3:port3" topics = "topic_name" ) func main() { log.Println("Starting a new kafka producer") config := &kafka.ConfigMap{ "bootstrap.servers": brokers, } producer, err := kafka.NewProducer(config) if err != nil { log.Panicf("producer error, err: %v", err) return } go func() { for e := range producer.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { log.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { log.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() // Produce messages to topic (asynchronously) fmt.Println("please enter message:") go func() { for { err := producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topics, Partition: kafka.PartitionAny}, Value: GetInput(), }, nil) if err != nil { log.Panicf("send message fail, err: %v", err) return } } }() sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select { case <-sigterm: log.Println("terminating: via signal") } // Wait for message deliveries before shutting down producer.Flush(15 * 1000) producer.Close() } func GetInput() []byte { reader := bufio.NewReader(os.Stdin) data, _, _ := reader.ReadLine() return data }
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- brokers:实例连接地址与端口。
- topics:Topic名称。
消费消息
- SASL认证方式
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "log" "os" "os/signal" "syscall" ) var ( brokers = "ip1:port1,ip2:port2,ip3:port3" group = "group-id" topics = "topic_name" user = "username" password = "password" caFile = "phy_ca.crt" //SSL证书参考“收集连接信息”章节获取。如果Kafka安全协议设置为“SASL_PLAINTEXT”,请删除此参数。 ) func main() { log.Println("Starting a new kafka consumer") config := &kafka.ConfigMap{ "bootstrap.servers": brokers, "group.id": group, "auto.offset.reset": "earliest", "security.protocol": "SASL_SSL", "sasl.mechanism": "PLAIN", "sasl.username": user, "sasl.password": password, "ssl.ca.location": caFile, //如果Kafka安全协议设置为“SASL_PLAINTEXT”,请删除此参数。 "ssl.endpoint.identification.algorithm": "none", } consumer, err := kafka.NewConsumer(config) if err != nil { log.Panicf("Error creating consumer: %v", err) return } err = consumer.SubscribeTopics([]string{topics}, nil) if err != nil { log.Panicf("Error subscribe consumer: %v", err) return } go func() { for { msg, err := consumer.ReadMessage(-1) if err != nil { log.Printf("Consumer error: %v (%v)", err, msg) } else { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } } }() sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select { case <-sigterm: log.Println("terminating: via signal") } if err = consumer.Close(); err != nil { log.Panicf("Error closing consumer: %v", err) } }
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- brokers:实例连接地址与端口。
- group:消费组名称。根据业务需求,自定义消费组名称,如果设置的消费组不存在,Kafka会自动创建。
- topics:Topic名称。
- user/password:首次开启密文接入时设置的用户名与密码,或者创建用户时设置的用户名和密码。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。
- caFile:证书文件。如果Kafka安全协议设置为“SASL_SSL”,需要设置此参数。
- security.protocol:Kafka的安全协议。在Kafka实例控制台的基本信息页面中获取。很久前创建的Kafka实例在详情页如果未显示“启用的安全协议”,默认使用SASL_SSL协议。
- 安全协议设置为“SASL_SSL”时,采用SASL方式进行认证,数据通过SSL证书进行加密传输,安全性更高。此时需要配置连接实例的用户名和密码,以及证书文件。
- 安全协议设置为“SASL_PLAINTEXT”时,采用SASL方式进行认证,数据通过明文传输,性能更好。此时需要配置连接实例的用户名和密码,无需配置证书文件。
- sasl.mechanism:SASL认证机制。在Kafka实例控制台的基本信息页面中获取。如果SCRAM-SHA-512和PLAIN都开启了,根据实际情况选择其中任意一种配置连接。很久前创建的Kafka实例在详情页如果未显示“开启的SASL认证机制”,默认使用PLAIN机制。
- 非SASL认证方式
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "log" "os" "os/signal" "syscall" ) var ( brokers = "ip1:port1,ip2:port2,ip3:port3" group = "group-id" topics = "topic_name" ) func main() { log.Println("Starting a new kafka consumer") config := &kafka.ConfigMap{ "bootstrap.servers": brokers, "group.id": group, "auto.offset.reset": "earliest", } consumer, err := kafka.NewConsumer(config) if err != nil { log.Panicf("Error creating consumer: %v", err) return } err = consumer.SubscribeTopics([]string{topics}, nil) if err != nil { log.Panicf("Error subscribe consumer: %v", err) return } go func() { for { msg, err := consumer.ReadMessage(-1) if err != nil { log.Printf("Consumer error: %v (%v)", err, msg) } else { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } } }() sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select { case <-sigterm: log.Println("terminating: via signal") } if err = consumer.Close(); err != nil { log.Panicf("Error closing consumer: %v", err) } }
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- brokers:实例连接地址与端口。
- group:消费组名称。根据业务需求,自定义消费组名称,如果设置的消费组不存在,Kafka会自动创建。
- topics:Topic名称。