更新时间:2024-12-06 GMT+08:00

FlinkIoTDBSource样例程序

功能简介

IoTDB与Flink的集成。此模块包含了iotdb source,通过flink job将时序数据从IoTDB读取出来并且打印。

代码样例

该示例演示了Flink job 如何从IoTDB server读取时序数据的场景:

  • Flink使用IoTDBSource从 IoTDB server读取数据。
  • 要使用IoTDBSource,您需要构造一个IoTDBSource的实例,通过指定的IoTDBSourceOptions并在IoTDBSource中实现抽象方法convert(),convert()定义了您希望如何转换行数据。

其中在Session对象的参数里,设置IoTDBServer所在的节点IP、端口、用户名和密码。

  • 待连接的IoTDBServer所在的节点IP地址,可通过登录FusionInsight Manager界面,选择“集群 > 服务 > IoTDB > 实例”查看。
  • RPC端口可通过登录FusionInsight Manager,选择“集群 > 服务 > IoTDB > 配置 > 全部配置”,搜索参数“IOTDB_SERVER_RPC_PORT”获得。
  • 普通模式下,初始安装后IoTDB有一个默认用户:root,密码请参见用户账号一览表章节获取。该用户为管理员用户,固定拥有所有权限,无法被赋予,无法被撤销权限,也无法被删除。
  • 需在本地环境变量中设置环境变量认证用户名和认证用户密码,建议密文存放,使用时解密,确保安全。其中:
    • 认证用户名为访问IoTDB的用户名。
    • 认证用户密码为访问IoTDB的用户密码。
public class FlinkIoTDBSource {
  private static final String IOTDB_SSL_ENABLE = "true";//该值可登录FusionInsight Manager,选择“集群 > 服务 > IoTDB > 配置”,在搜索框中搜索“SSL”,查看“SSL_ENABLE”参数值获取。
  static final String LOCAL_HOST = "127.0.0.1";
  static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1";
  static final String ROOT_SG1_D1 = "root.sg1.d1";

  public static void main(String[] args) throws Exception {
    // use session api to create data in IoTDB
    prepareData();

    // run the flink job on local mini cluster
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    IoTDBSourceOptions ioTDBSourceOptions =
        new IoTDBSourceOptions(
            LOCAL_HOST, 22260, "认证用户名", "认证用户密码", "select s1 from " + ROOT_SG1_D1 + " align by device");

    IoTDBSource<RowRecord> source =
        new IoTDBSource<RowRecord>(ioTDBSourceOptions) {
          @Override
          public RowRecord convert(RowRecord rowRecord) {
            return rowRecord;
          }
        };
    env.addSource(source).name("sensor-source").print().setParallelism(2);
    env.execute();
  }

  private static void prepareData()
      throws IoTDBConnectionException, StatementExecutionException, TTransportException {
    // set iotdb_ssl_enable
    System.setProperty("iotdb_ssl_enable", IOTDB_SSL_ENABLE);
    if ("true".equals(IOTDB_SSL_ENABLE)) {  
      // set truststore.jks path  
      System.setProperty("iotdb_ssl_truststore", "truststore文件路径");
    }
    Session session = new Session(LOCAL_HOST, 22260, "认证用户名", "认证用户密码");
    session.open(false);
    try {
      session.setStorageGroup("root.sg1");
      if (!session.checkTimeseriesExists(ROOT_SG1_D1_S1)) {
        session.createTimeseries(
            ROOT_SG1_D1_S1, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
        List<String> measurements = new ArrayList<>();
        measurements.add("s1");
        measurements.add("s2");
        measurements.add("s3");
        List<TSDataType> types = new ArrayList<>();
        types.add(TSDataType.INT64);
        types.add(TSDataType.INT64);
        types.add(TSDataType.INT64);

        for (long time = 0; time < 1000; time++) {
          List<Object> values = new ArrayList<>();
          values.add(1L);
          values.add(2L);
          values.add(3L);
          session.insertRecord(ROOT_SG1_D1, time, measurements, types, values);
        }
      }
    } catch (StatementExecutionException e) {
      if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
        throw e;
      }
    }
  }
}