文档首页/
MapReduce服务 MRS/
开发指南(LTS版)/
IoTDB开发指南(安全模式)/
开发IoTDB应用/
IoTDB Flink样例程序/
FlinkIoTDBSource样例程序
更新时间:2024-12-06 GMT+08:00
FlinkIoTDBSource样例程序
功能简介
IoTDB与Flink的集成。此模块包含了iotdb source,通过flink job将时序数据从IoTDB读取出来并且打印。
代码样例
该示例演示了Flink job如何从IoTDB server读取时序数据的场景:
- Flink使用IoTDBSource从IoTDB server读取数据。
- 要使用IoTDBSource,您需要构造一个IoTDBSource的实例,通过指定的IoTDBSourceOptions并在IoTDBSource中实现抽象方法convert(),convert()定义了您希望如何转换行数据。
其中在Session对象的参数里,设置IoTDBServer所在的节点IP、端口、用户名和密码。
- 待连接的IoTDBServer所在的节点IP地址,可通过登录FusionInsight Manager界面,选择“集群 > 服务 > IoTDB > 实例”查看。
- RPC端口可通过登录FusionInsight Manager,选择“集群 > 服务 > IoTDB > 配置 > 全部配置”,搜索参数“IOTDB_SERVER_RPC_PORT”获得。
- 安全模式下,登录IoTDBServer所在节点的用户名和密码由FusionInsight Manager统一控制,参考准备集群认证用户信息,确保该用户具有操作IoTDB服务和Flink服务的角色权限。
- 需在本地环境变量中设置环境变量认证用户名和认证用户密码,建议密文存放,使用时解密,确保安全。其中:
- 认证用户名为访问IoTDB的用户名。
- 认证用户密码为访问IoTDB的用户密码。
public class FlinkIoTDBSource { /** * 安全模式下,“SSL_ENABLE”默认为“true”,需要导入truststore.jks文件。 * 安全模式下,也可登录FusionInsight Manager,选择“集群 > 服务 > IoTDB > 配置”,在搜索框中搜索“SSL”,修改“SSL_ENABLE”参数值为“false”;保存配置后需重启IoTDB服务使配置生效。并修改客户端“客户端安装目录/IoTDB/iotdb/conf”目录下的“iotdb-client.env”文件中的配置:iotdb_ssl_enable="false"。 */ private static final String IOTDB_SSL_ENABLE = "true"; //该值为“SSL_ENABLE”参数值。 static final String LOCAL_HOST = "127.0.0.1"; static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1"; static final String ROOT_SG1_D1 = "root.sg1.d1"; public static void main(String[] args) throws Exception { // use session api to create data in IoTDB prepareData(); // run the flink job on local mini cluster StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); IoTDBSourceOptions ioTDBSourceOptions = new IoTDBSourceOptions( LOCAL_HOST, 22260, "认证用户名", "认证用户密码", "select s1 from " + ROOT_SG1_D1 + " align by device"); IoTDBSource<RowRecord> source = new IoTDBSource<RowRecord>(ioTDBSourceOptions) { @Override public RowRecord convert(RowRecord rowRecord) { return rowRecord; } }; env.addSource(source).name("sensor-source").print().setParallelism(2); env.execute(); } private static void prepareData() throws IoTDBConnectionException, StatementExecutionException, TTransportException { // set iotdb_ssl_enable System.setProperty("iotdb_ssl_enable", IOTDB_SSL_ENABLE); if ("true".equals(IOTDB_SSL_ENABLE)) { // set truststore.jks path System.setProperty("iotdb_ssl_truststore", "truststore文件路径"); } Session session = new Session(LOCAL_HOST, 22260, "认证用户名", "认证用户密码"); session.open(false); try { session.setStorageGroup("root.sg1"); if (!session.checkTimeseriesExists(ROOT_SG1_D1_S1)) { session.createTimeseries( ROOT_SG1_D1_S1, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); List<String> measurements = new ArrayList<>(); measurements.add("s1"); measurements.add("s2"); measurements.add("s3"); List<TSDataType> types = new ArrayList<>(); types.add(TSDataType.INT64); types.add(TSDataType.INT64); types.add(TSDataType.INT64); for (long time = 0; time < 1000; time++) { List<Object> values = new ArrayList<>(); values.add(1L); values.add(2L); values.add(3L); session.insertRecord(ROOT_SG1_D1, time, measurements, types, values); } } } catch (StatementExecutionException e) { if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) { throw e; } } } }
父主题: IoTDB Flink样例程序