Updated on 2024-04-02 GMT+08:00

FlinkIoTDBSource

Description

This sample integrates IoTDB with Flink. The module contains IoTDBSource, which a Flink job uses to read time series data from IoTDB and print it.

Sample Code

This example shows how a Flink job reads time series data from an IoTDB server.

  • Flink uses IoTDBSource to read data from an IoTDB server.
  • To use IoTDBSource, you need to construct an IoTDBSource instance by specifying IoTDBSourceOptions and implementing the abstract method convert() in IoTDBSource. The convert() method defines how you want to convert row data.

Session object parameters include the IP address, port number, username, and password of the node where the IoTDBServer is located.

  • On FusionInsight Manager, choose Cluster > Services > IoTDB > Instance to view the IP address of the node where the IoTDBServer to be connected is located.
  • To obtain the RPC port number, log in to FusionInsight Manager, choose Cluster > Services > IoTDB, click Configuration and then All Configurations, and search for IOTDB_SERVER_RPC_PORT.
  • In security mode, the username and password for logging in to the node where IoTDBServer resides are controlled by FusionInsight Manager. Ensure that the user has the permissions to operate the IoTDB and Flink services. For details, see Preparing for User Authentication.
  • You need to set the username and password for authentication in the local environment variables. You are advised to store the username and password in ciphertext and decrypt them only when they are used.
    • Authentication username is the username for accessing IoTDB.
    • Password is the password for accessing IoTDB.
/**
 * Sample Flink job that reads time series data from IoTDB through {@code IoTDBSource} and prints
 * every row.
 *
 * <p>{@link #main(String[])} first writes sample records with the IoTDB {@code Session} API
 * (see {@code prepareData()}), then builds an {@code IoTDBSource} that queries those records and
 * attaches a print sink to it.
 */
public class FlinkIoTDBSource {
  /**
   * Value written to the {@code iotdb_ssl_enable} system property; set it to the SSL_ENABLE value
   * of the IoTDB service.
   *
   * <p>In security mode, the default value of SSL_ENABLE is true and you need to import the
   * truststore.jks file. In security mode, you can also log in to FusionInsight Manager, choose
   * Cluster > Services > IoTDB > Configuration, search for SSL in the search box, and change the
   * value of SSL_ENABLE to false. After saving the configuration, restart the IoTDB service for
   * the configuration to take effect. Then modify the following configuration in the
   * iotdb-client.env file in the "Client installation directory/IoTDB/iotdb/conf" directory on
   * the client: iotdb_ssl_enable="false"
   */
  private static final String IOTDB_SSL_ENABLE = "true"; // Set it to the SSL_ENABLE value.

  // Connection settings shared by the Session (write path) and the IoTDBSource (read path).
  // The username and password values are placeholders to be replaced before running the sample.
  private static final int IOTDB_RPC_PORT = 22260;
  private static final String IOTDB_USERNAME = "Authentication username";
  private static final String IOTDB_PASSWORD = "Password";

  static final String LOCAL_HOST = "127.0.0.1";
  static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1";
  static final String ROOT_SG1_D1 = "root.sg1.d1";

  /**
   * Writes the sample data, then runs a Flink job that reads it back from IoTDB and prints it.
   *
   * @param args command-line arguments (unused)
   * @throws Exception if preparing the data or executing the Flink job fails
   */
  public static void main(String[] args) throws Exception {
    // Use the session API to create data in IoTDB.
    prepareData();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    String query = "select s1 from " + ROOT_SG1_D1 + " align by device";
    IoTDBSourceOptions ioTDBSourceOptions =
        new IoTDBSourceOptions(LOCAL_HOST, IOTDB_RPC_PORT, IOTDB_USERNAME, IOTDB_PASSWORD, query);

    // convert() defines how each row is transformed; this sample passes rows through unchanged.
    IoTDBSource<RowRecord> source =
        new IoTDBSource<RowRecord>(ioTDBSourceOptions) {
          @Override
          public RowRecord convert(RowRecord rowRecord) {
            return rowRecord;
          }
        };
    env.addSource(source).name("sensor-source").print().setParallelism(2);
    env.execute();
  }

  /**
   * Creates the sample storage group and time series and inserts 1000 records (timestamps 0 to
   * 999, with s1=1, s2=2, s3=3) under {@code root.sg1.d1}.
   *
   * <p>Nothing is inserted when the time series {@code root.sg1.d1.s1} already exists. A
   * {@code PATH_ALREADY_EXIST_ERROR} status is tolerated; any other statement failure is
   * rethrown. The session is always closed before this method returns.
   *
   * @throws IoTDBConnectionException if the session cannot be opened, used, or closed
   * @throws StatementExecutionException if a statement fails for a reason other than the path
   *     already existing
   * @throws TTransportException declared by the original sample signature; kept for callers
   */
  private static void prepareData()
      throws IoTDBConnectionException, StatementExecutionException, TTransportException {
    // Set iotdb_ssl_enable.
    System.setProperty("iotdb_ssl_enable", IOTDB_SSL_ENABLE);
    if ("true".equals(IOTDB_SSL_ENABLE)) {
      // Set the truststore.jks path.
      System.setProperty("iotdb_ssl_truststore", "truststore file path");
    }
    Session session = new Session(LOCAL_HOST, IOTDB_RPC_PORT, IOTDB_USERNAME, IOTDB_PASSWORD);
    session.open(false);
    try {
      session.setStorageGroup("root.sg1");
      if (!session.checkTimeseriesExists(ROOT_SG1_D1_S1)) {
        session.createTimeseries(
            ROOT_SG1_D1_S1, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
        List<String> measurements = new ArrayList<>();
        measurements.add("s1");
        measurements.add("s2");
        measurements.add("s3");
        List<TSDataType> types = new ArrayList<>();
        types.add(TSDataType.INT64);
        types.add(TSDataType.INT64);
        types.add(TSDataType.INT64);

        for (long time = 0; time < 1000; time++) {
          List<Object> values = new ArrayList<>();
          values.add(1L);
          values.add(2L);
          values.add(3L);
          session.insertRecord(ROOT_SG1_D1, time, measurements, types, values);
        }
      }
    } catch (StatementExecutionException e) {
      // An already-existing path means the data was prepared by an earlier run; ignore it.
      if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
        throw e;
      }
    } finally {
      // Release the connection on every path; the original sample leaked the open session.
      session.close();
    }
  }
}