Java Sample Code
Description
This example shows how to send data to IoTDB using Kafka.
Sample Code
- Producer.java:
This example shows how to send time series data to a Kafka cluster.
- Change the value of the public final static String TOPIC variable in the KafkaProperties.java file based on the site requirements, for example, kafka-topic.
- The default time series data format of this sample is Device name,Timestamp,Value, for example, sensor_1,1642215835758,1.0. You can change the value of IOTDB_DATA_SAMPLE_TEMPLATE in the Constant.java file based on the site requirements.
public static void main(String[] args) {
    // whether use security mode
    final boolean isSecurityModel = false;
    ...
    // whether to use the asynchronous sending mode
    final boolean asyncEnable = false;
    Producer producerThread = new Producer(KafkaProperties.TOPIC, asyncEnable);
}
For details about the Kafka producer code, see Producer API Usage Sample.
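The default record format described above (Device name,Timestamp,Value) can be illustrated with a small standalone sketch. The SensorRecord class and its method names are hypothetical and are not part of the sample project; the sketch only shows how a line such as sensor_1,1642215835758,1.0 could be built and parsed, assuming the device name itself contains no comma.

```java
/** Hypothetical helper for the "Device name,Timestamp,Value" record format. */
final class SensorRecord {
    final String device;
    final long timestamp;
    final double value;

    SensorRecord(String device, long timestamp, double value) {
        this.device = device;
        this.timestamp = timestamp;
        this.value = value;
    }

    /** Builds the comma-separated line used as the Kafka message value. */
    String toLine() {
        return device + "," + timestamp + "," + value;
    }

    /** Parses one comma-separated line; rejects lines that do not have exactly three fields. */
    static SensorRecord parse(String line) {
        String[] parts = line.split(",", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                "Expected 3 fields but got " + parts.length + ": " + line);
        }
        return new SensorRecord(
            parts[0].trim(),
            Long.parseLong(parts[1].trim()),
            Double.parseDouble(parts[2].trim()));
    }

    public static void main(String[] args) {
        SensorRecord record = new SensorRecord("sensor_1", 1642215835758L, 1.0);
        System.out.println(record.toLine()); // prints sensor_1,1642215835758,1.0
    }
}
```

Validating the field count up front means a malformed message fails with a clear error before anything is written to IoTDB.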
- KafkaConsumerMultThread.java:
This example shows how to write data from a Kafka cluster to IoTDB using multiple threads. Kafka cluster data is generated by Producer.java.
- Change the value of the public final static String TOPIC variable in the KafkaProperties.java file based on the site requirements, for example, kafka-topic.
- Change the value of CONCURRENCY_THREAD_NUM in the KafkaConsumerMultThread.java file to adjust the number of consumer threads.
- If multiple threads are used to consume Kafka cluster data, ensure that the consumed topic has more than one partition.
- The Kafka client configuration files client.properties and log4j.properties must be stored in the configuration file directory of the program.
- In the IoTDBSessionPool object parameters, set the IP address and RPC port number of the node where the IoTDBServer is located, together with the username and password used for authentication.
- On FusionInsight Manager, choose Cluster > Services > IoTDB and click the Instance tab to view the IP address of the node where the IoTDBServer to be connected is located.
- To obtain the RPC port number, log in to FusionInsight Manager, choose Cluster > Services > IoTDB. Click Configuration, click All Configurations, and search for IOTDB_SERVER_RPC_PORT.
- Set the username and password used for authentication in local environment variables. You are advised to store the username and password in ciphertext and decrypt them only when they are used.
- Authentication username is the username for accessing IoTDB.
- Password is the password for accessing IoTDB.
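The environment-variable advice above can be sketched as follows. The variable names IOTDB_USERNAME and IOTDB_PASSWORD and the IoTDBCredentials class are assumptions made for illustration; the sample project does not prescribe them. If the values are stored in ciphertext, a decryption step (not shown here) would follow the lookup.

```java
import java.util.Map;

/** Hypothetical helper that reads IoTDB credentials from environment variables. */
final class IoTDBCredentials {
    // Assumed variable names; choose names that match your own environment.
    static final String USER_VAR = "IOTDB_USERNAME";
    static final String PASSWORD_VAR = "IOTDB_PASSWORD";

    /** Returns the value of the named variable, or fails fast if it is missing or empty. */
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> env = System.getenv();
        String user = require(env, USER_VAR);
        String password = require(env, PASSWORD_VAR);
        // If the values are stored in ciphertext, decrypt them here before use.
        // The password is deliberately never printed or logged.
        System.out.println("Loaded credentials for user " + user
            + " (password length " + password.length() + ")");
    }
}
```

The two values returned here would then replace the "Authentication username" and "Password" placeholders passed to the IoTDBSessionPool constructor.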
// To obtain the value, log in to FusionInsight Manager, choose Cluster > Services > IoTDB,
// click Configurations, search for SSL in the search box, and view the value of SSL_ENABLE.
private static final String IOTDB_SSL_ENABLE = "true";

public static void main(String[] args) {
    // whether use security mode
    final boolean isSecurityModel = false;
    ...
    // set iotdb_ssl_enable
    System.setProperty("iotdb_ssl_enable", IOTDB_SSL_ENABLE);
    if ("true".equals(IOTDB_SSL_ENABLE)) {
        // set truststore.jks path
        System.setProperty("iotdb_ssl_truststore", "truststore file path");
    }
    // create IoTDB session connection pool
    IoTDBSessionPool sessionPool = new IoTDBSessionPool("127.0.0.1", 22260, "Authentication username", "Password", 3);
    // start consumer thread
    KafkaConsumerMultThread consumerThread = new KafkaConsumerMultThread(KafkaProperties.TOPIC, sessionPool);
    consumerThread.start();
}

/**
 * consumer thread
 */
private class ConsumerThread extends ShutdownableThread {
    private int threadNum;
    private String topic;
    private KafkaConsumer<String, String> consumer;
    private IoTDBSessionPool sessionPool;

    public ConsumerThread(int threadNum, String topic, Properties props, IoTDBSessionPool sessionPool) {
        super("ConsumerThread" + threadNum, true);
        this.threadNum = threadNum;
        this.topic = topic;
        this.sessionPool = sessionPool;
        this.consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton(this.topic));
    }

    public void doWork() {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(waitTime));
        for (ConsumerRecord<String, String> record : records) {
            LOG.info("Consumer Thread-" + this.threadNum + " partitions:" + record.partition()
                + " record: " + record.value() + " offsets: " + record.offset());
            // insert kafka consumer data to iotdb
            sessionPool.insertRecord(record.value());
        }
    }
}
For details about the Kafka consumer code, see Multi-thread Producer Sample.
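The partition requirement for multi-threaded consumption can be illustrated with a short calculation. Within a single Kafka consumer group, each partition is assigned to at most one consumer at a time, so consumer threads beyond the partition count stay idle. The sketch below assumes that all consumer threads share one consumer group; the ConsumerThreadPlan class is hypothetical and is not part of the sample project.

```java
/** Hypothetical helper that estimates how many consumer threads can receive data. */
final class ConsumerThreadPlan {
    /**
     * Each partition is consumed by at most one consumer in a group,
     * so at most min(partitions, threads) threads are assigned a partition.
     */
    static int activeThreads(int partitions, int threads) {
        if (partitions < 0 || threads < 0) {
            throw new IllegalArgumentException("Counts must be non-negative");
        }
        return Math.min(partitions, threads);
    }

    public static void main(String[] args) {
        // With a single partition, only one of three threads does any work.
        System.out.println(activeThreads(1, 3)); // prints 1
        // With six partitions, all three threads are assigned partitions.
        System.out.println(activeThreads(6, 3)); // prints 3
    }
}
```

In practice this means that raising CONCURRENCY_THREAD_NUM above the number of partitions in the topic is unlikely to increase throughput.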