C#客户端使用说明
操作场景
本文介绍C#版本的Kafka客户端连接指导,包括Kafka客户端安装,以及生产、消费消息。
引入Kafka客户端
MQS基于Kafka社区版本1.1.0、2.7,您可以在ROMA Connect实例控制台的“实例信息”页面,在“MQS基本信息”下查看Kafka版本信息。C#开源客户端的版本使用请参见客户端版本使用建议。
执行以下命令下载安装C#的Kafka依赖库。
dotnet add package -v 1.5.2 Confluent.Kafka
生产消息
- SASL认证方式
using System; using Confluent.Kafka; class Producer { public static void Main(string[] args) { var conf = new ProducerConfig { bootstrap_servers = “ip1:port1,ip2:port2,ip3:port3”, context.load_verify_locations = “phy_ca.crt“, sasl_mechanism = “PLAIN”, security_protocol= “SASL_SSL”, SaslUsername = “username”, SaslPassword = “password”, }; Action<DeliveryReport<Null, string>> handler = r => Console.WriteLine(!r.Error.IsError ? $”Delivered message to {r.TopicPartitionOffset}” : $”Delivery Error: {r.Error.Reason}”); string topic = “topic_name”; using (var p = new ProducerBuilder<Null, string>(conf).Build()) { for (int i=0; i<100; ++i) { p.Produce(topic, new Message<Null, string> { Value = i.ToString() }, handler); } p.Flush(TimeSpan.FromSeconds(10)); } } }
示例代码中的参数说明,可参考获取MQS连接信息获取参数值。
- bootstrap_servers:MQS连接地址和端口。
- topic:要生产消息的Topic名称。
- SaslUsername和SaslPassword:开启SASL_SSL认证时所使用的用户名和密码。
- context.load_verify_locations:开启SASL_SSL认证时所使用的客户端证书。
- 非SASL认证方式
using System; using Confluent.Kafka; class Producer { public static void Main(string[] args) { var conf = new ProducerConfig { bootstrap_servers = “ip1:port1,ip2:port2,ip3:port3”, }; Action<DeliveryReport<Null, string>> handler = r => Console.WriteLine(!r.Error.IsError ? $”Delivered message to {r.TopicPartitionOffset}” : $”Delivery Error: {r.Error.Reason}”); string topic = “topic_name”; using (var p = new ProducerBuilder<Null, string>(conf).Build()) { for (int i=0; i<100; ++i) { p.Produce(topic, new Message<Null, string> { Value = i.ToString() }, handler); } p.Flush(TimeSpan.FromSeconds(10)); } } }
示例代码中的参数说明,可参考获取MQS连接信息获取参数值。
- bootstrap_servers:MQS连接地址和端口。
- topic:要生产消息的Topic名称。
消费消息
- SASL认证方式
using System; using System.Threading; using Confluent.Kafka; class Consumer { public static void Main(string[] args) { var conf = new ConsumerConfig { GroupId = “group_id”, BootstrapServers = “ip1:port1,ip2:port2,ip3:port3”, SslCaLocation = “phy_ca.crt”, SaslMechanism = “PLAIN”, SecurityProtocol = SASL_SSL, SaslUsername = “username”, SaslPassword = “password”, AutoOffsetReset = “earliest” }; string topic = “topic_name”; using (var c = new ConsumerBuilder<Ignore, string>(conf).Build()) { c.Subscribe(topic); CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); }; try { while (true) { try { var cr = c.Consume(cts.Token); Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); } catch (ConsumeException e) { Console.WriteLine($"Error occured: {e.Error.Reason}"); } } } catch (OperationCanceledException) { c.Close(); } } } }
示例代码中的参数说明,可参考获取MQS连接信息获取参数值。
- BootstrapServers:MQS连接地址和端口。
- GroupId:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。
- topic:要消费消息的Topic名称。
- SaslUsername和SaslPassword:开启SASL_SSL认证时所使用的用户名和密码。
- SslCaLocation:开启SASL_SSL认证时所使用的客户端证书。
- 非SASL认证方式
using System; using System.Threading; using Confluent.Kafka; class Consumer { public static void Main(string[] args) { var conf = new ConsumerConfig { GroupId = “group_id”, BootstrapServers = “ip1:port1,ip2:port2,ip3:port3”, AutoOffsetReset = “earliest” }; string topic = “topic_name”; using (var c = new ConsumerBuilder<Ignore, string>(conf).Build()) { c.Subscribe(topic); CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); }; try { while (true) { try { var cr = c.Consume(cts.Token); Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); } catch (ConsumeException e) { Console.WriteLine($"Error occured: {e.Error.Reason}"); } } } catch (OperationCanceledException) { c.Close(); } } } }
示例代码中的参数说明,可参考获取MQS连接信息获取参数值。
- BootstrapServers:MQS连接地址和端口。
- GroupId:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。
- topic:要消费消息的Topic名称。