更新时间:2024-08-03 GMT+08:00
FlinkIoTDBSink样例程序
功能简介
IoTDB与Flink的集成。此模块包含了iotdb sink,通过flink job将时序数据写入IoTDB。
代码样例
该样例演示了从一个Flink job中发送数据到IoTDB server的场景。
- 一个模拟的Source SensorSource每秒钟产生一个数据点。
- Flink使用IoTDBSink消费产生数据并写入IoTDB。
其中在Session对象的参数里,设置IoTDBServer所在的节点IP、端口、用户名和密码。
- 待连接的IoTDBServer所在的节点IP地址,可通过登录FusionInsight Manage界面,选择“集群 > 服务 > IoTDB > 实例”查看。
- RPC端口可通过登录FusionInsight Manager,选择“集群 > 服务 > IoTDB > 配置 > 全部配置”,搜索参数“IOTDB_SERVER_RPC_PORT”获得。
- 安全模式下,登录IoTDBServer所在节点的用户名和密码由FusionInsight Manager统一控制,参考准备集群认证用户信息,确保该用户具有操作IoTDB服务和Flink服务的角色权限。
- 需在本地环境变量中设置环境变量认证用户名和认证用户密码,建议密文存放,使用时解密,确保安全。其中:
- 认证用户名为访问IoTDB的用户名。
- 认证用户密码为访问IoTDB的用户密码。
public class FlinkIoTDBSink { public static void main(String[] args) throws Exception { // run the flink job on local mini cluster StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); IoTDBSinkOptions options = new IoTDBSinkOptions(); options.setHost("127.0.0.1"); options.setPort(22260); options.setUser("用户名"); options.setPassword("密码"); // If the server enables auto_create_schema, then we do not need to register all timeseries // here. options.setTimeseriesOptionList( Lists.newArrayList( new IoTDBSinkOptions.TimeseriesOption( "root.sg.d1.s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY))); IoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema(); IoTDBSink ioTDBSink = new IoTDBSink(options, serializationSchema) // enable batching .withBatchSize(10) // how many connections to the server will be created for each parallelism .withSessionPoolSize(3); env.addSource(new SensorSource()) .name("sensor-source") .setParallelism(1) .addSink(ioTDBSink) .name("iotdb-sink"); env.execute("iotdb-flink-example"); } private static class SensorSource implements SourceFunction<Map<String, String>> { boolean running = true; Random random = new SecureRandom(); @Override public void run(SourceContext context) throws Exception { while (running) { Map<String, String> tuple = new HashMap(); tuple.put("device", "root.sg.d1"); tuple.put("timestamp", String.valueOf(System.currentTimeMillis())); tuple.put("measurements", "s1"); tuple.put("types", "DOUBLE"); tuple.put("values", String.valueOf(random.nextDouble())); context.collect(tuple); Thread.sleep(1000); } } @Override public void cancel() { running = false; } } }
父主题: IoTDB Flink样例程序