更新时间:2024-12-06 GMT+08:00

IoTDB Kafka样例程序

功能简介

该样例介绍如何通过Kafka将数据发送到IoTDB。

代码样例

  • Producer.java:

    该样例展示如何将时序数据发送到Kafka集群。

    1. 根据实际场景,修改“KafkaProperties.java”文件中的“TOPIC”变量,例如:public final static String TOPIC = "kafka-topic"。
    2. 该样例默认的时序数据模板为“设备名称,时间戳,值”,例如“sensor_1,1642215835758,1.0”,可根据实际场景在“Constant.java”文件修改“IOTDB_DATA_SAMPLE_TEMPLATE”值。
    public static void main(String[] args) {
        // whether use security mode
        final boolean isSecurityModel = false;
        ...
        // whether to use the asynchronous sending mode
        final boolean asyncEnable = false;
        Producer producerThread = new Producer(KafkaProperties.TOPIC, asyncEnable);
    }

    Kafka生产者代码可参考使用Producer API向安全Topic生产消息

  • KafkaConsumerMultThread.java:

    该样例展示如何通过多线程将数据从Kafka集群写入到IoTDB,Kafka集群数据由Producer.java产生。

    1. 根据实际场景,在“KafkaProperties.java”文件中修改“TOPIC”变量,例如:public final static String TOPIC = "kafka-topic"。
    2. 可以通过修改“KafkaConsumerMultThread.java”文件中的“CONCURRENCY_THREAD_NUM”参数值调整消费者线程数量。
      • 如果要使用多线程消费Kafka集群数据的话,请确保消费的主题分区数大于1。
      • 需要将Kafka客户端配置文件“client.properties”和“log4j.properties”文件放置在程序运行的配置文件目录下。
    3. 在IoTDBSessionPool对象的参数里,设置IoTDBServer所在的节点IP、端口、用户名和密码。
      • 待连接的IoTDBServer所在的节点IP地址,可通过登录FusionInsight Manager界面,选择“集群 > 服务 > IoTDB > 实例”查看。
      • RPC端口可通过登录FusionInsight Manager,选择“集群 > 服务 > IoTDB > 配置 > 全部配置”,搜索参数“IOTDB_SERVER_RPC_PORT”获得。
      • 需在本地环境变量中设置环境变量认证用户名和认证用户密码,建议密文存放,使用时解密,确保安全。其中:
        • 认证用户名为访问IoTDB的用户名。
        • 认证用户密码为访问IoTDB的用户密码。
    private static final String IOTDB_SSL_ENABLE = "true";//该值可登录FusionInsight Manager,选择“集群 > 服务 > IoTDB > 配置”,在搜索框中搜索“SSL”,查看“SSL_ENABLE”参数值获取。
    public static void main(String[] args) {
        // whether use security mode
        final boolean isSecurityModel = false;
        ...
    
    // set iotdb_ssl_enable
        System.setProperty("iotdb_ssl_enable", IOTDB_SSL_ENABLE);
        if ("true".equals(IOTDB_SSL_ENABLE)) {  
          // set truststore.jks path  
          System.setProperty("iotdb_ssl_truststore", "truststore文件路径");
        }
        // create IoTDB seesion connection pool
        IoTDBSessionPool sessionPool = new IoTDBSessionPool("127.0.0.1", 22260, "认证用户名", "认证用户密码", 3);
    
        // start consumer thread
        KafkaConsumerMultThread consumerThread = new KafkaConsumerMultThread(KafkaProperties.TOPIC, sessionPool);
        consumerThread.start();
    }
    
        /**
         * consumer thread
         */
        private class ConsumerThread extends ShutdownableThread {
            private int threadNum;
            private String topic;
            private KafkaConsumer<String, String> consumer;
            private IoTDBSessionPool sessionPool;
    
            public ConsumerThread(int threadNum, String topic, Properties props, IoTDBSessionPool sessionPool) {
                super("ConsumerThread" + threadNum, true);
                this.threadNum = threadNum;
                this.topic = topic;
                this.sessionPool = sessionPool;
                this.consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Collections.singleton(this.topic));
            }
    
            public void doWork() {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(waitTime));
                for (ConsumerRecord<String, String> record : records) {
                    LOG.info("Consumer Thread-" + this.threadNum + " partitions:" + record.partition() + " record: "
                            + record.value() + " offsets: " + record.offset());
    
                    // insert kafka consumer data to iotdb
                    sessionPool.insertRecord(record.value());
                }
            }
        }

    Kafka消费者代码可参考使用多线程Producer发送消息