文档首页/
MapReduce服务 MRS/
开发指南(LTS版)/
IoTDB开发指南(安全模式)/
开发IoTDB应用/
IoTDB Flink样例程序/
FlinkIoTDBSource样例程序
更新时间:2024-12-06 GMT+08:00
FlinkIoTDBSource样例程序
功能简介
IoTDB与Flink的集成。此模块包含了iotdb source,通过flink job将时序数据从IoTDB读取出来并且打印。
代码样例
该示例演示了Flink job如何从IoTDB server读取时序数据的场景:
- Flink使用IoTDBSource从IoTDB server读取数据。
- 要使用IoTDBSource,您需要构造一个IoTDBSource的实例,通过指定的IoTDBSourceOptions并在IoTDBSource中实现抽象方法convert(),convert()定义了您希望如何转换行数据。
其中在Session对象的参数里,设置IoTDBServer所在的节点IP、端口、用户名和密码。
- 待连接的IoTDBServer所在的节点IP地址,可通过登录FusionInsight Manager界面,选择“集群 > 服务 > IoTDB > 实例”查看。
- RPC端口可通过登录FusionInsight Manager,选择“集群 > 服务 > IoTDB > 配置 > 全部配置”,搜索参数“IOTDB_SERVER_RPC_PORT”获得。
- 安全模式下,登录IoTDBServer所在节点的用户名和密码由FusionInsight Manager统一控制,参考准备集群认证用户信息,确保该用户具有操作IoTDB服务和Flink服务的角色权限。
- 需在本地环境变量中设置环境变量认证用户名和认证用户密码,建议密文存放,使用时解密,确保安全。其中:
- 认证用户名为访问IoTDB的用户名。
- 认证用户密码为访问IoTDB的用户密码。
public class FlinkIoTDBSource {
/**
* 安全模式下,“SSL_ENABLE”默认为“true”,需要导入truststore.jks文件。
* 安全模式下,也可登录FusionInsight Manager,选择“集群 > 服务 > IoTDB > 配置”,在搜索框中搜索“SSL”,修改“SSL_ENABLE”参数值为“false”;保存配置后需重启IoTDB服务使配置生效。并修改客户端“客户端安装目录/IoTDB/iotdb/conf”目录下的“iotdb-client.env”文件中的配置:iotdb_ssl_enable="false"。
*/
private static final String IOTDB_SSL_ENABLE = "true"; //该值为“SSL_ENABLE”参数值。
static final String LOCAL_HOST = "127.0.0.1";
static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1";
static final String ROOT_SG1_D1 = "root.sg1.d1";
public static void main(String[] args) throws Exception {
// use session api to create data in IoTDB
prepareData();
// run the flink job on local mini cluster
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
IoTDBSourceOptions ioTDBSourceOptions =
new IoTDBSourceOptions(
LOCAL_HOST, 22260, "认证用户名", "认证用户密码", "select s1 from " + ROOT_SG1_D1 + " align by device");
IoTDBSource<RowRecord> source =
new IoTDBSource<RowRecord>(ioTDBSourceOptions) {
@Override
public RowRecord convert(RowRecord rowRecord) {
return rowRecord;
}
};
env.addSource(source).name("sensor-source").print().setParallelism(2);
env.execute();
}
private static void prepareData()
throws IoTDBConnectionException, StatementExecutionException, TTransportException {
// set iotdb_ssl_enable
System.setProperty("iotdb_ssl_enable", IOTDB_SSL_ENABLE);
if ("true".equals(IOTDB_SSL_ENABLE)) {
// set truststore.jks path
System.setProperty("iotdb_ssl_truststore", "truststore文件路径");
}
Session session = new Session(LOCAL_HOST, 22260, "认证用户名", "认证用户密码");
session.open(false);
try {
session.setStorageGroup("root.sg1");
if (!session.checkTimeseriesExists(ROOT_SG1_D1_S1)) {
session.createTimeseries(
ROOT_SG1_D1_S1, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
List<String> measurements = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
List<TSDataType> types = new ArrayList<>();
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);
for (long time = 0; time < 1000; time++) {
List<Object> values = new ArrayList<>();
values.add(1L);
values.add(2L);
values.add(3L);
session.insertRecord(ROOT_SG1_D1, time, measurements, types, values);
}
}
} catch (StatementExecutionException e) {
if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
throw e;
}
}
}
}
父主题: IoTDB Flink样例程序