- 最新动态
- 功能总览
- 服务公告
- 产品介绍
- 计费说明
- 快速入门
-
用户指南
- DLI作业开发流程
- 准备工作
- 创建弹性资源池和队列
- 创建数据库和表
- 数据迁移与数据传输
- 配置DLI访问其他云服务的委托权限
- 使用DLI提交SQL作业
- 使用DLI提交Flink作业
- 使用DLI提交Spark作业
- 使用Notebook实例提交DLI作业
- 使用CES监控DLI服务
- 使用AOM监控DLI服务
- 使用CTS审计DLI服务
- 权限管理
- DLI常用管理操作
- 最佳实践
-
开发指南
- 使用客户端工具连接DLI
- SQL作业开发指南
- Flink作业开发指南
- Spark Jar作业开发指南
-
语法参考
-
Spark SQL语法参考
- Spark SQL常用配置项说明
- Spark SQL语法概览
- Spark开源命令支持说明
- 数据库相关
- 表相关
- 数据相关
- 导出查询结果
- 跨源连接相关
- 视图相关
- 查看计划
- 数据权限相关
- 数据类型
- 自定义函数
-
内置函数
-
日期函数
- 日期函数概览
- add_months
- current_date
- current_timestamp
- date_add
- dateadd
- date_sub
- date_format
- datediff
- datediff1
- datepart
- datetrunc
- day/dayofmonth
- from_unixtime
- from_utc_timestamp
- getdate
- hour
- isdate
- last_day
- lastday
- minute
- month
- months_between
- next_day
- quarter
- second
- to_char
- to_date
- to_date1
- to_utc_timestamp
- trunc
- unix_timestamp
- weekday
- weekofyear
- year
-
字符串函数
- 字符串函数概览
- ascii
- concat
- concat_ws
- char_matchcount
- encode
- find_in_set
- get_json_object
- instr
- instr1
- initcap
- keyvalue
- length
- lengthb
- levenshtein
- locate
- lower/lcase
- lpad
- ltrim
- parse_url
- printf
- regexp_count
- regexp_extract
- replace
- regexp_replace
- regexp_replace1
- regexp_instr
- regexp_substr
- repeat
- reverse
- rpad
- rtrim
- soundex
- space
- substr/substring
- substring_index
- split_part
- translate
- trim
- upper/ucase
- 数学函数
- 聚合函数
- 分析窗口函数
- 其他函数
-
日期函数
- SELECT
-
标示符
- aggregate_func
- alias
- attr_expr
- attr_expr_list
- attrs_value_set_expr
- boolean_expression
- class_name
- col
- col_comment
- col_name
- col_name_list
- condition
- condition_list
- cte_name
- data_type
- db_comment
- db_name
- else_result_expression
- file_format
- file_path
- function_name
- groupby_expression
- having_condition
- hdfs_path
- input_expression
- input_format_classname
- jar_path
- join_condition
- non_equi_join_condition
- number
- num_buckets
- output_format_classname
- partition_col_name
- partition_col_value
- partition_specs
- property_name
- property_value
- regex_expression
- result_expression
- row_format
- select_statement
- separator
- serde_name
- sql_containing_cte_name
- sub_query
- table_comment
- table_name
- table_properties
- table_reference
- view_name
- view_properties
- when_expression
- where_condition
- window_function
- 运算符
-
Flink SQL语法参考
- Flink Opensource SQL1.15语法参考
- Flink Opensource SQL1.12语法参考
- Flink Opensource SQL1.10语法参考
-
HetuEngine SQL语法参考
-
HetuEngine SQL语法
- 使用前必读
- 数据类型
-
DDL 语法
- CREATE SCHEMA
- CREATE TABLE
- CREATE TABLE AS
- CREATE TABLE LIKE
- CREATE VIEW
- ALTER TABLE
- ALTER VIEW
- ALTER SCHEMA
- DROP SCHEMA
- DROP TABLE
- DROP VIEW
- TRUNCATE TABLE
- COMMENT
- VALUES
- SHOW语法使用概要
- SHOW SCHEMAS(DATABASES)
- SHOW TABLES
- SHOW TBLPROPERTIES TABLE|VIEW
- SHOW TABLE/PARTITION EXTENDED
- SHOW FUNCTIONS
- SHOW PARTITIONS
- SHOW COLUMNS
- SHOW CREATE TABLE
- SHOW VIEWS
- SHOW CREATE VIEW
- DML 语法
- DQL 语法
- 辅助命令语法
- 预留关键字
- SQL函数和操作符
- 数据类型隐式转换
- 附录
-
HetuEngine SQL语法
- Hudi SQL语法参考
- Delta SQL语法参考
-
Spark SQL语法参考
-
API参考
- API使用前必读
- API概览
- 如何调用API
- API快速入门
- 权限相关API
- 全局变量相关API
- 资源标签相关API
- 增强型跨源连接相关API
- 跨源认证相关API
- 弹性资源池相关API
- 队列相关API(推荐)
- SQL作业相关API
- SQL模板相关API
- Flink作业相关API
- Flink作业模板相关API
- Flink作业管理相关API
- Spark作业相关API
- Spark作业模板相关API
- 权限策略和授权项
- 历史API
- 公共参数
- SDK参考
- 场景代码示例
-
常见问题
- DLI产品咨询类
- DLI弹性资源池和队列类
-
DLI数据库和表类
- 为什么在DLI控制台中查询不到表?
- OBS表压缩率较高怎么办?
- 字符码不一致导致数据乱码怎么办?
- 删除表后再重新创建同名的表,需要对操作该表的用户和项目重新赋权吗?
- DLI分区内表导入的文件不包含分区列的数据,导致数据导入完成后查询表数据失败怎么办?
- 创建OBS外表,由于OBS文件中的某字段存在换行符导致表字段数据错误怎么办?
- join表时没有添加on条件,造成笛卡尔积查询,导致队列资源爆满,作业运行失败怎么办?
- 手动在OBS表的分区目录下添加了数据,但是无法查询到数据怎么办?
- 为什么insert overwrite覆盖分区表数据的时候,覆盖了全量数据?
- 跨源连接RDS表中create_date字段类型是datetime,为什么DLI中查出来的是时间戳呢?
- SQL作业执行完成后,修改表名导致datasize不正确怎么办?
- 从DLI导入数据到OBS,数据量不一致怎么办?
-
增强型跨源连接类
- 增强型跨源连接绑定队列失败怎么办?
- DLI增强型跨源连接DWS失败怎么办?
- 创建跨源成功但测试网络连通性失败怎么办?
- 怎样配置DLI队列与数据源的网络连通?
- 为什么DLI增强型跨源连接要创建对等连接?
- DLI创建跨源连接,绑定队列一直在创建中怎么办?
- 新建跨源连接,显示已激活,但使用时提示communication link failure错误怎么办?
- 跨源访问MRS HBase,连接超时,日志未打印错误怎么办?
- DLI跨源连接报错找不到子网怎么办?
- 跨源RDS表,执行insert overwrite提示Incorrect string value错误怎么办?
- 创建RDS跨源表提示空指针错误怎么办?
- 对跨源DWS表执行insert overwrite操作,报错:org.postgresql.util.PSQLException: ERROR: tuple concurrently updated
- 通过跨源表向CloudTable Hbase表导入数据,executor报错:RegionTooBusyException
- 通过DLI跨源写DWS表,非空字段出现空值异常怎么办?
- 更新跨源目的端源表后,未同时更新对应跨源表,导致insert作业失败怎么办?
- RDS表有自增主键时怎样在DLI插入数据?
-
SQL作业类
- SQL作业开发类
-
SQL作业运维类
- 用户导表到OBS报“path obs://xxx already exists”错误
- 对两个表进行join操作时,提示:SQL_ANALYSIS_ERROR: Reference 't.id' is ambiguous, could be: t.id, t.id.;
- 执行查询语句报错:The current account does not have permission to perform this operation,the current account was restricted. Restricted for no budget.
- 执行查询语句报错:There should be at least one partition pruning predicate on partitioned table XX.YYY
- LOAD数据到OBS外表报错:IllegalArgumentException: Buffer size too small. size
- SQL作业运行报错:DLI.0002 FileNotFoundException
- 用户通过CTAS创建hive表报schema解析异常错误
- 在DataArts Studio上运行DLI SQL脚本,执行结果报org.apache.hadoop.fs.obs.OBSIOException错误
- 使用CDM迁移数据到DLI,迁移作业日志上报UQUERY_CONNECTOR_0001:Invoke DLI service api failed错误
- SQL作业访问报错:File not Found
- SQL作业访问报错:DLI.0003: AccessControlException XXX
- SQL作业访问外表报错:DLI.0001: org.apache.hadoop.security.AccessControlException: verifyBucketExists on {{桶名}}: status [403]
- 执行SQL语句报错:The current account does not have permission to perform this operation,the current account was restricted. Restricted for no budget.
-
Flink作业类
- Flink作业咨询类
-
Flink SQL作业类
- 怎样将OBS表映射为DLI的分区表?
- Flink SQL作业Kafka分区数增加或减少,怎样不停止Flink作业实现动态感知?
- 在Flink SQL作业中创建表使用EL表达式,作业运行提示DLI.0005错误怎么办?
- Flink作业输出流写入数据到OBS,通过该OBS文件路径创建的DLI表查询无数据
- Flink SQL作业运行失败,日志中有connect to DIS failed java.lang.IllegalArgumentException: Access key cannot be null错误
- Flink SQL作业消费Kafka后sink到es集群,作业执行成功,但未写入数据
- Flink Opensource SQL如何解析复杂嵌套 JSON?
- Flink Opensource SQL从RDS数据库读取的时间和RDS数据库存储的时间为什么会不一致?
- Flink Opensource SQL Elasticsearch结果表failure-handler参数填写retry_rejected导致提交失败
- Kafka Sink配置发送失败重试机制
- 如何在一个Flink作业中将数据写入到不同的Elasticsearch集群中?
- 作业语义检验时提示DIS通道不存在怎么处理?
- Flink jobmanager日志一直报Timeout expired while fetching topic metadata怎么办?
- Flink Jar作业类
- Flink作业性能调优类
-
Spark作业相类
- Spark作业开发类
-
Spark作业运维类
- 运行Spark作业报java.lang.AbstractMethodError
- Spark作业访问OBS数据时报ResponseCode: 403和ResponseStatus: Forbidden错误
- 有访问OBS对应的桶的权限,但是Spark作业访问时报错 verifyBucketExists on XXXX: status [403]
- Spark作业运行大批量数据时上报作业运行超时异常错误
- 使用Spark作业访问sftp中的文件,作业运行失败,日志显示访问目录异常
- 执行作业的用户数据库和表权限不足导致作业运行失败
- 为什么Spark3.x的作业日志中打印找不到global_temp数据库
- 在使用Spark2.3.x访问元数据时,DataSource语法创建avro类型的OBS表创建失败
- DLI资源配额类
- DLI权限管理类
- DLI API类
- 视频帮助
- 文档下载
- 通用参考
链接复制成功!
Elasticsearch结果表
功能描述
DLI将Flink作业的输出数据输出到云搜索服务CSS的Elasticsearch中。Elasticsearch是基于Lucene的当前流行的企业级搜索服务器,具备分布式多用户的能力。其主要功能包括全文检索、结构化搜索、分析、聚合、高亮显示等。能为用户提供实时搜索、稳定可靠的服务。适用于日志分析、站内搜索等场景。
云搜索服务(Cloud Search Service,简称CSS)为DLI提供托管的分布式搜索引擎服务,完全兼容开源Elasticsearch搜索引擎,支持结构化、非结构化文本的多条件检索、统计、报表。
云搜索服务的更多信息,请参见《云搜索服务用户指南》
前提条件
- 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
- 请务必确保您的账户下已在云搜索服务里创建了集群。如何创建集群请参考《云搜索服务用户指南》中创建集群章节。
- 该场景作业需要运行在DLI的独享队列上,因此要与云搜索服务建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
- 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
- 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
- Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。
跨源认证简介及操作方法请参考跨源认证简介。
注意事项
- 当前只支持CSS集群7.X及以上版本,推荐使用7.6.2版本。
- 若未开启安全模式,无需使用任何跨源认证,即无需配置pwd_auth_name、es_auth_name、user_name、password、certificate,且语法中hosts字段值以http开头。
- 若开启安全模式,未开启https:
- 方法1:推荐使用password类型跨源认证,并配置pwd_auth_name为跨源认证的名称,且语法中hosts字段值以http开头。
- 方法2:不使用跨源认证,但需要配置用户名username、密码password,且语法中hosts字段值以http开头。
- 若开启安全模式,开启https:
- 方法1:推荐使用CSS类型跨源认证名称,并配置es_auth_name为跨源认证的名称。请注意该场景hosts字段值以https开头。
- 方法2:不使用跨源认证,但需要配置用户名username、密码password、证书位置certificate。请注意该场景hosts字段值以https开头。
- CSS集群安全组入向规则必须开启ICMP。
- 数据类型的使用,请参考Format章节。
- 提交Flink作业前,建议勾选“保存作业日志”参数,在OBS桶选项中选择日志保存的位置,方便后续作业提交失败或运行异常时,查看日志并分析问题原因。
- Elasticsearch结果表根据是否定义了主键确定是在upsert模式还是在append模式下工作。
- 如果定义了主键,Elasticsearch Sink将在upsert模式下工作,该模式可以消费包含UPDATE和DELETE的消息。
- 如果未定义主键,Elasticsearch Sink将以append模式工作,该模式只能消费INSERT消息。
在Elasticsearch结果表中,主键用于计算Elasticsearch的文档ID。文档ID为最多512个字节不包含空格的字符串。Elasticsearch结果表通过使用“document-id.key-delimiter”参数指定的键分隔符按照DDL中定义的顺序连接所有主键字段,从而为每一行生成一个文档ID字符串。某些类型(例如BYTES、ROW、ARRAY和MAP等)由于没有对应的字符串表示形式,所以不允许其作为主键字段。如果未指定主键,Elasticsearch将自动生成随机的文档ID。
- Elasticsearch结果表同时支持静态索引和动态索引。
- 如果使用静态索引,则索引选项值应为纯字符串,例如myusers,所有记录都将被写入myusers索引。
- 如果使用动态索引,可以使用{field_name}引用记录中的字段值以动态生成目标索引。您还可以使用 {field_name|date_format_string}将TIMESTAMP、DATE和TIME类型的字段值转换为date_format_string指定的格式。date_format_string与Java的DateTimeFormatter兼容。例如,如果设置为myusers-{log_ts|yyyy-MM-dd},则log_ts字段值为2020-03-27 12:25:55的记录将被写入myusers-2020-03-27索引。
语法格式
create table esSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'elasticsearch-7', 'hosts' = '', 'index' = '' );
参数说明
参数 |
是否必选 |
默认值 |
类型 |
说明 |
---|---|---|---|---|
connector |
是 |
无 |
String |
指定要使用的连接器,固定为:elasticsearch-7。表示连接到 Elasticsearch 7.x 及更高版本集群。 |
hosts |
是 |
无 |
String |
Elasticsearch所在集群的主机名,多个以';'间隔。 |
index |
是 |
无 |
String |
每条记录的 Elasticsearch 索引。可以是静态索引(例如'myIndex')或动态索引(例如'index-{log_ts|yyyy-MM-dd}')。 |
username |
否 |
无 |
String |
Elasticsearch所在集群的账号。该账号参数需和密码“password”参数同时配置。 |
password |
否 |
无 |
String |
Elasticsearch所在集群的密码。该密码参数需和“username”参数同时配置。 |
certificate |
否 |
无 |
String |
Elasticsearch集群的证书在obs中的位置。 例如:obs://bucket/path/CloudSearchService.cer 仅在开启安全模式,且开启https,且未使用其他跨源认证的场景下下需要配置该参数。 |
document-id.key-delimiter |
否 |
_ |
String |
连接复合主键的拼接符,默认为_。 |
failure-handler |
否 |
fail |
String |
对Elasticsearch请求失败时的故障处理策略。有效的策略是:
|
sink.flush-on-checkpoint |
否 |
true |
Boolean |
是否在检查点刷新。 如果配置为false,在Elasticsearch进行Checkpoint时,connector将不等待确认所有pending请求已完成。因此,connector不会为请求提供at-least-once保证。 |
sink.bulk-flush.max-actions |
否 |
1000 |
Interger |
每个批量请求的最大缓冲操作数。可以设置'0'为禁用它。 |
sink.bulk-flush.max-size |
否 |
2mb |
MemorySize |
每个批量请求的缓冲操作的内存中的最大大小。必须是MB粒度。可以设置'0'为禁用它。 |
sink.bulk-flush.interval |
否 |
1s |
Duration |
刷新缓冲操作的间隔。可以设置'0'为禁用它。 请注意: 'sink.bulk-flush.max-size'和'sink.bulk-flush.max-actions' 都可以设置为'0'刷新间隔,从而允许对缓冲操作进行完整的异步处理。 |
sink.bulk-flush.backoff.strategy |
否 |
DISABLED |
String |
指定在任何刷新操作由于临时请求错误而失败时如何执行重试。有效的策略是:
|
sink.bulk-flush.backoff.max-retries |
否 |
8 |
Integer |
最大退避重试次数。 |
sink.bulk-flush.backoff.delay |
否 |
50ms |
Duration |
每次退避尝试之间的延迟。 对于CONSTANT退避,这只是每次重试之间的延迟。 对于EXPONENTIAL退避,这是初始基本延迟。 |
connection.max-retry-timeout |
否 |
无 |
Duration |
重试之间的最大超时时间。 |
connection.path-prefix |
否 |
无 |
String |
要添加到每个REST通信的前缀字符串,例如, '/v1'。 |
format |
否 |
json |
String |
Elasticsearch连接器支持指定格式。该格式必须生成有效的 json 文档。默认情况下使用内置'json'格式。 请参考Format页面以获取更多详细信息和格式参数。 |
pwd_auth_name |
否 |
无 |
String |
Password类型的跨源认证名称。
|
es_auth_name |
否 |
无 |
String |
CSS类型的跨源认证的名称。
|
示例
该示例是从Kafka数据源中读取数据,并写入到Elasticsearch结果表中,其具体步骤如下:
- 参考增强型跨源连接,在DLI上根据Elasticsearch和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。
- 设置Elasticsearch和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据Elasticsearch和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
- 登录Elasticsearch集群的Kibana,并选择Dev Tools,输入下列语句并执行,以创建值为orders的index:
PUT /orders { "settings": { "number_of_shards": 1 }, "mappings": { "properties": { "order_id": { "type": "text" }, "order_channel": { "type": "text" }, "order_time": { "type": "text" }, "pay_amount": { "type": "double" }, "real_pay": { "type": "double" }, "pay_time": { "type": "text" }, "user_id": { "type": "text" }, "user_name": { "type": "text" }, "area_id": { "type": "text" } } } }
- 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。
注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。
CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', "format" = "json" ); CREATE TABLE elasticsearchSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'ElasticsearchAddress:ElasticsearchPort', 'index' = 'orders' ); insert into elasticsearchSink select * from kafkaSource;
- 连接Kafka集群,向kafka中插入如下测试数据:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
- 在Elasticsearch集群的Kibana中输入下述语句并查看相应结果:
GET orders/_search
{ "took" : 1, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 2, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "orders", "_type" : "_doc", "_id" : "ae7wpH4B1dV9conjpXeB", "_score" : 1.0, "_source" : { "order_id" : "202103241000000001", "order_channel" : "webShop", "order_time" : "2021-03-24 10:00:00", "pay_amount" : 100.0, "real_pay" : 100.0, "pay_time" : "2021-03-24 10:02:03", "user_id" : "0001", "user_name" : "Alice", "area_id" : "330106" } }, { "_index" : "orders", "_type" : "_doc", "_id" : "au7xpH4B1dV9conjn3er", "_score" : 1.0, "_source" : { "order_id" : "202103241606060001", "order_channel" : "appShop", "order_time" : "2021-03-24 16:06:06", "pay_amount" : 200.0, "real_pay" : 180.0, "pay_time" : "2021-03-24 16:10:06", "user_id" : "0001", "user_name" : "Alice", "area_id" : "330106" } } ] } }