- 最新动态
- 功能总览
- 服务公告
- 产品介绍
- 计费说明
- 快速入门
-
用户指南
- DLI作业开发流程
- 准备工作
- 创建弹性资源池和队列
- 创建数据库和表
- 数据迁移与数据传输
- 配置DLI访问其他云服务的委托权限
- 使用DLI提交SQL作业
- 使用DLI提交Flink作业
- 使用DLI提交Spark作业
- 使用Notebook实例提交DLI作业
- 使用CES监控DLI服务
- 使用AOM监控DLI服务
- 使用CTS审计DLI服务
- 权限管理
- DLI常用管理操作
- 最佳实践
-
开发指南
- 使用客户端工具连接DLI
- SQL作业开发指南
- Flink作业开发指南
- Spark Jar作业开发指南
-
语法参考
-
Spark SQL语法参考
- Spark SQL常用配置项说明
- Spark SQL语法概览
- Spark开源命令支持说明
- 数据库相关
- 表相关
- 数据相关
- 导出查询结果
- 跨源连接相关
- 视图相关
- 查看计划
- 数据权限相关
- 数据类型
- 自定义函数
-
内置函数
-
日期函数
- 日期函数概览
- add_months
- current_date
- current_timestamp
- date_add
- dateadd
- date_sub
- date_format
- datediff
- datediff1
- datepart
- datetrunc
- day/dayofmonth
- from_unixtime
- from_utc_timestamp
- getdate
- hour
- isdate
- last_day
- lastday
- minute
- month
- months_between
- next_day
- quarter
- second
- to_char
- to_date
- to_date1
- to_utc_timestamp
- trunc
- unix_timestamp
- weekday
- weekofyear
- year
-
字符串函数
- 字符串函数概览
- ascii
- concat
- concat_ws
- char_matchcount
- encode
- find_in_set
- get_json_object
- instr
- instr1
- initcap
- keyvalue
- length
- lengthb
- levenshtein
- locate
- lower/lcase
- lpad
- ltrim
- parse_url
- printf
- regexp_count
- regexp_extract
- replace
- regexp_replace
- regexp_replace1
- regexp_instr
- regexp_substr
- repeat
- reverse
- rpad
- rtrim
- soundex
- space
- substr/substring
- substring_index
- split_part
- translate
- trim
- upper/ucase
- 数学函数
- 聚合函数
- 分析窗口函数
- 其他函数
-
日期函数
- SELECT
-
标示符
- aggregate_func
- alias
- attr_expr
- attr_expr_list
- attrs_value_set_expr
- boolean_expression
- class_name
- col
- col_comment
- col_name
- col_name_list
- condition
- condition_list
- cte_name
- data_type
- db_comment
- db_name
- else_result_expression
- file_format
- file_path
- function_name
- groupby_expression
- having_condition
- hdfs_path
- input_expression
- input_format_classname
- jar_path
- join_condition
- non_equi_join_condition
- number
- num_buckets
- output_format_classname
- partition_col_name
- partition_col_value
- partition_specs
- property_name
- property_value
- regex_expression
- result_expression
- row_format
- select_statement
- separator
- serde_name
- sql_containing_cte_name
- sub_query
- table_comment
- table_name
- table_properties
- table_reference
- view_name
- view_properties
- when_expression
- where_condition
- window_function
- 运算符
-
Flink SQL语法参考
- Flink Opensource SQL1.15语法参考
- Flink Opensource SQL1.12语法参考
- Flink Opensource SQL1.10语法参考
-
HetuEngine SQL语法参考
-
HetuEngine SQL语法
- 使用前必读
- 数据类型
-
DDL 语法
- CREATE SCHEMA
- CREATE TABLE
- CREATE TABLE AS
- CREATE TABLE LIKE
- CREATE VIEW
- ALTER TABLE
- ALTER VIEW
- ALTER SCHEMA
- DROP SCHEMA
- DROP TABLE
- DROP VIEW
- TRUNCATE TABLE
- COMMENT
- VALUES
- SHOW语法使用概要
- SHOW SCHEMAS(DATABASES)
- SHOW TABLES
- SHOW TBLPROPERTIES TABLE|VIEW
- SHOW TABLE/PARTITION EXTENDED
- SHOW FUNCTIONS
- SHOW PARTITIONS
- SHOW COLUMNS
- SHOW CREATE TABLE
- SHOW VIEWS
- SHOW CREATE VIEW
- DML 语法
- DQL 语法
- 辅助命令语法
- 预留关键字
- SQL函数和操作符
- 数据类型隐式转换
- 附录
-
HetuEngine SQL语法
- Hudi SQL语法参考
- Delta SQL语法参考
-
Spark SQL语法参考
-
API参考
- API使用前必读
- API概览
- 如何调用API
- API快速入门
- 权限相关API
- 全局变量相关API
- 资源标签相关API
- 增强型跨源连接相关API
- 跨源认证相关API
- 弹性资源池相关API
- 队列相关API(推荐)
- SQL作业相关API
- SQL模板相关API
- Flink作业相关API
- Flink作业模板相关API
- Flink作业管理相关API
- Spark作业相关API
- Spark作业模板相关API
- 权限策略和授权项
- 历史API
- 公共参数
- SDK参考
- 场景代码示例
-
常见问题
- DLI产品咨询类
- DLI弹性资源池和队列类
-
DLI数据库和表类
- 为什么在DLI控制台中查询不到表?
- OBS表压缩率较高怎么办?
- 字符码不一致导致数据乱码怎么办?
- 删除表后再重新创建同名的表,需要对操作该表的用户和项目重新赋权吗?
- DLI分区内表导入的文件不包含分区列的数据,导致数据导入完成后查询表数据失败怎么办?
- 创建OBS外表,由于OBS文件中的某字段存在换行符导致表字段数据错误怎么办?
- join表时没有添加on条件,造成笛卡尔积查询,导致队列资源爆满,作业运行失败怎么办?
- 手动在OBS表的分区目录下添加了数据,但是无法查询到数据怎么办?
- 为什么insert overwrite覆盖分区表数据的时候,覆盖了全量数据?
- 跨源连接RDS表中create_date字段类型是datetime,为什么DLI中查出来的是时间戳呢?
- SQL作业执行完成后,修改表名导致datasize不正确怎么办?
- 从DLI导入数据到OBS,数据量不一致怎么办?
-
增强型跨源连接类
- 增强型跨源连接绑定队列失败怎么办?
- DLI增强型跨源连接DWS失败怎么办?
- 创建跨源成功但测试网络连通性失败怎么办?
- 怎样配置DLI队列与数据源的网络连通?
- 为什么DLI增强型跨源连接要创建对等连接?
- DLI创建跨源连接,绑定队列一直在创建中怎么办?
- 新建跨源连接,显示已激活,但使用时提示communication link failure错误怎么办?
- 跨源访问MRS HBase,连接超时,日志未打印错误怎么办?
- DLI跨源连接报错找不到子网怎么办?
- 跨源RDS表,执行insert overwrite提示Incorrect string value错误怎么办?
- 创建RDS跨源表提示空指针错误怎么办?
- 对跨源DWS表执行insert overwrite操作,报错:org.postgresql.util.PSQLException: ERROR: tuple concurrently updated
- 通过跨源表向CloudTable Hbase表导入数据,executor报错:RegionTooBusyException
- 通过DLI跨源写DWS表,非空字段出现空值异常怎么办?
- 更新跨源目的端源表后,未同时更新对应跨源表,导致insert作业失败怎么办?
- RDS表有自增主键时怎样在DLI插入数据?
-
SQL作业类
- SQL作业开发类
-
SQL作业运维类
- 用户导表到OBS报“path obs://xxx already exists”错误
- 对两个表进行join操作时,提示:SQL_ANALYSIS_ERROR: Reference 't.id' is ambiguous, could be: t.id, t.id.;
- 执行查询语句报错:The current account does not have permission to perform this operation,the current account was restricted. Restricted for no budget.
- 执行查询语句报错:There should be at least one partition pruning predicate on partitioned table XX.YYY
- LOAD数据到OBS外表报错:IllegalArgumentException: Buffer size too small. size
- SQL作业运行报错:DLI.0002 FileNotFoundException
- 用户通过CTAS创建hive表报schema解析异常错误
- 在DataArts Studio上运行DLI SQL脚本,执行结果报org.apache.hadoop.fs.obs.OBSIOException错误
- 使用CDM迁移数据到DLI,迁移作业日志上报UQUERY_CONNECTOR_0001:Invoke DLI service api failed错误
- SQL作业访问报错:File not Found
- SQL作业访问报错:DLI.0003: AccessControlException XXX
- SQL作业访问外表报错:DLI.0001: org.apache.hadoop.security.AccessControlException: verifyBucketExists on {{桶名}}: status [403]
- 执行SQL语句报错:The current account does not have permission to perform this operation,the current account was restricted. Restricted for no budget.
-
Flink作业类
- Flink作业咨询类
-
Flink SQL作业类
- 怎样将OBS表映射为DLI的分区表?
- Flink SQL作业Kafka分区数增加或减少,怎样不停止Flink作业实现动态感知?
- 在Flink SQL作业中创建表使用EL表达式,作业运行提示DLI.0005错误怎么办?
- Flink作业输出流写入数据到OBS,通过该OBS文件路径创建的DLI表查询无数据
- Flink SQL作业运行失败,日志中有connect to DIS failed java.lang.IllegalArgumentException: Access key cannot be null错误
- Flink SQL作业消费Kafka后sink到es集群,作业执行成功,但未写入数据
- Flink Opensource SQL如何解析复杂嵌套 JSON?
- Flink Opensource SQL从RDS数据库读取的时间和RDS数据库存储的时间为什么会不一致?
- Flink Opensource SQL Elasticsearch结果表failure-handler参数填写retry_rejected导致提交失败
- Kafka Sink配置发送失败重试机制
- 如何在一个Flink作业中将数据写入到不同的Elasticsearch集群中?
- 作业语义检验时提示DIS通道不存在怎么处理?
- Flink jobmanager日志一直报Timeout expired while fetching topic metadata怎么办?
- Flink Jar作业类
- Flink作业性能调优类
-
Spark作业相类
- Spark作业开发类
-
Spark作业运维类
- 运行Spark作业报java.lang.AbstractMethodError
- Spark作业访问OBS数据时报ResponseCode: 403和ResponseStatus: Forbidden错误
- 有访问OBS对应的桶的权限,但是Spark作业访问时报错 verifyBucketExists on XXXX: status [403]
- Spark作业运行大批量数据时上报作业运行超时异常错误
- 使用Spark作业访问sftp中的文件,作业运行失败,日志显示访问目录异常
- 执行作业的用户数据库和表权限不足导致作业运行失败
- 为什么Spark3.x的作业日志中打印找不到global_temp数据库
- 在使用Spark2.3.x访问元数据时,DataSource语法创建avro类型的OBS表创建失败
- DLI资源配额类
- DLI权限管理类
- DLI API类
- 视频帮助
- 文档下载
- 通用参考
链接复制成功!
Redis源表
功能描述
创建source流从Redis获取数据,作为作业的输入数据。
前提条件
创建该作业前,需要建立DLI和Redis的增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
- 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
- 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
注意事项
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
- 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
- 如果需要获取key的值,则可以通过在Flink中设置主键获取,主键字段即对应Redis的key。
- 如果定义主键,则不能够定义复合主键,即主键只能是一个字段,不能是多个字段。
- schema-syntax取值约束:
- 当schema-syntax为map或array时,非主键字段最多只能有一个,且需要为相应的map或array类型。
- 当schema-syntax为fields-scores时,非主键字段个数需要为偶数,且除主键字段外,每两个字段的第二个字段的数据类型需要为double,该字段的值视为前一个字段的score。其示例如下:
CREATE TABLE redisSource ( redisKey string, order_id string, score1 double, order_channel string, score2 double, order_time string, score3 double, pay_amount double, score4 double, real_pay double, score5 double, pay_time string, score6 double, user_id string, score7 double, user_name string, score8 double, area_id string, score9 double, primary key (redisKey) not enforced ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'data-type' = 'sorted-set', 'deploy-mode' = 'master-replica', 'schema-syntax' = 'fields-scores' );
- data-type取值约束:
- 当data-type为set时,Flink中定义的非主键字段的数据类型必须相同。
- 当data-type为sorted-set并且schema-syntax为fields和array时,只能读取redis的sorted set中的值,而不能读取score。
- 当data-type为string时,只能有一个非主键字段。
- 当data-type为sorted-set,且schema-syntax为map时,除主键字段外,只能有一个非主键字段。
该非主键字段需要为map类型,同时该字段map的value需要为double类型,表示score,该字段的map的key表示redis的set中的值。
- 当data-type为sorted-set,且schema-syntax为array-scores时,除主键字段外,只能有两个非主键字段,且这两个字段的类型需要为array。
两个字段其中第一个字段类型是array,表示Redis的set中的值;第二个字段类型为array<double>,表示相应索引的score。其示例如下:
CREATE TABLE redisSink ( order_id string, arrayField Array<String>, arrayScore array<double>, primary key (order_id) not enforced ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'data-type' = 'sorted-set', "default-score" = '3', 'deploy-mode' = 'master-replica', 'schema-syntax' = 'array-scores' );
语法格式
1 2 3 4 5 6 7 8 9 10 |
create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (',' watermark for rowtime_column_name as watermark-strategy_expression) ,PRIMARY KEY (attr_name, ...) NOT ENFORCED ) with ( 'connector' = 'redis', 'host' = '' ); |
参数说明
参数 |
是否必选 |
默认值 |
数据类型 |
说明 |
---|---|---|---|---|
connector |
是 |
无 |
String |
connector类型,需配置为'redis'。 |
host |
是 |
无 |
String |
redis连接地址。 |
port |
否 |
6379 |
Integer |
redis连接端口。 |
password |
否 |
无 |
String |
redis认证密码。 |
namespace |
否 |
无 |
String |
redis key的namespace |
delimiter |
否 |
: |
String |
redis的key和namespace之间的分隔符。 |
data-type |
否 |
hash |
String |
redis的数据类型,有下列选项:
data-type取值约束详见data-type取值约束说明。 |
schema-syntax |
否 |
fields |
String |
redis的schema语义,包含以下值(其具体使用请参考注意事项和常见问题):
schema-syntax取值约束详见schema-syntax取值约束说明。 |
deploy-mode |
否 |
standalone |
String |
Redis集群的部署模式,支持standalone、master-replica、cluster。默认为standalone。 Redis实例类型不同配置的部署模式不同: 单机、主备、proxy集群实例都选择standalone, cluster实例选择cluster。 |
retry-count |
否 |
5 |
Integer |
连接redis集群的尝试次数。 |
connection-timeout-millis |
否 |
10000 |
Integer |
尝试连接redis集群时的最大超时时间。 |
commands-timeout-millis |
否 |
2000 |
Integer |
等待操作完成响应的最大时间。 |
rebalancing-timeout-millis |
否 |
15000 |
Integer |
redis集群失败时的休眠时间。 |
scan-keys-count |
否 |
1000 |
Integer |
每次扫描时读取的数量。 |
default-score |
否 |
0 |
Double |
当data-type设置为“sorted-set”时的默认score。 |
deserialize-error-policy |
否 |
fail-job |
Enum |
数据解析失败时的处理方式。枚举类型,包含以下值:
|
skip-null-values |
否 |
true |
Boolean |
是否跳过null。 |
ignore-retractions |
否 |
false |
Boolean |
连接器应忽略更新插入/撤回流模式下的收回消息。 |
key-column |
否 |
无 |
String |
Redis 表schema的key |
source.parallelism |
否 |
无 |
int |
定义源的自定义并行度。默认情况下,如果未定义此选项,使用全局配置来的并行度。 |
示例
该示例是从DCS Redis数据源中读取数据,并写入Print到结果表中,其具体步骤如下:
- 参考增强型跨源连接,根据redis所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
- 设置Redis的安全组,添加入向规则使其对Flink的队列网段放通。
参考测试地址连通性根据redis的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
- 在Redis客户端中执行如下命令,向不同的key中插入数据,以hash形式存储:
HMSET redisSource order_id 202103241000000001 order_channel webShop order_time "2021-03-24 10:00:00" pay_amount 100.00 real_pay 100.00 pay_time "2021-03-24 10:02:03" user_id 0001 user_name Alice area_id 330106 HMSET redisSource1 order_id 202103241606060001 order_channel appShop order_time "2021-03-24 16:06:06" pay_amount 200.00 real_pay 180.00 pay_time "2021-03-24 16:10:06" user_id 0001 user_name Alice area_id 330106 HMSET redisSource2 order_id 202103251202020001 order_channel miniAppShop order_time "2021-03-25 12:02:02" pay_amount 60.00 real_pay 60.00 pay_time "2021-03-25 12:03:00" user_id 0002 user_name Bob area_id 330110
- 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本读取Redis中hash格式的数据。
如下脚本中的加粗参数请根据实际环境修改。
CREATE TABLE redisSource ( redisKey string, order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, primary key (redisKey) not enforced --获取redis中key的值 ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'data-type' = 'hash', 'deploy-mode' = 'master-replica' ); CREATE TABLE printSink ( redisKey string, order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'print' ); insert into printSink select * from redisSource;
- 按照如下方式查看taskmanager.out文件中的数据结果:
- 登录DLI管理控制台,选择“作业管理 > Flink作业”。
- 单击对应的Flink作业名称,选择“运行日志”,单击“OBS桶”,根据作业运行的日期,找到对应日志的文件夹。
- 进入对应日期的文件夹后,找到名字中包含“taskmanager”的文件夹进入,下载获取taskmanager.out文件查看结果日志。
数据结果参考如下:
+I(redisSource1,202103241606060001,appShop,2021-03-24 16:06:06,200.0,180.0,2021-03-24 16:10:06,0001,Alice,330106) +I(redisSource,202103241000000001,webShop,2021-03-24 10:00:00,100.0,100.0,2021-03-24 10:02:03,0001,Alice,330106) +I(redisSource2,202103251202020001,miniAppShop,2021-03-25 12:02:02,60.0,60.0,2021-03-25 12:03:00,0002,Bob,330110)
常见问题
- Q:Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决?
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: RealLine:36;Usage of 'set' data-type and 'fields' schema syntax in source Redis connector with multiple non-key column types. As 'set' in Redis is not sorted, it's not possible to map 'set's values to table schema with different types.
A:data-type为set类型时,flink中非主键字段的数据类型不相同,导致如上报错。data-type为set类型时,Flink中定义的非主键字段的数据类型必须相同。
- Q:当使用data-type为hash时,那么schema-syntax为fields和map有什么区别?
A:当schema-syntax为fields时,会将Redis的key中hash值赋给flink中同名相应字段;当schema-syntax为map时,会将Redis的每个hash中的hashkey和hashvalue放入一个map中,该map即为flink中相应字段的值,即这个map中包含Redis中某个key的所有hashkey和hashvalue。
- 对于fields而言:
- 向Redis中插入如下数据
HMSET redisSource order_id 202103241000000001 order_channel webShop order_time "2021-03-24 10:00:00" pay_amount 100.00 real_pay 100.00 pay_time "2021-03-24 10:02:03" user_id 0001 user_name Alice area_id 330106
- 当使用schema-syntax为fields时,作业脚本参考如下:
CREATE TABLE redisSource ( redisKey string, order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, primary key (redisKey) not enforced ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'data-type' = 'hash', 'deploy-mode' = 'master-replica' ); CREATE TABLE printSink ( redisKey string, order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'print' ); insert into printSink select * from redisSource;
- 作业运行结果如下:
+I(redisSource,202103241000000001,webShop,2021-03-24 10:00:00,100.0,100.0,2021-03-24 10:02:03,0001,Alice,330106)
- 向Redis中插入如下数据
- 对于map而言:
- 向Redis中插入如下数据:
HMSET redisSource order_id 202103241000000001 order_channel webShop order_time "2021-03-24 10:00:00" pay_amount 100.00 real_pay 100.00 pay_time "2021-03-24 10:02:03" user_id 0001 user_name Alice area_id 330106
- 当使用schema-syntax为map时,其作业脚本参考如下:
CREATE TABLE redisSource ( redisKey string, order_result map<string, string>, primary key (redisKey) not enforced ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'data-type' = 'hash', 'deploy-mode' = 'master-replica', 'schema-syntax' = 'map' ); CREATE TABLE printSink ( redisKey string, order_result map<string, string> ) WITH ( 'connector' = 'print' ); insert into printSink select * from redisSource;
- 作业运行结果如下:
+I(redisSource,{user_id=0001, user_name=Alice, pay_amount=100.00, real_pay=100.00, order_time=2021-03-24 10:00:00, area_id=330106, order_id=202103241000000001, order_channel=webShop, pay_time=2021-03-24 10:02:03})
- 向Redis中插入如下数据:
- 对于fields而言: