更新时间:2024-08-23 GMT+08:00
分享

LTS-Flink-Connector参数说明

云日志服务可以帮助您快捷地完成数据采集、消费、投递以及查询分析,提升运维和运营效率,建立海量日志处理能力。

LTS-Flink-Connector工具支持以下表1

表1 功能说明

功能

说明

支持类型

源表和结果表

运行模式

仅支持流模式

API种类

SQL

是否支持更新或删除结果表数据

不支持更新和删除结果表数据,只支持插入数据。

使用限制

  • 仅flink1.12版本支持云日志服务提供的lts-flink-connector工具。
  • LTS服务不支持作为维表。
  • lts-flink-connector仅保证At-Least-Once语义。
  • lts-flink-connector source和sink日志不能为同一个日志流。
  • flink消费保证最终一致性,即您可以获取到这条日志流的全部内容,但由于时间为服务端时间,所以在获取日志流的过程中,可能导致获取到的日志数量跟LTS页面查询的日志数量不一致。

语法结构

认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。

CREATE TABLE source (
   field1 INT,
   field2 INT,
   field3 VARCHAR

)with (
        'connector' = 'lts',
        'regionName' = '<yourRegionName>',
        'projectId' = '<yourProjectId>',
        'accessKey' = '<yourAk>',
        'accessSecret' = '<yourSk>',
        'logGroupId' = '<yourLogGroupId>',
        'logStreamId' = '<yourLogStreamId>',
        'consumerGroup' = '<yourConsumerGroup>',
        'startTime' = '<consumerStartTime>',
        'jsonParse' = 'true'
);

WITH参数

  • 通用参数
    表2 通用参数说明

    参数名称

    描述

    类型

    是否必填

    默认值

    connector

    表类型。

    String

    必填

    lts

    regionName

    云日志服务的区域

    String

    必填

    -

    projectId

    华为云账号的项目ID(project id)

    String

    必填

    -

    logGroupId

    LTS的日志组ID

    String

    必填

    -

    logStreamId

    LTS的日志流ID

    String

    必填

    -

    accessKey

    华为云账号的AK

    String

    必填

    -

    accessSecret

    华为云账号的SK

    String

    必填

    -

    consumerGroup

    LTS日志流对应的消费组名称

    String

    必填

    -

    startTime

    消费开始时间,纳秒值

    Long

    必填

    -

    stopTime

    消费结束时间,纳秒值

    Long

    选填

    -

    jsonParse

    是否对原始日志做json解析

    Boolean

    选填

    false

类型映射

表3 类型映射

Flink字段类型

LTS字段类型

VARCHAR

STRING

代码示例

认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。

CREATE TABLE source (
            filed1 varchar,
            filed2 varchar,
            filed3 varchar
)with (
        'connector' = 'lts',
        'regionName' = 'cn-north-4',
        'projectId' = '{projectId}',
        'accessKey' = '{ak}',
        'accessSecret' = '{sk}',
        'logGroupId' = '{groupId}',
        'logStreamId' = '{streamId}',
        'consumerGroup' = '{consumerGroup}',
        'startTime' = '1689836602157000000',
        'jsonParse' = 'true'
);


CREATE TABLE print_sink (
            filed1 varchar,
            filed2 varchar,
            filed3 varchar
)with (
            'connector' = 'lts',
            'regionName' = 'cn-north-4',
            'projectId' = '{projectId}',
            'accessKey' = '{ak}',
            'accessSecret' = '{sk}',
            'logGroupId' = '{groupId}',
            'logStreamId' = '{streamId}'
);

insert into print_sink select * from source;

相关文档