更新时间:2024-06-05 GMT+08:00
IoTDB Kafka样例程序
功能简介
该样例介绍如何通过Kafka将数据发送到IoTDB。
代码样例
- Producer.java:
- 根据实际场景,修改“KafkaProperties.java”文件中的“TOPIC”变量,例如:public final static String TOPIC = "kafka-topic"。
- 该样例默认的时序数据模板为“设备名称,时间戳,值”,例如“sensor_1,1642215835758,1.0”,可根据实际场景在“Constant.java”文件修改“IOTDB_DATA_SAMPLE_TEMPLATE”值。
public static void main(String[] args) { // whether use security mode final boolean isSecurityModel = true; if (isSecurityModel) { try { LOG.info("Security mode start."); // 注意,安全认证时,需要用户手动修改为自己申请的机机账号 LoginUtil.securityPrepare(Constant.USER_PRINCIPAL, Constant.USER_KEYTAB_FILE); } catch (IOException e) { LOG.error("Security prepare failure.", e); return; } LOG.info("Security prepare success."); } // whether to use the asynchronous sending mode final boolean asyncEnable = false; Producer producerThread = new Producer(KafkaProperties.TOPIC, asyncEnable); }
Kafka生产者代码可参考使用Producer API向安全Topic生产消息。
- KafkaConsumerMultThread.java:
该样例展示如何通过多线程将数据从Kafka集群写入到IoTDB,Kafka集群数据由“Producer.java”产生。
- 根据实际场景,在“KafkaProperties.java”文件中修改“TOPIC”变量,例如:public final static String TOPIC = "kafka-topic"。
- 可以通过修改“KafkaConsumerMultThread.java”文件中的“CONCURRENCY_THREAD_NUM”参数值调整消费者线程数量。
- 如果要使用多线程消费Kafka集群数据的话,请确保消费的主题分区数大于1。
- 需要将Kafka客户端配置文件“client.properties”和“log4j.properties”文件放置在程序运行的配置文件目录下。
- 在IoTDBSessionPool对象的参数里,设置IoTDBServer所在的节点IP、端口、用户名和密码。
- 在FusionInsight Manager界面,选择“集群 > 服务 > IoTDB > 实例”,查看待连接的IoTDBServer所在的节点的业务IP。
- RPC端口可通过登录FusionInsight Manager,选择“集群 > 服务 > IoTDB > 配置 > 全部配置”,搜索参数“IOTDB_SERVER_RPC_PORT”获得。
- 安全模式下,登录IoTDBServer所在节点的用户名和密码由FusionInsight Manager统一控制,参考准备集群认证用户信息,确保该用户具有操作IoTDB服务的角色权限。
- 需在本地环境变量中设置环境变量认证用户名和认证用户密码,建议密文存放,使用时解密,确保安全。其中:
- 认证用户名为访问IoTDB的用户名。
- 认证用户密码为访问IoTDB的用户密码。
/** * 安全模式下,“SSL_ENABLE”默认为“true”,需要导入truststore.jks文件。 * 安全模式下,也可登录FusionInsight Manager,选择“集群 > 服务 > IoTDB > 配置”,在搜索框中搜索“SSL”,修改“SSL_ENABLE”参数值为“false”;保存配置后需重启IoTDB服务使配置生效。并修改客户端“客户端安装目录/IoTDB/iotdb/conf”目录下的“iotdb-client.env”文件中的配置:iotdb_ssl_enable="false"。 */ private static final String IOTDB_SSL_ENABLE = "true"; //该值为“SSL_ENABLE”参数值。 public static void main(String[] args) { // whether use security mode final boolean isSecurityModel = true; if (isSecurityModel) { try { LOG.info("Securitymode start."); // 注意,安全认证时,需要用户手动修改为自己申请的机机账号 LoginUtil.securityPrepare(Constant.USER_PRINCIPAL, Constant.USER_KEYTAB_FILE); } catch (IOException e) { LOG.error("Security prepare failure.", e); return; } LOG.info("Security prepare success."); } // set iotdb_ssl_enable System.setProperty("iotdb_ssl_enable", IOTDB_SSL_ENABLE); if ("true".equals(IOTDB_SSL_ENABLE)) { // set truststore.jks path System.setProperty("iotdb_ssl_truststore", "truststore文件路径"); } // create IoTDB session connection pool IoTDBSessionPool sessionPool = new IoTDBSessionPool("127.0.0.1", 22260, "认证用户名", "认证用户密码", 3); // start consumer thread KafkaConsumerMultThread consumerThread = new KafkaConsumerMultThread(KafkaProperties.TOPIC, sessionPool); consumerThread.start(); } /** * consumer thread */ private class ConsumerThread extends ShutdownableThread { private int threadNum; private String topic; private KafkaConsumer<String, String> consumer; private IoTDBSessionPool sessionPool; public ConsumerThread(int threadNum, String topic, Properties props, IoTDBSessionPool sessionPool) { super("ConsumerThread" + threadNum, true); this.threadNum = threadNum; this.topic = topic; this.sessionPool = sessionPool; this.consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singleton(this.topic)); } public void doWork() { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(waitTime)); for (ConsumerRecord<String, String> record : records) { LOG.info("Consumer Thread-" + this.threadNum + " partitions:" + record.partition() + " record: " + record.value() + " offsets: " + record.offset()); // insert kafka consumer data to iotdb sessionPool.insertRecord(record.value()); } } }
Kafka消费者代码可参考使用多线程Producer发送消息。
父主题: 开发IoTDB应用