Java Example Code
Description
This section describes how to use Kafka to send data to IoTDB.
Sample Code
- Producer.java:
This example shows how to send time series data to a Kafka cluster.
- Change the value of the public final static String TOPIC variable in the KafkaProperties.java file based on the site requirements, for example, kafka-topic.
- The default time series data format of this sample is Device name,Timestamp,Value, for example, sensor_1,1642215835758,1.0. You can change the value of IOTDB_DATA_SAMPLE_TEMPLATE in the Constant.java file based on the site requirements.
public static void main(String[] args) { // whether use security mode final boolean isSecurityModel = true; if (isSecurityModel) { try { LOG.info("Security mode start."); // Note: During security authentication, you need to manually change the account to a machine-machine one that you have applied for. LoginUtil.securityPrepare(Constant.USER_PRINCIPAL, Constant.USER_KEYTAB_FILE); } catch (IOException e) { LOG.error("Security prepare failure.", e); return; } LOG.info("Security prepare success."); } // whether to use the asynchronous sending mode final boolean asyncEnable = false; Producer producerThread = new Producer(KafkaProperties.TOPIC, asyncEnable); }
For details about the Kafka producer code, see Producer API Sample.
- KafkaConsumerMultThread.java:
This example shows how to write data from a Kafka cluster to IoTDB using multiple threads. Kafka cluster data is generated by Producer.java.
- Change the value of the public final static String TOPIC variable in the KafkaProperties.java file based on the site requirements, for example, kafka-topic.
- Change the value of CONCURRENCY_THREAD_NUM in the KafkaConsumerMultThread.java file to adjust the number of consumer threads.
- If multiple threads are used to consume Kafka cluster data, ensure that the number of consumed topic partitions is greater than 1.
- The Kafka client configuration files client.properties and log4j.properties must be stored in the configuration file directory of the program.
- Set the IP address, port number, username, and password of the node where the IoTDBServer is located in the IoTDBSessionPool object parameters.
- On FusionInsight Manager, choose Cluster > Services > IoTDB and click the Instance tab to view the IP address of the node where the IoTDBServer to be connected is located.
- To obtain the RPC port number, log in to FusionInsight Manager, choose Cluster > Services > IoTDB, click Configuration, and click All Configurations, and search for IOTDB_SERVER_RPC_PORT.
- In security mode, the username and password for logging in to the node where IoTDBServer resides are controlled by FusionInsight Manager. Ensure that the user has the permission to operate the IoTDB service. For details, see Preparing for User Authentication.
- You need to set the username and password for authentication in the local environment variables. You are advised to store the username and password in ciphertext and decrypt them upon using.
- Authentication username is the username for accessing IoTDB.
- Password is the password for accessing IoTDB.
/** * In security mode, the default value of SSL_ENABLE is true. You need to import the truststore.jks file. * In security mode, you can also log in to FusionInsight Manager, choose Cluster > Services > IoTDB > Configuration, search for SSL in the search box, and change the value of SSL_ENABLE to false. After saving the configuration, restart the IoTDB service for the configuration to take effect. Modify the following configuration in the iotdb-client.env file in the Client installation directory/IoTDB/iotdb/conf directory on the client: iotdb_ssl_enable="false" */ private static final String IOTDB_SSL_ENABLE = "true"; // Set it to the SSL_ENABLE value. public static void main(String[] args) { // whether use security mode final boolean isSecurityModel = true; if (isSecurityModel) { try { LOG.info("Securitymode start."); // Note: During security authentication, you need to manually change the account to a machine-machine one. LoginUtil.securityPrepare(Constant.USER_PRINCIPAL, Constant.USER_KEYTAB_FILE); } catch (IOException e) { LOG.error("Security prepare failure.", e); return; } LOG.info("Security prepare success."); } // set iotdb_ssl_enable System.setProperty("iotdb_ssl_enable", IOTDB_SSL_ENABLE); if ("true".equals(IOTDB_SSL_ENABLE)) { // set truststore.jks path System.setProperty("iotdb_ssl_truststore", "truststore file path"); } // create IoTDB session connection pool IoTDBSessionPool sessionPool = new IoTDBSessionPool("127.0.0.1", 22260, "Authentication username", "Password", 3); // start consumer thread KafkaConsumerMultThread consumerThread = new KafkaConsumerMultThread(KafkaProperties.TOPIC, sessionPool); consumerThread.start(); } /** * consumer thread */ private class ConsumerThread extends ShutdownableThread { private int threadNum; private String topic; private KafkaConsumer<String, String> consumer; private IoTDBSessionPool sessionPool; public ConsumerThread(int threadNum, String topic, Properties props, IoTDBSessionPool sessionPool) { super("ConsumerThread" + threadNum, true); this.threadNum = threadNum; this.topic = topic; this.sessionPool = sessionPool; this.consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singleton(this.topic)); } public void doWork() { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(waitTime)); for (ConsumerRecord<String, String> record : records) { LOG.info("Consumer Thread-" + this.threadNum + " partitions:" + record.partition() + " record: " + record.value() + " offsets: " + record.offset()); // insert kafka consumer data to iotdb sessionPool.insertRecord(record.value()); } } }
For details about the Kafka consumer code, see Multi-thread Producer Sample.
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot