自拓展输入流
用户可通过编写代码实现从想要的云生态或者开源生态获取数据,作为Flink作业的输入数据。
语法格式
1 2 3 4 5 6 7 |
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "user_defined", type_class_name = "", type_class_parameter = "" ) (TIMESTAMP BY timeindicator (',' timeindicator)?);timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME |
关键字
参数 |
是否必选 |
说明 |
---|---|---|
type |
是 |
数据源类型,"user_defined"表示数据源为用户自定义数据源。 |
type_class_name |
是 |
用户实现获取源数据的source类名称,注意包含完整包路径。 |
type_class_parameter |
是 |
用户自定义source类的入参,仅支持一个string类型的参数。 |
注意事项
用户自定义source类需要继承类RichParallelSourceFunction,并指定数据类型为Row例如定义类MySource:public class MySource extends RichParallelSourceFunction<Row>{},重点实现其中的open、run和close函数。
依赖pom:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
示例
实现每周期产生一条数据(仅包含一个字段,类型为INT,初始值为1,每周期加1),周期时长为60s,通过入参指定。
1 2 3 4 5 6 7 8 9 |
CREATE SOURCE STREAM user_in_data ( count INT ) WITH ( type = "user_defined", type_class_name = "mySourceSink.MySource", type_class_parameter = "60" ) TIMESTAMP BY car_timestamp.rowtime; |
自定义source类实现,需要将该类打在jar包中,通过sql编辑页上传udf函数按钮上传。