更新时间:2024-12-06 GMT+08:00
FlinkIoTDBSink样例程序
功能简介
IoTDB与Flink的集成。此模块包含了iotdb sink,通过flink job将时序数据写入IoTDB。
代码样例
该样例演示了从一个Flink job中发送数据到IoTDB server的场景。
- 一个模拟的Source SensorSource每秒钟产生一个数据点。
- Flink使用IoTDBSink消费产生数据并写入IoTDB。
其中在Session对象的参数里,设置IoTDBServer所在的节点IP、端口、用户名和密码。
- 待连接的IoTDBServer所在的节点IP地址,可通过登录FusionInsight Manager界面,选择“集群 > 服务 > IoTDB > 实例”查看。
- RPC端口可通过登录FusionInsight Manager,选择“集群 > 服务 > IoTDB > 配置 > 全部配置”,搜索参数“IOTDB_SERVER_RPC_PORT”获得。
- 安全模式下,登录IoTDBServer所在节点的用户名和密码由FusionInsight Manager统一控制,参考准备集群认证用户信息,确保该用户具有操作IoTDB服务和Flink服务的角色权限。
- 需在本地环境变量中设置环境变量认证用户名和认证用户密码,建议密文存放,使用时解密,确保安全。其中:
- 认证用户名为访问IoTDB的用户名。
- 认证用户密码为访问IoTDB的用户密码。
public class FlinkIoTDBSink {
public static void main(String[] args) throws Exception {
// run the flink job on local mini cluster
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
IoTDBSinkOptions options = new IoTDBSinkOptions();
options.setHost("127.0.0.1");
options.setPort(22260);
options.setUser("用户名");
options.setPassword("密码");
// If the server enables auto_create_schema, then we do not need to register all timeseries
// here.
options.setTimeseriesOptionList(
Lists.newArrayList(
new IoTDBSinkOptions.TimeseriesOption(
"root.sg.d1.s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY)));
IoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema();
IoTDBSink ioTDBSink =
new IoTDBSink(options, serializationSchema)
// enable batching
.withBatchSize(10)
// how many connections to the server will be created for each parallelism
.withSessionPoolSize(3);
env.addSource(new SensorSource())
.name("sensor-source")
.setParallelism(1)
.addSink(ioTDBSink)
.name("iotdb-sink");
env.execute("iotdb-flink-example");
}
private static class SensorSource implements SourceFunction<Map<String, String>> {
boolean running = true;
Random random = new SecureRandom();
@Override
public void run(SourceContext context) throws Exception {
while (running) {
Map<String, String> tuple = new HashMap();
tuple.put("device", "root.sg.d1");
tuple.put("timestamp", String.valueOf(System.currentTimeMillis()));
tuple.put("measurements", "s1");
tuple.put("types", "DOUBLE");
tuple.put("values", String.valueOf(random.nextDouble()));
context.collect(tuple);
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
}
}
父主题: IoTDB Flink样例程序