Configuring a Client in C#
Scenarios
This section describes how to connect a C# Kafka client to MQS (including Kafka client installation), and how to produce and consume messages.
Prerequisites
- You have obtained MQS connection information. For details, see Preparations.
- You have installed the development tool and C# development environment. For details, see Preparations.
Installing the Kafka Client
MQS is developed based on Kafka 1.1.0 and 2.7. To check which Kafka version your instance uses, see the MQS Information area on the Instance Information page of the ROMA Connect console. For details about which versions of the C# open-source client to use, see suggested client versions.
Run the following command to install the C# Kafka dependency libraries:
dotnet add package Confluent.Kafka -v 1.5.2
Producing Messages
- SASL authentication mode
Replace the placeholder values in the example (connection addresses, certificate path, username, password, and topic name) with the actual values.

    using System;
    using Confluent.Kafka;

    class Producer
    {
        public static void Main(string[] args)
        {
            var conf = new ProducerConfig
            {
                BootstrapServers = "ip1:port1,ip2:port2,ip3:port3",
                SslCaLocation = "phy_ca.crt",
                SaslMechanism = SaslMechanism.Plain,
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SaslUsername = "username",
                SaslPassword = "password",
            };

            // Invoked once per message with the delivery result.
            Action<DeliveryReport<Null, string>> handler = r =>
                Console.WriteLine(!r.Error.IsError
                    ? $"Delivered message to {r.TopicPartitionOffset}"
                    : $"Delivery Error: {r.Error.Reason}");

            string topic = "topic_name";

            using (var p = new ProducerBuilder<Null, string>(conf).Build())
            {
                for (int i = 0; i < 100; ++i)
                {
                    p.Produce(topic, new Message<Null, string> { Value = i.ToString() }, handler);
                }

                // Wait up to 10 seconds for outstanding messages to be delivered.
                p.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }

The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.
- BootstrapServers: MQS connection addresses and ports
- topic: name of the topic to which messages are produced
- SaslUsername and SaslPassword: username and password used for SASL_SSL authentication
- SslCaLocation: path to the client certificate file used for SASL_SSL authentication
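The SASL example above reports delivery results through a callback. When the caller needs the result of a message before continuing, the client's ProduceAsync method can be awaited instead. The following is a minimal sketch under that assumption, not a recommended MQS pattern. The class name AsyncProducer and the message value are illustrative, and the connection values are the same placeholders as above.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class AsyncProducer
{
    public static async Task Main(string[] args)
    {
        // Placeholder values: replace with the actual MQS connection information.
        var conf = new ProducerConfig
        {
            BootstrapServers = "ip1:port1,ip2:port2,ip3:port3",
            SslCaLocation = "phy_ca.crt",
            SaslMechanism = SaslMechanism.Plain,
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslUsername = "username",
            SaslPassword = "password"
        };

        using (var p = new ProducerBuilder<Null, string>(conf).Build())
        {
            try
            {
                // Completes when the broker acknowledges the message or delivery fails.
                var dr = await p.ProduceAsync(
                    "topic_name", new Message<Null, string> { Value = "hello" });
                Console.WriteLine($"Delivered message to {dr.TopicPartitionOffset}");
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery Error: {e.Error.Reason}");
            }
        }
    }
}
```

Awaiting every message one at a time limits throughput, so the callback-based Produce call is usually the better fit for bulk sending.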
- Non-SASL authentication mode
Replace the placeholder values in the example (connection addresses and topic name) with the actual values.

    using System;
    using Confluent.Kafka;

    class Producer
    {
        public static void Main(string[] args)
        {
            var conf = new ProducerConfig
            {
                BootstrapServers = "ip1:port1,ip2:port2,ip3:port3",
            };

            // Invoked once per message with the delivery result.
            Action<DeliveryReport<Null, string>> handler = r =>
                Console.WriteLine(!r.Error.IsError
                    ? $"Delivered message to {r.TopicPartitionOffset}"
                    : $"Delivery Error: {r.Error.Reason}");

            string topic = "topic_name";

            using (var p = new ProducerBuilder<Null, string>(conf).Build())
            {
                for (int i = 0; i < 100; ++i)
                {
                    p.Produce(topic, new Message<Null, string> { Value = i.ToString() }, handler);
                }

                // Wait up to 10 seconds for outstanding messages to be delivered.
                p.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }

The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.
- BootstrapServers: MQS connection addresses and ports
- topic: name of the topic to which messages are produced
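The two producer variants above differ only in their security settings, so the choice between them can be made at run time instead of being hard-coded. The sketch below is one possible way to do that. MqsClientSettings and the MQS_* setting names are hypothetical and are not part of Confluent.Kafka or MQS. The dictionary keys are the standard Kafka client property names that correspond to the configuration properties used above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of Confluent.Kafka or MQS.
static class MqsClientSettings
{
    // Builds client settings from a lookup function such as
    // Environment.GetEnvironmentVariable. The MQS_* names are assumptions.
    public static Dictionary<string, string> Build(Func<string, string> read)
    {
        var servers = read("MQS_BOOTSTRAP_SERVERS");
        if (string.IsNullOrWhiteSpace(servers))
        {
            throw new ArgumentException("MQS_BOOTSTRAP_SERVERS is not set.");
        }

        var settings = new Dictionary<string, string>
        {
            ["bootstrap.servers"] = servers
        };

        // Add the SASL_SSL settings only when a username is provided.
        var username = read("MQS_SASL_USERNAME");
        if (!string.IsNullOrEmpty(username))
        {
            settings["security.protocol"] = "SASL_SSL";
            settings["sasl.mechanism"] = "PLAIN";
            settings["sasl.username"] = username;
            settings["sasl.password"] = read("MQS_SASL_PASSWORD") ?? "";
            settings["ssl.ca.location"] = read("MQS_CA_FILE") ?? "phy_ca.crt";
        }

        return settings;
    }
}
```

A call such as MqsClientSettings.Build(Environment.GetEnvironmentVariable) returns a dictionary that could then be passed to the ProducerConfig or ConsumerConfig constructor, assuming the client version in use provides the constructor overload that accepts a dictionary. Reading credentials this way also keeps the username and password out of the source code.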
Consuming Messages
- SASL authentication mode
Replace the placeholder values in the example (consumer group name, connection addresses, certificate path, username, password, and topic name) with the actual values.

    using System;
    using System.Threading;
    using Confluent.Kafka;

    class Consumer
    {
        public static void Main(string[] args)
        {
            var conf = new ConsumerConfig
            {
                GroupId = "group_id",
                BootstrapServers = "ip1:port1,ip2:port2,ip3:port3",
                SslCaLocation = "phy_ca.crt",
                SaslMechanism = SaslMechanism.Plain,
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SaslUsername = "username",
                SaslPassword = "password",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            string topic = "topic_name";

            using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                c.Subscribe(topic);

                // Stop consuming when Ctrl+C is pressed.
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) =>
                {
                    e.Cancel = true;
                    cts.Cancel();
                };

                try
                {
                    while (true)
                    {
                        try
                        {
                            var cr = c.Consume(cts.Token);
                            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error occurred: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Leave the consumer group cleanly.
                    c.Close();
                }
            }
        }
    }
The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.
- BootstrapServers: MQS connection addresses and ports
- GroupId: consumer group name. If the specified consumer group does not exist, the system automatically creates one.
- topic: name of the topic from which messages are consumed
- SaslUsername and SaslPassword: username and password used for SASL_SSL authentication
- SslCaLocation: path to the client certificate file used for SASL_SSL authentication
- Non-SASL authentication mode
Replace the placeholder values in the example (consumer group name, connection addresses, and topic name) with the actual values.

    using System;
    using System.Threading;
    using Confluent.Kafka;

    class Consumer
    {
        public static void Main(string[] args)
        {
            var conf = new ConsumerConfig
            {
                GroupId = "group_id",
                BootstrapServers = "ip1:port1,ip2:port2,ip3:port3",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            string topic = "topic_name";

            using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                c.Subscribe(topic);

                // Stop consuming when Ctrl+C is pressed.
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) =>
                {
                    e.Cancel = true;
                    cts.Cancel();
                };

                try
                {
                    while (true)
                    {
                        try
                        {
                            var cr = c.Consume(cts.Token);
                            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error occurred: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Leave the consumer group cleanly.
                    c.Close();
                }
            }
        }
    }
The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.
- BootstrapServers: MQS connection addresses and ports
- GroupId: consumer group name. If the specified consumer group does not exist, the system automatically creates one.
- topic: name of the topic from which messages are consumed
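The consumer examples above do not set EnableAutoCommit, so they rely on the client's default of committing offsets automatically in the background. If an offset should be committed only after its message has been processed, automatic commits can be turned off and Commit called explicitly. The following is a minimal sketch under that assumption, using the non-SASL settings. The class name ManualCommitConsumer is illustrative, and printing the message stands in for real processing.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

class ManualCommitConsumer
{
    public static void Main(string[] args)
    {
        // Placeholder values: replace with the actual MQS connection information.
        var conf = new ConsumerConfig
        {
            GroupId = "group_id",
            BootstrapServers = "ip1:port1,ip2:port2,ip3:port3",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false // offsets are committed explicitly below
        };

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe("topic_name");

            var cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true;
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        // Stand-in for real message processing.
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                        // Commit the offset only after processing has succeeded.
                        c.Commit(cr);
                    }
                    catch (KafkaException e)
                    {
                        // Covers both consume errors and commit errors.
                        Console.WriteLine($"Error occurred: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ctrl+C was pressed; fall through to Close().
            }
            finally
            {
                c.Close();
            }
        }
    }
}
```

Committing after every message is synchronous and slows consumption, so committing once per batch of messages may be preferable when throughput matters.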