FlinkIoTDBSource
Description
This module integrates IoTDB with Flink. It provides IoTDBSource, which a Flink job uses to read time series data from IoTDB. The sample job prints the data it reads.
Sample Code
This example shows how a Flink job reads time series data from an IoTDB server.
- Flink uses IoTDBSource to read data from an IoTDB server.
- To use IoTDBSource, construct an IoTDBSource instance by specifying IoTDBSourceOptions and implementing the abstract method convert() in IoTDBSource. The convert() method defines how each row of data is converted.
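The subclass-and-implement-convert() pattern described above can be sketched without the IoTDB dependency. In this minimal sketch, Row and RowSource are hypothetical stand-ins for RowRecord and IoTDBSource. They are not the IoTDB API and only illustrate how the type parameter and convert() determine what the source emits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT IoTDB classes.
final class ConvertSketch {
    /** Stand-in for a row: a timestamp plus its values. */
    static final class Row {
        final long timestamp;
        final List<Long> values;

        Row(long timestamp, List<Long> values) {
            this.timestamp = timestamp;
            this.values = values;
        }
    }

    /** Stand-in for a source whose subclasses choose the output type T. */
    abstract static class RowSource<T> {
        /** Defines how one row is turned into the emitted element. */
        abstract T convert(Row row);

        /** Applies convert() to every row, as a source would before emitting downstream. */
        List<T> emitAll(List<Row> rows) {
            List<T> out = new ArrayList<>();
            for (Row row : rows) {
                out.add(convert(row));
            }
            return out;
        }
    }

    /** A source that converts each row to the string "timestamp:sum of values". */
    static RowSource<String> sumSource() {
        return new RowSource<String>() {
            @Override
            String convert(Row row) {
                long sum = 0;
                for (long value : row.values) {
                    sum += value;
                }
                return row.timestamp + ":" + sum;
            }
        };
    }
}
```

The sample code on this page takes the simplest route and returns the row unchanged from convert(). A conversion like the one sketched here would move formatting into the source so that downstream operators receive an already-shaped element.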
The Session object takes the IP address and RPC port number of the node where the IoTDBServer is located, together with the username and password for accessing IoTDB.
- On FusionInsight Manager, choose Cluster > Services > IoTDB > Instance to view the IP address of the node where the IoTDBServer to be connected is located.
- To obtain the RPC port number, log in to FusionInsight Manager and choose Cluster > Services > IoTDB. Click Configuration, click All Configurations, and search for IOTDB_SERVER_RPC_PORT.
- In normal mode, IoTDB has a default user root after initial installation. To obtain the password, see User Account List. This user is an administrator and has all permissions, which cannot be assigned, revoked, or deleted.
- Set the username and password for authentication in local environment variables. You are advised to store the username and password in ciphertext and decrypt them when they are used.
- In the sample code, "Authentication username" is a placeholder for the username for accessing IoTDB.
- In the sample code, "Password" is a placeholder for the password for accessing IoTDB.
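Reading the credentials from environment variables, as recommended above, might look like the following minimal sketch. The variable names IOTDB_USERNAME and IOTDB_PASSWORD and the IoTDBCredentials class are assumptions made for illustration and are not defined by IoTDB or FusionInsight. Decrypting ciphertext values is omitted because it depends on the encryption tool you use.

```java
import java.util.Map;

/** Hypothetical helper that loads IoTDB credentials from environment variables. */
final class IoTDBCredentials {
    // Assumed variable names; use whatever names your environment defines.
    static final String USER_VAR = "IOTDB_USERNAME";
    static final String PASSWORD_VAR = "IOTDB_PASSWORD";

    final String username;
    final String password;

    private IoTDBCredentials(String username, String password) {
        this.username = username;
        this.password = password;
    }

    /** Reads both values from the process environment. */
    static IoTDBCredentials fromSystemEnv() {
        return fromEnv(System.getenv());
    }

    /** Reads both values from the given map, which makes the lookup easy to test. */
    static IoTDBCredentials fromEnv(Map<String, String> env) {
        return new IoTDBCredentials(require(env, USER_VAR), require(env, PASSWORD_VAR));
    }

    /** Fails fast with a clear message instead of passing null to the client. */
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }
}
```

With a helper like this, the "Authentication username" and "Password" placeholders in the sample code could be replaced by the username and password fields of the object returned by fromSystemEnv().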
public class FlinkIoTDBSource {
    // To obtain the value, log in to FusionInsight Manager, choose Cluster > Services > IoTDB,
    // click Configurations, search for SSL in the search box, and view the value of SSL_ENABLE.
    private static final String IOTDB_SSL_ENABLE = "true";
    static final String LOCAL_HOST = "127.0.0.1";
    static final String ROOT_SG1_D1_S1 = "root.sg1.d1.s1";
    static final String ROOT_SG1_D1 = "root.sg1.d1";

    public static void main(String[] args) throws Exception {
        // use session api to create data in IoTDB
        prepareData();

        // run the flink job on local mini cluster
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        IoTDBSourceOptions ioTDBSourceOptions =
            new IoTDBSourceOptions(
                LOCAL_HOST,
                22260,
                "Authentication username",
                "Password",
                "select s1 from " + ROOT_SG1_D1 + " align by device");

        IoTDBSource<RowRecord> source =
            new IoTDBSource<RowRecord>(ioTDBSourceOptions) {
                @Override
                public RowRecord convert(RowRecord rowRecord) {
                    return rowRecord;
                }
            };
        env.addSource(source).name("sensor-source").print().setParallelism(2);
        env.execute();
    }

    private static void prepareData()
            throws IoTDBConnectionException, StatementExecutionException, TTransportException {
        // set iotdb_ssl_enable
        System.setProperty("iotdb_ssl_enable", IOTDB_SSL_ENABLE);
        if ("true".equals(IOTDB_SSL_ENABLE)) {
            // set truststore.jks path
            System.setProperty("iotdb_ssl_truststore", "truststore file path");
        }

        Session session = new Session(LOCAL_HOST, 22260, "Authentication username", "Password");
        session.open(false);
        try {
            session.setStorageGroup("root.sg1");
            if (!session.checkTimeseriesExists(ROOT_SG1_D1_S1)) {
                session.createTimeseries(
                    ROOT_SG1_D1_S1, TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
                List<String> measurements = new ArrayList<>();
                measurements.add("s1");
                measurements.add("s2");
                measurements.add("s3");
                List<TSDataType> types = new ArrayList<>();
                types.add(TSDataType.INT64);
                types.add(TSDataType.INT64);
                types.add(TSDataType.INT64);

                for (long time = 0; time < 1000; time++) {
                    List<Object> values = new ArrayList<>();
                    values.add(1L);
                    values.add(2L);
                    values.add(3L);
                    session.insertRecord(ROOT_SG1_D1, time, measurements, types, values);
                }
            }
        } catch (StatementExecutionException e) {
            if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
                throw e;
            }
        }
    }
}