更新时间:2022-02-22 GMT+08:00

自拓展输入流

用户可通过编写代码实现从想要的云生态或者开源生态获取数据,作为Flink作业的输入数据。

语法格式

1
2
3
4
5
6
7
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "user_defined",
    type_class_name = "",
    type_class_parameter = ""
  )
  (TIMESTAMP BY timeindicator (',' timeindicator)?);timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME

关键字

表1 关键字说明

参数

是否必选

说明

type

数据源类型,"user_defined"表示数据源为用户自定义数据源。

type_class_name

用户实现获取源数据的source类名称,注意包含完整包路径。

type_class_parameter

用户自定义source类的入参,仅支持一个string类型的参数。

注意事项

用户自定义source类需要继承类RichParallelSourceFunction,并指定数据类型为Row例如定义类MySource:public class MySource extends RichParallelSourceFunction<Row>{},重点实现其中的open、run和close函数。

依赖pom:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-java_2.11</artifactId>
	<version>${flink.version}</version>
	<scope>provided</scope>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-core</artifactId>
	<version>${flink.version}</version>
	<scope>provided</scope>
</dependency>

示例

实现每周期产生一条数据(仅包含一个字段,类型为INT,初始值为1,每周期加1),周期时长为60s,通过入参指定。

1
2
3
4
5
6
7
8
9
CREATE SOURCE STREAM user_in_data (
	count INT
     )
  WITH (  
	type = "user_defined", 
	type_class_name = "mySourceSink.MySource", 
	type_class_parameter = "60"
      )
      TIMESTAMP BY car_timestamp.rowtime;		

自定义source类实现,需要将该类打在jar包中,通过sql编辑页上传udf函数按钮上传。