Updated on 2024-04-02 GMT+08:00

FlinkIoTDBSink

Description

FlinkIoTDBSink integrates IoTDB with Flink. The module provides IoTDBSink, which writes time series data into IoTDB from a Flink job.

Sample Code

This example shows how to send data from a Flink job to an IoTDB server.

  • A simulated source SensorSource generates a data point every second.
  • Flink uses IoTDBSink to consume produced data and write the data into IoTDB.

The connection parameters (set on IoTDBSinkOptions in this sample) include the IP address, port number, username, and password of the node where the IoTDBServer is located.

  • On FusionInsight Manager, choose Cluster > Services > IoTDB > Instance to view the IP address of the node where the IoTDBServer to be connected is located.
  • To obtain the RPC port number, log in to FusionInsight Manager, choose Cluster > Services > IoTDB, click Configuration, click All Configurations, and search for IOTDB_SERVER_RPC_PORT.
  • In security mode, the username and password for logging in to the node where IoTDBServer resides are controlled by FusionInsight Manager. Ensure that the user has the permissions to operate the IoTDB and Flink services. For details, see Preparing for User Authentication.
  • You need to set the username and password for authentication in the local environment variables. You are advised to store the username and password in ciphertext and decrypt them only when they are used.
    • Authentication username is the username for accessing IoTDB.
    • Password is the password for accessing IoTDB.
/**
 * Example Flink job that writes simulated sensor readings into IoTDB through {@code IoTDBSink}.
 *
 * <p>A {@code SensorSource} emits one data point per second; the sink, configured in
 * {@link #main(String[])}, writes those points to the IoTDB server.
 */
public class FlinkIoTDBSink {
  /**
   * Builds and runs the example pipeline: {@code SensorSource -> IoTDBSink}.
   *
   * @param args command-line arguments (unused)
   * @throws Exception if the Flink job fails to build or execute
   */
  public static void main(String[] args) throws Exception {
    // Obtain the execution environment the job will run in.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    IoTDBSinkOptions options = new IoTDBSinkOptions();
    options.setHost("127.0.0.1");
    options.setPort(22260);
    // Placeholder credentials: replace with the real IoTDB username and password.
    // Avoid hard-coding real secrets in source code.
    options.setUser("Username");
    options.setPassword("Password");

    // If the server enables auto_create_schema, then we do not need to register all timeseries
    // here.
    options.setTimeseriesOptionList(
            Lists.newArrayList(
                    new IoTDBSinkOptions.TimeseriesOption(
                            "root.sg.d1.s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY)));

    IoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema();
    IoTDBSink ioTDBSink =
        new IoTDBSink(options, serializationSchema)
            // enable batching
            .withBatchSize(10)
            // how many connections to the server will be created for each parallelism
            .withSessionPoolSize(3);

    env.addSource(new SensorSource())
        .name("sensor-source")
        .setParallelism(1)
        .addSink(ioTDBSink)
        .name("iotdb-sink");

    env.execute("iotdb-flink-example");
  }

  /**
   * Simulated sensor that emits one random reading for measurement {@code s1} of device
   * {@code root.sg.d1} every second until it is cancelled.
   */
  private static class SensorSource implements SourceFunction<Map<String, String>> {
    private static final long serialVersionUID = 1L;

    // volatile: cancel() is invoked from a different thread than the one executing run(),
    // so the stop request must be visible to the emitting loop.
    private volatile boolean running = true;
    private final Random random = new SecureRandom();

    /**
     * Emits one data point per second while the source is running.
     *
     * @param context the Flink source context used to emit records
     * @throws Exception if the emitting thread is interrupted while sleeping
     */
    @Override
    public void run(SourceContext<Map<String, String>> context) throws Exception {
      while (running) {
        Map<String, String> tuple = new HashMap<>();
        tuple.put("device", "root.sg.d1");
        tuple.put("timestamp", String.valueOf(System.currentTimeMillis()));
        tuple.put("measurements", "s1");
        tuple.put("types", "DOUBLE");
        tuple.put("values", String.valueOf(random.nextDouble()));

        context.collect(tuple);
        Thread.sleep(1000);
      }
    }

    /** Requests the emitting loop in {@code run} to stop after the current iteration. */
    @Override
    public void cancel() {
      running = false;
    }
  }
}