Updated on 2024-08-10 GMT+08:00

FlinkIoTDBSource

Description

FlinkIoTDBSource is an integration of IoTDB and Flink. This module contains IoTDBSource, which a Flink job uses to read time series data from IoTDB and print the data.

Sample Code

This example shows how a Flink job reads time series data from an IoTDB server.

  • Flink uses IoTDBSource to read data from an IoTDB server.
  • To use IoTDBSource, you need to construct an IoTDBSource instance by specifying IoTDBSourceOptions and implementing the abstract method convert() in IoTDBSource. The convert() method defines how you want to convert row data.

Session object parameters include the IP address, port number, username, and password of the node where the IoTDBServer is located.

  • On FusionInsight Manager, choose Cluster > Services > IoTDB > Instance to view the IP address of the node where the IoTDBServer to be connected is located.
  • To obtain the RPC port number, log in to FusionInsight Manager, choose Cluster > Services > IoTDB. Click Configuration, click All Configurations, and search for IOTDB_SERVER_RPC_PORT.
  • In normal mode, IoTDB has a default user root after initial installation. To obtain the password, see User Account List. This user is an administrator and has all permissions, which cannot be assigned, revoked, or deleted.
  • You need to set the username and password for authentication in the local environment variables. You are advised to store the username and password in ciphertext and decrypt them only when they are used.
    • Authentication username is the username for accessing IoTDB.
    • Password is the password for accessing IoTDB.
/**
 * Sample Flink job that first writes demo records to IoTDB through the Session API and then
 * reads them back through an {@link IoTDBSource}, printing every row.
 */
public class FlinkIoTDBSource {
  /**
   * Value written to the {@code iotdb_ssl_enable} system property.
   *
   * <p>To obtain the value, log in to FusionInsight Manager, choose Cluster > Services > IoTDB,
   * click Configurations, search for SSL in the search box, and view the value of SSL_ENABLE.
   */
  private static final String IOTDB_SSL_ENABLE = "true";

  static final String LOCAL_HOST = "127.0.0.1";
  static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1";
  static final String ROOT_SG1_D1 = "root.sg1.d1";

  /** RPC port shared by the Session connection and the Flink source. */
  private static final int IOTDB_RPC_PORT = 22260;

  /** Placeholder: replace with the username used to access IoTDB. */
  private static final String USERNAME = "Authentication username";

  /** Placeholder: replace with the password used to access IoTDB. */
  private static final String PASSWORD = "Password";

  /**
   * Prepares the demo data, then builds and runs the Flink job that reads it back.
   *
   * @param args command-line arguments (unused)
   * @throws Exception if preparing the data or executing the job fails
   */
  public static void main(String[] args) throws Exception {
    // use session api to create data in IoTDB
    prepareData();

    // run the flink job on local mini cluster
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    String query = "select s1 from " + ROOT_SG1_D1 + " align by device";
    IoTDBSourceOptions ioTDBSourceOptions =
        new IoTDBSourceOptions(LOCAL_HOST, IOTDB_RPC_PORT, USERNAME, PASSWORD, query);

    // convert() is the extension point of IoTDBSource; here each row is passed through unchanged.
    IoTDBSource<RowRecord> source =
        new IoTDBSource<RowRecord>(ioTDBSourceOptions) {
          @Override
          public RowRecord convert(RowRecord rowRecord) {
            return rowRecord;
          }
        };
    env.addSource(source).name("sensor-source").print().setParallelism(2);
    env.execute();
  }

  /**
   * Configures the SSL system properties, then inserts 1000 records (s1=1, s2=2, s3=3 at
   * timestamps 0..999) into {@code root.sg1.d1} if the s1 time series does not exist yet.
   *
   * <p>The session is always closed before this method returns, including when an exception
   * is thrown.
   *
   * @throws IoTDBConnectionException if the connection cannot be opened or closed
   * @throws StatementExecutionException if a statement fails with a status other than
   *     {@code PATH_ALREADY_EXIST_ERROR}
   * @throws TTransportException declared for compatibility with the original signature
   */
  private static void prepareData()
      throws IoTDBConnectionException, StatementExecutionException, TTransportException {
    // set iotdb_ssl_enable
    System.setProperty("iotdb_ssl_enable", IOTDB_SSL_ENABLE);
    if ("true".equals(IOTDB_SSL_ENABLE)) {
      // set truststore.jks path
      System.setProperty("iotdb_ssl_truststore", "truststore file path");
    }
    Session session = new Session(LOCAL_HOST, IOTDB_RPC_PORT, USERNAME, PASSWORD);
    session.open(false);
    try {
      // NOTE(review): if this call reports PATH_ALREADY_EXIST_ERROR, the catch below swallows it
      // and the rest of the try body is skipped — confirm that is the intended behavior.
      session.setStorageGroup("root.sg1");
      if (!session.checkTimeseriesExists(ROOT_SG1_D1_S1)) {
        session.createTimeseries(
            ROOT_SG1_D1_S1, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
        List<String> measurements = new ArrayList<>();
        measurements.add("s1");
        measurements.add("s2");
        measurements.add("s3");
        List<TSDataType> types = new ArrayList<>();
        types.add(TSDataType.INT64);
        types.add(TSDataType.INT64);
        types.add(TSDataType.INT64);

        for (long time = 0; time < 1000; time++) {
          List<Object> values = new ArrayList<>();
          values.add(1L);
          values.add(2L);
          values.add(3L);
          session.insertRecord(ROOT_SG1_D1, time, measurements, types, values);
        }
      }
    } catch (StatementExecutionException e) {
      // An "already exists" status is tolerated; anything else is a real failure.
      if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
        throw e;
      }
    } finally {
      // Release the connection regardless of outcome; the original never closed the session.
      session.close();
    }
  }
}