LTS-Flink-Connector参数说明
云日志服务可以帮助您快捷地完成数据采集、消费、投递以及查询分析,提升运维和运营效率,建立海量日志处理能力。
LTS-Flink-Connector工具支持以下表1。
使用限制
- 云日志服务提供的lts-flink-connector工具版本在1.0.4及之后支持flink1.15版本,1.0.4之前的版本支持flink1.12版本。
- LTS服务不支持作为维表。
- lts-flink-connector仅保证At-Least-Once语义。
- lts-flink-connector source和sink日志不能为同一个日志流。
- flink消费保证最终一致性,即您可以获取到这条日志流的全部内容,但由于时间为服务端时间,所以在获取日志流的过程中,可能导致获取到的日志数量跟LTS页面查询的日志数量不一致。
语法结构
认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。
CREATE TABLE source ( field1 INT, field2 INT, field3 VARCHAR )with ( 'connector' = 'lts', 'regionName' = '<yourRegionName>', 'projectId' = '<yourProjectId>', 'accessKey' = '<yourAk>', 'accessSecret' = '<yourSk>', 'logGroupId' = '<yourLogGroupId>', 'logStreamId' = '<yourLogStreamId>', 'consumerGroup' = '<yourConsumerGroup>', 'startTime' = '<consumerStartTime>' );
WITH参数
- 通用参数
表2 通用参数说明 参数名称
描述
类型
是否必填
默认值
connector
表类型。
String
必填
lts
regionName
云日志服务的区域
String
必填
-
projectId
华为云账号的项目ID(project id)
String
必填
-
logGroupId
LTS的日志组ID
String
必填
-
logStreamId
LTS的日志流ID
String
必填
-
logGroupName
LTS的日志组名称
String
选填
日志组ID和名称不能同时为空。
logStreamName
LTS的日志流名称
String
选填
日志流ID和名称不能同时为空。
accessKey
华为云账号的AK
String
必填
-
accessSecret
华为云账号的SK
String
必填
-
consumerGroup
LTS日志流对应的消费组名称
String
必填
-
startTime
消费开始时间:ns时间戳(指定时间消费)、begin_cursor(从头开始消费)、end_cursor(从最新数据开始消费)
String
必填
-
stopTime
ns时间戳 (消费结束时间)
Long
选填
-
jsonParse
是否对原始日志做json解析
Boolean
选填
false
enableLocalTest
是否开启跨云消费日志(flink-connector版本最低必须为1.0.7支持此功能)。此功能仅适用于开发调测,对于消费性能不做保障。
- 选true时,开启跨云后用户能通过公网(华东-上海一)访问。
- 选false时,关闭跨云后用户是通过华为云当前区域主机访问。
boolean
选填
false
historyQuery
是否消费历史数据(flink-connector版本最低必须为1.0.7支持此功能)。
- 选true时,可以消费开启日志流引擎之前的数据。
- 选false时,只能消费当开启了日志流引擎之后的数据。
boolean
选填
false
proxyIp
使用自定义ip端口上报日志(flink-connector版本最低必须为1.1.0支持此功能)。
String
选填
-
timeField
指定字段名,该字段的值会覆盖__time__属性字段的值,表示日志写入时间。(flink-connector版本最低必须为1.1.3支持此功能)
String
选填
该参数值是表中已存在的字段之一,且字段必须为ms时间戳的INT类型。如果未指定,则默认填充当前时间。
类型映射
Flink字段类型 |
LTS字段类型 |
---|---|
VARCHAR |
STRING |
代码示例
认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。
CREATE TABLE source ( filed1 varchar, filed2 varchar, filed3 varchar )with ( 'connector' = 'lts', 'regionName' = 'cn-north-4', 'projectId' = '{projectId}', 'accessKey' = '{ak}', 'accessSecret' = '{sk}', 'logGroupId' = '{groupId}', 'logStreamId' = '{streamId}', 'consumerGroup' = '{consumerGroup}', 'startTime' = '1689836602157000000', 'jsonParse' = 'false' ); CREATE TABLE print_sink ( filed1 varchar, filed2 varchar, filed3 varchar )with ( 'connector' = 'lts', 'regionName' = 'cn-north-4', 'projectId' = '{projectId}', 'accessKey' = '{ak}', 'accessSecret' = '{sk}', 'logGroupId' = '{groupId}', 'logStreamId' = '{streamId}' ); insert into print_sink select * from source;