更新时间:2024-12-06 GMT+08:00

IoTDB Kafka样例程序

功能简介

该样例介绍如何通过Kafka将数据发送到IoTDB。

代码样例

  • Producer.java:

    该样例展示如何将时序数据发送到Kafka集群。

    1. 根据实际场景,修改“KafkaProperties.java”文件中的“TOPIC”变量,例如:public final static String TOPIC = "kafka-topic"。
    2. 该样例默认的时序数据模板为“设备名称,时间戳,值”,例如“sensor_1,1642215835758,1.0”,可根据实际场景在“Constant.java”文件修改“IOTDB_DATA_SAMPLE_TEMPLATE”值。
    public static void main(String[] args) {
        // whether use security mode
        final boolean isSecurityModel = true;
    
        if (isSecurityModel) {
            try {
                LOG.info("Security mode start.");
    
                // 注意,安全认证时,需要用户手动修改为自己申请的机机账号
                LoginUtil.securityPrepare(Constant.USER_PRINCIPAL, Constant.USER_KEYTAB_FILE);
            } catch (IOException e) {
                LOG.error("Security prepare failure.", e);
                return;
            }
            LOG.info("Security prepare success.");
        }
    
        // whether to use the asynchronous sending mode
        final boolean asyncEnable = false;
        Producer producerThread = new Producer(KafkaProperties.TOPIC, asyncEnable);
    }

    Kafka生产者代码可参考使用Producer API向安全Topic生产消息

  • KafkaConsumerMultThread.java:

    该样例展示如何通过多线程将数据从Kafka集群写入到IoTDB,Kafka集群数据由“Producer.java”产生。

    1. 根据实际场景,在“KafkaProperties.java”文件中修改“TOPIC”变量,例如:public final static String TOPIC = "kafka-topic"。
    2. 可以通过修改“KafkaConsumerMultThread.java”文件中的“CONCURRENCY_THREAD_NUM”参数值调整消费者线程数量。
      • 如果要使用多线程消费Kafka集群数据的话,请确保消费的主题分区数大于1。
      • 需要将Kafka客户端配置文件“client.properties”和“log4j.properties”文件放置在程序运行的配置文件目录下。
    3. 在IoTDBSessionPool对象的参数里,设置IoTDBServer所在的节点IP、端口、用户名和密码。
      • 在FusionInsight Manager界面,选择“集群 > 服务 > IoTDB > 实例”,查看待连接的IoTDBServer所在的节点的业务IP。
      • RPC端口可通过登录FusionInsight Manager,选择“集群 > 服务 > IoTDB > 配置 > 全部配置”,搜索参数“IOTDB_SERVER_RPC_PORT”获得。
      • 安全模式下,登录IoTDBServer所在节点的用户名和密码由FusionInsight Manager统一控制,参考准备集群认证用户信息,确保该用户具有操作IoTDB服务的角色权限。
      • 需在本地环境变量中设置环境变量认证用户名和认证用户密码,建议密文存放,使用时解密,确保安全。其中:
        • 认证用户名为访问IoTDB的用户名。
        • 认证用户密码为访问IoTDB的用户密码。
    /**
     * 安全模式下,“SSL_ENABLE”默认为“true”,需要导入truststore.jks文件。
     * 安全模式下,也可登录FusionInsight Manager,选择“集群 > 服务 > IoTDB > 配置”,在搜索框中搜索“SSL”,修改“SSL_ENABLE”参数值为“false”;保存配置后需重启IoTDB服务使配置生效。并修改客户端“客户端安装目录/IoTDB/iotdb/conf”目录下的“iotdb-client.env”文件中的配置:iotdb_ssl_enable="false"。
     */
        private static final String IOTDB_SSL_ENABLE = "true"; //该值为“SSL_ENABLE”参数值。
    public static void main(String[] args) {
        // whether use security mode
        final boolean isSecurityModel = true;
    
        if (isSecurityModel) {
            try {
                LOG.info("Securitymode start.");
    
                // 注意,安全认证时,需要用户手动修改为自己申请的机机账号
                LoginUtil.securityPrepare(Constant.USER_PRINCIPAL, Constant.USER_KEYTAB_FILE);
            } catch (IOException e) {
                LOG.error("Security prepare failure.", e);
                return;
            }
            LOG.info("Security prepare success.");
        }
        // set iotdb_ssl_enable
        System.setProperty("iotdb_ssl_enable", IOTDB_SSL_ENABLE);
        if ("true".equals(IOTDB_SSL_ENABLE)) {  
          // set truststore.jks path  
          System.setProperty("iotdb_ssl_truststore", "truststore文件路径");
        }
    
        // create IoTDB session connection pool
        IoTDBSessionPool sessionPool = new IoTDBSessionPool("127.0.0.1", 22260, "认证用户名", "认证用户密码", 3);
    
        // start consumer thread
        KafkaConsumerMultThread consumerThread = new KafkaConsumerMultThread(KafkaProperties.TOPIC, sessionPool);
        consumerThread.start();
    }
    
        /**
         * consumer thread
         */
        private class ConsumerThread extends ShutdownableThread {
            private int threadNum;
            private String topic;
            private KafkaConsumer<String, String> consumer;
            private IoTDBSessionPool sessionPool;
    
            public ConsumerThread(int threadNum, String topic, Properties props, IoTDBSessionPool sessionPool) {
                super("ConsumerThread" + threadNum, true);
                this.threadNum = threadNum;
                this.topic = topic;
                this.sessionPool = sessionPool;
                this.consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Collections.singleton(this.topic));
            }
    
            public void doWork() {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(waitTime));
                for (ConsumerRecord<String, String> record : records) {
                    LOG.info("Consumer Thread-" + this.threadNum + " partitions:" + record.partition() + " record: "
                            + record.value() + " offsets: " + record.offset());
    
                    // insert kafka consumer data to iotdb
                    sessionPool.insertRecord(record.value());
                }
            }
        }

    Kafka消费者代码可参考使用多线程Producer发送消息