Flink Jar 使用DEW获取访问凭证读写OBS
操作场景
DLI将Flink Jar作业的输出数据写入到OBS时,需要配置AKSK访问OBS,为了确保AKSK数据安全,您可以用过数据加密服务(Data Encryption Workshop,DEW)、云凭据管理服务(Cloud Secret Management Service,CSMS),对AKSK统一管理,有效避免程序硬编码或明文配置等问题导致的敏感信息泄露以及权限失控带来的业务风险。
本例以获取访问OBS的AKSK为例介绍Flink Jar使用DEW获取访问凭证读写OBS的操作指导。
前提条件
- 已在DEW服务创建通用凭证,并存入凭据值。具体操作请参考:创建通用凭据。
- 已创建DLI访问DEW的委托并完成委托授权。该委托需具备以下权限:
- DEW中的查询凭据的版本与凭据值ShowSecretVersion接口权限,csms:secretVersion:get。
- DEW中的查询凭据的版本列表ListSecretVersions接口权限,csms:secretVersion:list。
- DEW解密凭据的权限,kms:dek:decrypt。
委托权限示例请参考自定义DLI委托权限和常见场景的委托权限策略。
- 仅支持Flink1.15版本使用DEW管理访问凭据,在创建作业时,请配置作业使用Flink1.15版本、且已在作业中配置允许DLI访问DEW的委托信息。自定义委托及配置请参考自定义DLI委托权限。
- 使用该功能,所有涉及OBS的桶,都需要进行配置AKSK。
语法格式
在Flink jar作业编辑界面,选择配置优化参数,配置信息如下:
不同的OBS桶,使用不同的AKSK认证信息。 可以使用如下配置方式,根据桶指定不同的AKSK信息,参数说明详见表1。
flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.access.key=USER_AK_CSMS_KEY flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.secret.key=USER_SK_CSMS_KEY flink.hadoop.fs.obs.security.provider=com.dli.provider.UserObsBasicCredentialProvider flink.hadoop.fs.dew.csms.secretName=CredentialName flink.hadoop.fs.dew.endpoint=ENDPOINT flink.hadoop.fs.dew.csms.version=VERSION_ID flink.hadoop.fs.dew.csms.cache.time.second=CACHE_TIME flink.dli.job.agency.name=USER_AGENCY_NAME
参数说明
参数 |
是否必选 |
默认值 |
数据类型 |
参数说明 |
---|---|---|---|---|
flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.access.key |
是 |
无 |
String |
USER_BUCKET_NAME为用户的桶名,需要进行替换为用户的使用的OBS桶名。 参数的值为用户定义在CSMS通用凭证中的键key, 其Key对应的value为用户的AK(Access Key Id),需要具备访问OBS对应桶的权限。 |
flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.secret.key |
是 |
无 |
String |
USER_BUCKET_NAME为用户的桶名,需要进行替换为用户的使用的OBS桶名。 参数的值为用户定义在CSMS通用凭证中的键key, 其Key对应的value为用户的SK(Secret Access Key),需要具备访问OBS对应桶的权限。 |
flink.hadoop.fs.obs.security.provider |
是 |
无 |
String |
OBS AKSK认证机制,使用DEW服务中的CSMS凭证管理,获取OBS的AK、SK。 默认取值为com.dli.provider.UserObsBasicCredentialProvider |
flink.hadoop.fs.dew.endpoint |
是 |
无 |
String |
指定要使用的DEW服务所在的endpoint信息。 获取地区和终端节点。 配置示例:flink.hadoop.fs.dew.endpoint=kms.cn-xxxx.myhuaweicloud.com |
flink.hadoop.fs.dew.projectId |
否 |
有 |
String |
DEW所在的项目ID, 默认是Flink作业所在的项目ID。 |
flink.hadoop.fs.dew.csms.secretName |
是 |
无 |
String |
在DEW服务的凭据管理中新建的通用凭据的名称。 配置示例:flink.hadoop.fs.dew.csms.secretName=secretInfo |
flink.hadoop.fs.dew.csms.version |
否 |
最新的version |
String |
在DEW服务的凭据管理中新建的通用凭据的版本号(凭据的版本标识符)。 若不指定,则默认获取该通用凭证的最新版本号。 配置示例:flink.hadoop.fs.dew.csms.version=v1 |
flink.hadoop.fs.dew.csms.cache.time.second |
否 |
3600 |
Long |
Flink作业访问获取CSMS通用凭证后,缓存的时间。 单位为秒。默认值为3600秒。 |
flink.dli.job.agency.name |
是 |
- |
String |
自定义委托名称。 |
样例代码
本章节JAVA样例代码演示将DataGen数据处理后写入到OBS,具体参数配置请根据实际环境修改。
- 创建DLI访问DEW的委托并完成委托授权。详细步骤请参考自定义DLI委托权限。
- 在DEW创建通用凭证。具体操作请参考:创建通用凭据。
- 登录DEW管理控制台
- 选择“凭据管理”,进入“凭据管理”页面。
- 单击“创建凭据”。配置凭据基本信息
- DLI Flink jar作业编辑界面设置作业参数。
- 类名
com.dli.demo.dew.DataGen2FileSystemSink
- 参数
--checkpoint.path obs://test/flink/jobs/checkpoint/120891/ --output.path obs://dli/flink.db/79914/DataGen2FileSystemSink
- 优化参数:
flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.access.key=USER_AK_CSMS_KEY flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.secret.key=USER_SK_CSMS_KEY flink.hadoop.fs.obs.security.provider=com.dli.provider.UserObsBasicCredentialProvider flink.hadoop.fs.dew.csms.secretName=obsAksK flink.hadoop.fs.dew.endpoint=kmsendpoint flink.hadoop.fs.dew.csms.version=v6 flink.hadoop.fs.dew.csms.cache.time.second=3600 flink.dli.job.agency.name=***
- 类名
- Flink Jar作业示例。
- 环境准备
已安装和配置IntelliJ IDEA等开发工具以及安装JDK和Maven。
pom文件配置中依赖包
<properties> <flink.version>1.15.0</flink.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!-- fastjson --> <dependency> <artifactId>fastjson</artifactId> <version>2.0.15</version> </dependency> </dependencies>
- 示例代码
package com.huawei.dli.demo.dew; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.Random; public class DataGen2FileSystemSink { private static final Logger LOG = LoggerFactory.getLogger(DataGen2FileSystemSink.class); public static void main(String[] args) { ParameterTool params = ParameterTool.fromArgs(args); LOG.info("Params: " + params.toString()); try { StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); // set checkpoint String checkpointPath = params.get("checkpoint.path", "obs://bucket/checkpoint/jobId_jobName/"); LocalDateTime localDateTime = LocalDateTime.ofEpochSecond(System.currentTimeMillis() / 1000, 0, ZoneOffset.ofHours(8)); String dt = localDateTime.format(DateTimeFormatter.ofPattern("yyyyMMdd_HH:mm:ss")); checkpointPath = checkpointPath + dt; streamEnv.setStateBackend(new EmbeddedRocksDBStateBackend()); streamEnv.getCheckpointConfig().setCheckpointStorage(checkpointPath); streamEnv.getCheckpointConfig().setExternalizedCheckpointCleanup( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); streamEnv.enableCheckpointing(30 * 1000); DataStream<String> stream = streamEnv.addSource(new DataGen()) .setParallelism(1) .disableChaining(); String outputPath = params.get("output.path", "obs://bucket/outputPath/jobId_jobName"); // Sink OBS final StreamingFileSink<String> sinkForRow = StreamingFileSink .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8")) .withRollingPolicy(OnCheckpointRollingPolicy.build()) .build(); stream.addSink(sinkForRow); streamEnv.execute("sinkForRow"); } catch (Exception e) { LOG.error(e.getMessage(), e); } } } class DataGen implements ParallelSourceFunction<String> { private boolean isRunning = true; private Random random = new Random(); @Override public void run(SourceContext<String> ctx) throws Exception { while (isRunning) { JSONObject jsonObject = new JSONObject(); jsonObject.put("id", random.nextLong()); jsonObject.put("name", "Molly" + random.nextInt()); jsonObject.put("address", "hangzhou" + random.nextInt()); jsonObject.put("birthday", System.currentTimeMillis()); jsonObject.put("city", "hangzhou" + random.nextInt()); jsonObject.put("number", random.nextInt()); ctx.collect(jsonObject.toJSONString()); Thread.sleep(1000); } } @Override public void cancel() { isRunning = false; } }
- 环境准备