更新时间:2023-11-17 GMT+08:00

C#客户端使用说明

操作场景

本文介绍C#版本的Kafka客户端连接指导,包括Kafka客户端安装,以及生产、消费消息。

前提条件

  • 已获取MQS连接信息,具体请参见开发准备
  • 已安装开发工具和C#开发语言环境,具体请参见开发准备

引入Kafka客户端

MQS基于Kafka社区版本1.1.0、2.7,您可以在ROMA Connect实例控制台的“实例信息”页面,在“MQS基本信息”下查看Kafka版本信息。C#开源客户端的版本使用请参见客户端版本使用建议

执行以下命令下载安装C#的Kafka依赖库。

dotnet add package -v 1.5.2 Confluent.Kafka

生产消息

  • SASL认证方式

    注意,加粗内容需要替换为实例自有信息。

    using System;
    using Confluent.Kafka;
     
    class Producer
    {
        public static void Main(string[] args)
        {
            var conf = new ProducerConfig {
                bootstrap_servers = “ip1:port1,ip2:port2,ip3:port3”,
                context.load_verify_locations = “phy_ca.crt“,
                sasl_mechanism = “PLAIN”,
                security_protocol= “SASL_SSL”,
                SaslUsername = “username”,
                SaslPassword = “password”,
                };
     
            Action<DeliveryReport<Null, string>> handler = r =>
                Console.WriteLine(!r.Error.IsError
                    ? $”Delivered message to {r.TopicPartitionOffset}”
                    : $”Delivery Error: {r.Error.Reason}”);
     
            string topic = “topic_name”;
     
            using (var p = new ProducerBuilder<Null, string>(conf).Build())
            {
                for (int i=0; i<100; ++i)
                {
                    p.Produce(topic, new Message<Null, string> { Value = i.ToString() }, handler);
                }
                p.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }

    示例代码中的参数说明,可参考获取MQS连接信息获取参数值。

    • bootstrap_servers:MQS连接地址和端口。
    • topic:要生产消息的Topic名称。
    • SaslUsername和SaslPassword:开启SASL_SSL认证时所使用的用户名和密码。
    • context.load_verify_locations:开启SASL_SSL认证时所使用的客户端证书。
  • 非SASL认证方式

    注意,加粗内容需要替换为实例自有信息。

    using System;
    using Confluent.Kafka;
     
    class Producer
    {
        public static void Main(string[] args)
        {
            var conf = new ProducerConfig {
                bootstrap_servers = “ip1:port1,ip2:port2,ip3:port3”,
                };
     
            Action<DeliveryReport<Null, string>> handler = r =>
                Console.WriteLine(!r.Error.IsError
                    ? $”Delivered message to {r.TopicPartitionOffset}”
                    : $”Delivery Error: {r.Error.Reason}”);
     
            string topic = “topic_name”;
     
            using (var p = new ProducerBuilder<Null, string>(conf).Build())
            {
                for (int i=0; i<100; ++i)
                {
                    p.Produce(topic, new Message<Null, string> { Value = i.ToString() }, handler);
                }
                p.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }

    示例代码中的参数说明,可参考获取MQS连接信息获取参数值。

    • bootstrap_servers:MQS连接地址和端口。
    • topic:要生产消息的Topic名称。

消费消息

  • SASL认证方式

    注意,加粗内容需要替换为实例自有信息。

    using System;
    using System.Threading;
    using Confluent.Kafka;
     
    class Consumer
    {
        public static void Main(string[] args)
        {
            var conf = new ConsumerConfig {
                GroupId = “group_id”,
                BootstrapServers = “ip1:port1,ip2:port2,ip3:port3”,
                SslCaLocation = “phy_ca.crt”, 
                SaslMechanism = “PLAIN”,
                SecurityProtocol = SASL_SSL,
                SaslUsername = “username”,
                SaslPassword = “password”,
                AutoOffsetReset = “earliest”
            };
     
            string topic = “topic_name”;
     
            using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                c.Subscribe(topic);
     
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) => {
                    e.Cancel = true;
                    cts.Cancel();
                };
     
                try
                {
                    while (true)
                    {
                        try
                        {
                            var cr = c.Consume(cts.Token);
                            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error occured: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    c.Close();
                }
            }
        }
    }

    示例代码中的参数说明,可参考获取MQS连接信息获取参数值。

    • BootstrapServers:MQS连接地址和端口。
    • GroupId:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。
    • topic:要消费消息的Topic名称。
    • SaslUsername和SaslPassword:开启SASL_SSL认证时所使用的用户名和密码。
    • SslCaLocation:开启SASL_SSL认证时所使用的客户端证书。
  • 非SASL认证方式

    注意,加粗内容需要替换为实例自有信息。

    using System;
    using System.Threading;
    using Confluent.Kafka;
     
    class Consumer
    {
        public static void Main(string[] args)
        {
            var conf = new ConsumerConfig {
                GroupId = “group_id”,
                BootstrapServers = “ip1:port1,ip2:port2,ip3:port3”,
                AutoOffsetReset = “earliest”
            };
     
            string topic = “topic_name”;
     
            using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                c.Subscribe(topic);
     
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) => {
                    e.Cancel = true;
                    cts.Cancel();
                };
     
                try
                {
                    while (true)
                    {
                        try
                        {
                            var cr = c.Consume(cts.Token);
                            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error occured: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    c.Close();
                }
            }
        }
    }

    示例代码中的参数说明,可参考获取MQS连接信息获取参数值。

    • BootstrapServers:MQS连接地址和端口。
    • GroupId:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。
    • topic:要消费消息的Topic名称。