Updated on 2023-11-17 GMT+08:00

Configuring a Client in C#

Scenarios

This section describes how to connect a C# Kafka client to MQS (including Kafka client installation), and how to produce and consume messages.

Prerequisites

  • You have obtained MQS connection information. For details, see Preparations.
  • You have installed the development tool and C# development environment. For details, see Preparations.

Installing the Kafka Client

MQS is developed based on Kafka 1.1.0 and 2.7. View the Kafka version information in the MQS Information area on the Instance Information page on the ROMA Connect console. For details about how to use the C# open-source client, see suggested client versions.

Run the following command to install the C# Kafka dependency libraries:

dotnet add package -v 1.5.2 Confluent.Kafka

Producing Messages

  • SASL authentication mode

    Replace the information in bold with the actual values.

    using System;
    using Confluent.Kafka;
     
    class Producer
    {
        public static void Main(string[] args)
        {
            var conf = new ProducerConfig {
                bootstrap_servers = "ip1:port1,ip2:port2,ip3:port3",
                context.load_verify_locations = "phy_ca.crt",
                sasl_mechanism = "PLAIN",
                security_protocol= "SASL_SSL",
                SaslUsername = "username",
                SaslPassword = "password",
                };
     
            Action<DeliveryReport<Null, string>> handler = r =>
                Console.WriteLine(!r.Error.IsError
                    ? $"Delivered message to {r.TopicPartitionOffset}"
                    : $"Delivery Error: {r.Error.Reason}");
     
            string topic = "topic_name";
     
            using (var p = new ProducerBuilder<Null, string>(conf).Build())
            {
                for (int i=0; i<100; ++i)
                {
                    p.Produce(topic, new Message<Null, string> { Value = i.ToString() }, handler);
                }
                p.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }

    The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.

    • bootstrap_servers: MQS connection addresses and ports
    • topics: names of the topics that produce messages
    • SaslUsername and SaslPassword: username and password used for SASL_SSL authentication
    • context.load_verify_locations: client certificate used for SASL_SSL authentication
  • Non-SASL authentication mode

    Replace the information in bold with the actual values.

    using System;
    using Confluent.Kafka;
     
    class Producer
    {
        public static void Main(string[] args)
        {
            var conf = new ProducerConfig {
                bootstrap_servers = "ip1:port1,ip2:port2,ip3:port3",
                };
     
            Action<DeliveryReport<Null, string>> handler = r =>
                Console.WriteLine(!r.Error.IsError
                    ? $"Delivered message to {r.TopicPartitionOffset}"
                    : $"Delivery Error: {r.Error.Reason}");
     
            string topic = "topic_name";
     
            using (var p = new ProducerBuilder<Null, string>(conf).Build())
            {
                for (int i=0; i<100; ++i)
                {
                    p.Produce(topic, new Message<Null, string> { Value = i.ToString() }, handler);
                }
                p.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }

    The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.

    • bootstrap_servers: MQS connection addresses and ports
    • topics: names of the topics that produce messages

Consuming Messages

  • SASL authentication mode

    Replace the information in bold with the actual values.

    using System;
    using System.Threading;
    using Confluent.Kafka;
     
    class Consumer
    {
        public static void Main(string[] args)
        {
            var conf = new ConsumerConfig {
                GroupId = "group_id",
                BootstrapServers = "ip1:port1,ip2:port2,ip3:port3",
                SslCaLocation = "phy_ca.crt", 
                SaslMechanism = "PLAIN",
                SecurityProtocol = SASL_SSL,
                SaslUsername = "username",
                SaslPassword = "password",
                AutoOffsetReset = "earliest"
            };
     
            string topic = "topic_name";
     
            using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                c.Subscribe(topic);
     
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) => {
                    e.Cancel = true;
                    cts.Cancel();
                };
     
                try
                {
                    while (true)
                    {
                        try
                        {
                            var cr = c.Consume(cts.Token);
                            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error occured: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    c.Close();
                }
            }
        }
    }

    The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.

    • BootstrapServers: MQS connection addresses and ports
    • GroupId: consumer group name If the specified consumer group does not exist, the system automatically creates one.
    • topics: names of the topics that consume messages
    • SaslUsername and SaslPassword: username and password used for SASL_SSL authentication
    • SslCaLocation: client certificate used for SASL_SSL authentication
  • Non-SASL authentication mode

    Replace the information in bold with the actual values.

    using System;
    using System.Threading;
    using Confluent.Kafka;
     
    class Consumer
    {
        public static void Main(string[] args)
        {
            var conf = new ConsumerConfig {
                GroupId = "group_id",
                BootstrapServers = "ip1:port1,ip2:port2,ip3:port3",
                AutoOffsetReset = "earliest"
            };
     
            string topic = "topic_name";
     
            using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                c.Subscribe(topic);
     
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) => {
                    e.Cancel = true;
                    cts.Cancel();
                };
     
                try
                {
                    while (true)
                    {
                        try
                        {
                            var cr = c.Consume(cts.Token);
                            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error occured: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    c.Close();
                }
            }
        }
    }

    The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.

    • BootstrapServers: MQS connection addresses and ports
    • GroupId: consumer group name If the specified consumer group does not exist, the system automatically creates one.
    • topics: names of the topics that consume messages