Updated on 2024-04-02 GMT+08:00

Java Sample Code

Description

This example shows how to send data to IoTDB using Kafka.

Sample Code

  • Producer.java:

    This example shows how to send time series data to a Kafka cluster.

    1. Change the value of the public final static String TOPIC variable in the KafkaProperties.java file based on the site requirements, for example, kafka-topic.
    2. The default time series data format of this sample is Device name,Timestamp,Value, for example, sensor_1,1642215835758,1.0. You can change the value of IOTDB_DATA_SAMPLE_TEMPLATE in the Constant.java file based on the site requirements.
    public static void main(String[] args) {
        // whether use security mode
        final boolean isSecurityModel = false;
        ...
        // whether to use the asynchronous sending mode
        final boolean asyncEnable = false;
        Producer producerThread = new Producer(KafkaProperties.TOPIC, asyncEnable);
    }

    For details about the Kafka producer code, see Producer API Usage Sample.

  • KafkaConsumerMultThread.java:

    This example shows how to write data from a Kafka cluster to IoTDB using multiple threads. Kafka cluster data is generated by Producer.java.

    1. Change the value of the public final static String TOPIC variable in the KafkaProperties.java file based on the site requirements, for example, kafka-topic.
    2. Change the value of CONCURRENCY_THREAD_NUM in the KafkaConsumerMultThread.java file to adjust the number of consumer threads.

      If multiple threads are used to consume Kafka cluster data, ensure that the number of consumed topic partitions is greater than 1.

    3. Set the IP address, port number, username, and password of the node where the IoTDBServer is located in the IoTDBSessionPool object parameters.
      • On FusionInsight Manager, choose Cluster > Services > IoTDB and click the Instance tab to view the IP address of the node where the IoTDBServer to be connected is located.
      • To obtain the RPC port number, log in to FusionInsight Manager, choose Cluster > Services > IoTDB. Click Configuration, click All Configurations, and search for IOTDB_SERVER_RPC_PORT.
      private static final String IOTDB_SSL_ENABLE = "true";//To obtain the value, log in to FusionInsight Manager, choose Cluster > Services > IoTDB, click Configurations, search for SSL in the search box, and view the value of SSL_ENABLE.
    public static void main(String[] args) {
        // whether use security mode
        final boolean isSecurityModel = false;
        ...
    
    // set iotdb_ssl_enable
        System.setProperty("iotdb_ssl_enable", IOTDB_SSL_ENABLE);
        if ("true".equals(IOTDB_SSL_ENABLE)) {  
          // set truststore.jks path  
          System.setProperty("iotdb_ssl_truststore", "truststore file path");
        }
        // create IoTDB session connection pool
        IoTDBSessionPool sessionPool = new IoTDBSessionPool("127.0.0.1", 22260, "root", "Password", 3);
    
        // start consumer thread
        KafkaConsumerMultThread consumerThread = new KafkaConsumerMultThread(KafkaProperties.TOPIC, sessionPool);
        consumerThread.start();
    }
    
        /**
         * consumer thread
         */
        private class ConsumerThread extends ShutdownableThread {
            private int threadNum;
            private String topic;
            private KafkaConsumer<String, String> consumer;
            private IoTDBSessionPool sessionPool;
    
            public ConsumerThread(int threadNum, String topic, Properties props, IoTDBSessionPool sessionPool) {
                super("ConsumerThread" + threadNum, true);
                this.threadNum = threadNum;
                this.topic = topic;
                this.sessionPool = sessionPool;
                this.consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Collections.singleton(this.topic));
            }
    
            public void doWork() {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(waitTime));
                for (ConsumerRecord<String, String> record : records) {
                    LOG.info("Consumer Thread-" + this.threadNum + " partitions:" + record.partition() + " record: "
                            + record.value() + " offsets: " + record.offset());
    
                    // insert kafka consumer data to iotdb
                    sessionPool.insertRecord(record.value());
                }
            }
        }

    For details about the Kafka consumer code, see Multi-thread Producer Sample.