更新时间:2025-01-22 GMT+08:00
        
          
          
        
      
      
      
      
      
      
      
      
  
      
      
      
        
自拓展输入流
用户可通过编写代码实现从想要的云生态或者开源生态获取数据,作为Flink作业的输入数据。
语法格式
| 1 2 3 4 5 6 7 | CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "user_defined", type_class_name = "", type_class_parameter = "" ) (TIMESTAMP BY timeindicator (',' timeindicator)?);timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME | 
关键字
| 参数 | 是否必选 | 说明 | 
|---|---|---|
| type | 是 | 数据源类型,"user_defined"表示数据源为用户自定义数据源。 | 
| type_class_name | 是 | 用户实现获取源数据的source类名称,注意包含完整包路径。 | 
| type_class_parameter | 是 | 用户自定义source类的入参,仅支持一个string类型的参数。 | 
注意事项
用户自定义source类需要继承类RichParallelSourceFunction,并指定数据类型为Row例如定义类MySource:public class MySource extends RichParallelSourceFunction<Row>{},重点实现其中的open、run和close函数。
依赖pom:
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-java_2.11</artifactId>
	<version>${flink.version}</version>
	<scope>provided</scope>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-core</artifactId>
	<version>${flink.version}</version>
	<scope>provided</scope>
</dependency>
 示例
实现每周期产生一条数据(仅包含一个字段,类型为INT,初始值为1,每周期加1),周期时长为60s,通过入参指定。
| 1 2 3 4 5 6 7 8 9 | CREATE SOURCE STREAM user_in_data ( count INT ) WITH ( type = "user_defined", type_class_name = "mySourceSink.MySource", type_class_parameter = "60" ) TIMESTAMP BY car_timestamp.rowtime; | 
 
 
  自定义source类实现,需要将该类打在jar包中,通过sql编辑页上传udf函数按钮上传。
   父主题: 自拓展生态
  
  
    