更新时间:2024-08-03 GMT+08:00

FlinkIoTDBSink样例程序

功能简介

IoTDB与Flink的集成。此模块包含了iotdb sink,通过flink job将时序数据写入IoTDB。

代码样例

该样例演示了从一个Flink job中发送数据到IoTDB server的场景。

  • 一个模拟的Source SensorSource每秒钟产生一个数据点。
  • Flink使用IoTDBSink消费产生数据并写入IoTDB。

其中在Session对象的参数里,设置IoTDBServer所在的节点IP、端口、用户名和密码。

  • 待连接的IoTDBServer所在的节点IP地址,可通过登录FusionInsight Manage界面,选择“集群 > 服务 > IoTDB > 实例”查看。
  • RPC端口可通过登录FusionInsight Manager,选择“集群 > 服务 > IoTDB > 配置 > 全部配置”,搜索参数“IOTDB_SERVER_RPC_PORT”获得。
  • 安全模式下,登录IoTDBServer所在节点的用户名和密码由FusionInsight Manager统一控制,参考准备集群认证用户信息,确保该用户具有操作IoTDB服务和Flink服务的角色权限。
  • 需在本地环境变量中设置环境变量认证用户名和认证用户密码,建议密文存放,使用时解密,确保安全。其中:
    • 认证用户名为访问IoTDB的用户名。
    • 认证用户密码为访问IoTDB的用户密码。
public class FlinkIoTDBSink {
  public static void main(String[] args) throws Exception {
    // run the flink job on local mini cluster
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    IoTDBSinkOptions options = new IoTDBSinkOptions();
    options.setHost("127.0.0.1");
    options.setPort(22260);
    options.setUser("用户名");
    options.setPassword("密码");

    // If the server enables auto_create_schema, then we do not need to register all timeseries
    // here.
    options.setTimeseriesOptionList(
            Lists.newArrayList(
                    new IoTDBSinkOptions.TimeseriesOption(
                            "root.sg.d1.s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY)));

    IoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema();
    IoTDBSink ioTDBSink =
        new IoTDBSink(options, serializationSchema)
            // enable batching
            .withBatchSize(10)
            // how many connections to the server will be created for each parallelism
            .withSessionPoolSize(3);

    env.addSource(new SensorSource())
        .name("sensor-source")
        .setParallelism(1)
        .addSink(ioTDBSink)
        .name("iotdb-sink");

    env.execute("iotdb-flink-example");
  }

  private static class SensorSource implements SourceFunction<Map<String, String>> {
    boolean running = true;
    Random random = new SecureRandom();

    @Override
    public void run(SourceContext context) throws Exception {
      while (running) {
        Map<String, String> tuple = new HashMap();
        tuple.put("device", "root.sg.d1");
        tuple.put("timestamp", String.valueOf(System.currentTimeMillis()));
        tuple.put("measurements", "s1");
        tuple.put("types", "DOUBLE");
        tuple.put("values", String.valueOf(random.nextDouble()));

        context.collect(tuple);
        Thread.sleep(1000);
      }
    }

    @Override
    public void cancel() {
      running = false;
    }
  }
}