Flink消费
云日志服务LTS提供的LTS-Flink-Connector工具用于对接flink,包括消费者(Consumer)和生产者(Producer)两部分。
消费者用于从云日志服务中读取数据,支持exactly once语义,支持shard负载均衡,生产者用于将数据写入云日志服务,使用LTS-Flink-Connector工具时,需要在项目中添加maven依赖:
<dependency> <groupId>io.github.huaweicloud</groupId> <artifactId>lts-flink-connector</artifactId> <version>1.0.3.1</version> </dependency>
目前此功能在邀测中,暂不支持申请开通。
前提条件
FlinkLtsLogConsumer
FlinkLtsLogConsumer提供了订阅云日志服务中某一个日志组中单个或者多个日志流的能力,支持Exactly Once语义,在使用时,用户无需关心日志流中shard数量的变化,FlinkLtsLogConsumer会自动感知。
Flink中每一个subtask启动一个LTS SDK的consumer worker负责消费日志流中部分shard,如果日志流中shard发生split或者merge,consumer worker消费的shard也会随之改变。
- 本地调试配置启动参数。
以下是一个简单的消费示例,我们使用java.util.Properties作为配置工具,所有Consumer的配置都可以在ConfigConstants中找到。
Properties configProps = new Properties(); // 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险, 建议在配置文件或者环境变量中密文存放, 使用时解密, 确保安全 // LTS 日志服务所属region configProps.put(ConfigConstants.LOG_REGION_NAME, REGION_NAME); // LTS 日志租户项目ID configProps.put(ConfigConstants.LOG_PROJECT_ID, LOG_PROJECT_ID); // LTS 日志组ID configProps.put(ConfigConstants.LOG_GROUP_ID, LOG_GROUP_ID); // LTS 日志流ID configProps.put(ConfigConstants.LOG_STREAM_ID, LOG_STREAM_ID); // LTS 租户AK configProps.put(ConfigConstants.ACCESS_KEY_ID, ACCESS_KEY_ID); // LTS 租户SK configProps.put(ConfigConstants.ACCESS_KEY_SECRET, ACCESS_KEY_SECRET); // LTS 日志流的消费组 configProps.put(ConfigConstants.CONSUMER_GROUP_NAME, CONSUMER_GROUP); configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "10"); configProps.put(ConfigConstants.LOG_COMMIT_INTERVAL_MILLIS, "10000"); // LTS 日志消费结束位置 configProps.put(ConfigConstants.STOP_TIME, ""); // LTS 日志消费起始位置 configProps.put(ConfigConstants.START_TIME, 1689836602157000000L); LogDataDeserializer deserializer = new LogDataDeserializer(); DataStream<LogDataList> stream = env.addSource( new FlinkLtsLogConsumer<>(LOG_STREAM_ID, deserializer, configProps));
- 监控消费进度(可选)
FlinkLtsLogConsumer支持设置消费进度监控,所谓消费进度就是获取每一个shard实时的消费位置,这个位置使用时间戳表示,详细操作可以参考文档创建消费组。
FlinkLtsLogProducer
FlinkLtsLogProducer用于将数据写到云日志服务中。
producer只支持Flink at-least-once语义,在发生作业失败的情况下,写入云日志服务中的数据有可能会重复,但是绝对不会丢失。
- 用法示例如下,我们将模拟产生的字符串写入云日志服务:
// 将数据序列化成日志服务的数据格式 return row -> { Map<String, Object> contentMap = new HashMap<>(); // 把flink SQL指定的列数据放到一个map for (int i = 0; i < rowArity; i++) { contentMap.put(fieldNames[i], fieldConverters[i].convert(row, i)); } LOGGER.debug("row data convert to lts log, data is {}", contentMap); // logTimeNs是必填字段,获取当前时间 long logTimeNs = System.currentTimeMillis() * 1000000L + System.nanoTime() % 1000000L; LogContent logContent = new LogContent(); logContent.setLog(contentMap.toString()); logContent.setLogTimeNs(logTimeNs); return logContent; }; public class FlinkProducerTest { private static final String REGION_NAME = "region name"; private static final String LOG_PROJECT_ID = "**************"; private static final String LOG_GROUP_ID = "**************"; private static final String LOG_STREAM_ID = "**************"; private static final String ACCESS_KEY_ID = "**************"; private static final String ACCESS_KEY_SECRET = "**************"; public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); env.setParallelism(1); DataStream<String> stream = env.addSource(new EventsGenerator()); Properties configProps = new Properties(); // LTS 日志服务所属region configProps.put(ConfigConstants.LOG_REGION_NAME, REGION_NAME); // LTS 日志租户项目ID configProps.put(ConfigConstants.LOG_PROJECT_ID, LOG_PROJECT_ID); // LTS 日志组ID configProps.put(ConfigConstants.LOG_GROUP_ID, LOG_GROUP_ID); // LTS 日志流ID configProps.put(ConfigConstants.LOG_STREAM_ID, LOG_STREAM_ID); // 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险, 建议在配置文件或者环境变量中密文存放, 使用时解密, 确保安全 // LTS 租户AK configProps.put(ConfigConstants.ACCESS_KEY_ID, ACCESS_KEY_ID); // LTS 租户SK configProps.put(ConfigConstants.ACCESS_KEY_SECRET, ACCESS_KEY_SECRET); LogDataDeserializer deserializer = new LogDataDeserializer(); FlinkLtsLogProducer<String> logProducer = new FlinkLtsLogProducer<>(new SimpleLogSerializer(), configProps); stream.addSink(logProducer); env.execute("flink log producer"); } public static class EventsGenerator implements SourceFunction<String> { private volatile boolean running = true; @Override public void run(SourceContext<String> ctx) throws Exception { long seq = 0; while (running) { Thread.sleep(10); ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12)); } } @Override public void cancel() { running = false; } } }
- 初始化,Producer初始化主要是初始化配置参数Properties, 这一步和Consumer类似, Producer有一些定制的参数,一般情况下使用默认值即可。
DynamicLtsTableFactory支持SQL作业
LtsDynamicSource和LtsDynamicSink table作业,支持LTS日志直接接入flink,支持SQL语法,示例如下:
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings); // enable checkpointing Configuration configuration = tableEnv.getConfig().getConfiguration(); configuration.set( ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE); configuration.set( ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10)); tableEnv.executeSql("CREATE TABLE source ( " + " collectTime varchar, " + " lineNum varchar, " + " podName varchar, " + " pathFile varchar, " + " category varchar " + " ) " + " with ( " + // connector 表类型 固定值lts "'connector' = 'lts', " + // LTS 日志服务所属region "'regionName' = 'cn-north-7', " + // LTS 日志租户项目ID "'projectId' = '**************', " + // 注意:认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险, 建议在配置文件或者环境变量中密文存放, 使用时解密, 确保安全 // LTS 租户AK "'accessKey' = '**************', " + // LTS 租户SK "'accessSecret' = '**************', " + // LTS 日志组ID "'logGroupId' = '**************', " + // LTS 日志流ID "'logStreamId' = '**************', " + // LTS 日志流的消费组 "'consumerGroup' = '**************', " + // LTS 日志消费起始位置 "'startTime' = '1689836602157000000', " + // LTS 原始日志是否做JSON解析 "'jsonParse' = 'true' " + " )"); tableEnv.executeSql("CREATE TABLE print_sink ( " + " collectTime varchar, " + " lineNum varchar, " + " podName varchar, " + " pathFile varchar, " + " category varchar " + " ) " + " with ( " + // connector 表类型 固定值lts "'connector' = 'lts', " + // LTS 日志服务所属region "'regionName' = 'cn-north-7', " + // LTS 日志租户项目ID "'projectId' = '2a473356cca5487f8373be891bffc1cf', " + // LTS 租户AK "'accessKey' = 'DADYWPUP8JMUV3UGPEI9', " + // LTS 租户SK "'accessSecret' = 'jUtvcc0oIIcGZGoAUvtlSi8Oz6sZdFI2ZqFKBGUZ', " + // LTS 日志组ID "'logGroupId' = 'e83e94db-2e29-49c9-ae15-d3a9f4c3ea1b', " + // LTS 日志流ID "'logStreamId' = '0a423cfc-dbf8-4cf3-8fb9-f5cf95fa1298' " + " )"); tableEnv.executeSql("insert into print_sink select * from source "); }