更新时间:2024-06-05 GMT+08:00
IoTDB Kafka样例程序
功能简介
该样例介绍如何通过Kafka将数据发送到IoTDB。
代码样例
- Producer.java:
- 根据实际场景,修改“KafkaProperties.java”文件中的“TOPIC”变量,例如:public final static String TOPIC = "kafka-topic"。
- 该样例默认的时序数据模板为“设备名称,时间戳,值”,例如“sensor_1,1642215835758,1.0”,可根据实际场景在“Constant.java”文件修改“IOTDB_DATA_SAMPLE_TEMPLATE”值。
public static void main(String[] args) { // whether use security mode final boolean isSecurityModel = false; ... // whether to use the asynchronous sending mode final boolean asyncEnable = false; Producer producerThread = new Producer(KafkaProperties.TOPIC, asyncEnable); }
Kafka生产者代码可参考使用Producer API向安全Topic生产消息。
- KafkaConsumerMultThread.java:
该样例展示如何通过多线程将数据从Kafka集群写入到IoTDB,Kafka集群数据由Producer.java产生。
- 根据实际场景,在“KafkaProperties.java”文件中修改“TOPIC”变量,例如:public final static String TOPIC = "kafka-topic"。
- 可以通过修改“KafkaConsumerMultThread.java”文件中的“CONCURRENCY_THREAD_NUM”参数值调整消费者线程数量。
- 如果要使用多线程消费Kafka集群数据的话,请确保消费的主题分区数大于1。
- 需要将Kafka客户端配置文件“client.properties”和“log4j.properties”文件放置在程序运行的配置文件目录下。
- 在IoTDBSessionPool对象的参数里,设置IoTDBServer所在的节点IP、端口、用户名和密码。
- 待连接的IoTDBServer所在的节点IP地址,可通过登录FusionInsight Manage界面,选择“集群 > 服务 > IoTDB > 实例”查看。
- RPC端口可通过登录FusionInsight Manager,选择“集群 > 服务 > IoTDB > 配置 > 全部配置”,搜索参数“IOTDB_SERVER_RPC_PORT”获得。
- 需在本地环境变量中设置环境变量认证用户名和认证用户密码,建议密文存放,使用时解密,确保安全。其中:
- 认证用户名为访问IoTDB的用户名。
- 认证用户密码为访问IoTDB的用户密码。
private static final String IOTDB_SSL_ENABLE = "true";//该值可登录FusionInsight Manager,选择“集群 > 服务 > IoTDB > 配置”,在搜索框中搜索“SSL”,查看“SSL_ENABLE”参数值获取。 public static void main(String[] args) { // whether use security mode final boolean isSecurityModel = false; ... // set iotdb_ssl_enable System.setProperty("iotdb_ssl_enable", IOTDB_SSL_ENABLE); if ("true".equals(IOTDB_SSL_ENABLE)) { // set truststore.jks path System.setProperty("iotdb_ssl_truststore", "truststore文件路径"); } // create IoTDB seesion connection pool IoTDBSessionPool sessionPool = new IoTDBSessionPool("127.0.0.1", 22260, "认证用户名", "认证用户密码", 3); // start consumer thread KafkaConsumerMultThread consumerThread = new KafkaConsumerMultThread(KafkaProperties.TOPIC, sessionPool); consumerThread.start(); } /** * consumer thread */ private class ConsumerThread extends ShutdownableThread { private int threadNum; private String topic; private KafkaConsumer<String, String> consumer; private IoTDBSessionPool sessionPool; public ConsumerThread(int threadNum, String topic, Properties props, IoTDBSessionPool sessionPool) { super("ConsumerThread" + threadNum, true); this.threadNum = threadNum; this.topic = topic; this.sessionPool = sessionPool; this.consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singleton(this.topic)); } public void doWork() { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(waitTime)); for (ConsumerRecord<String, String> record : records) { LOG.info("Consumer Thread-" + this.threadNum + " partitions:" + record.partition() + " record: " + record.value() + " offsets: " + record.offset()); // insert kafka consumer data to iotdb sessionPool.insertRecord(record.value()); } } }
Kafka消费者代码可参考使用多线程Producer发送消息。
父主题: 开发IoTDB应用