文档首页/ 数据湖探索 DLI/ 开发指南/ Flink作业委托场景开发指导/ Flink Jar 使用DEW获取访问凭证读写OBS
更新时间:2024-09-19 GMT+08:00

Flink Jar 使用DEW获取访问凭证读写OBS

操作场景

DLI将Flink Jar作业的输出数据写入到OBS时,需要配置AKSK访问OBS,为了确保AKSK数据安全,您可以用过数据加密服务(Data Encryption Workshop,DEW)、云凭据管理服务(Cloud Secret Management Service,CSMS),对AKSK统一管理,有效避免程序硬编码或明文配置等问题导致的敏感信息泄露以及权限失控带来的业务风险。

本例以获取访问OBS的AKSK为例介绍Flink Jar使用DEW获取访问凭证读写OBS的操作指导。

前提条件

  • 已在DEW服务创建通用凭证,并存入凭据值。具体操作请参考:创建通用凭据
  • 已创建DLI访问DEW的委托并完成委托授权。该委托需具备以下权限:
    • DEW中的查询凭据的版本与凭据值ShowSecretVersion接口权限,csms:secretVersion:get。
    • DEW中的查询凭据的版本列表ListSecretVersions接口权限,csms:secretVersion:list。
    • DEW解密凭据的权限,kms:dek:decrypt。

    委托权限示例请参考自定义DLI委托权限常见场景的委托权限策略

  • 仅支持Flink1.15版本使用DEW管理访问凭据,在创建作业时,请配置作业使用Flink1.15版本、且已在作业中配置允许DLI访问DEW的委托信息。自定义委托及配置请参考自定义DLI委托权限
  • 使用该功能,所有涉及OBS的桶,都需要进行配置AKSK。

语法格式

在Flink jar作业编辑界面,选择配置优化参数,配置信息如下:

不同的OBS桶,使用不同的AKSK认证信息。 可以使用如下配置方式,根据桶指定不同的AKSK信息,参数说明详见表1

flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.access.key=USER_AK_CSMS_KEY
flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.secret.key=USER_SK_CSMS_KEY
flink.hadoop.fs.obs.security.provider=com.dli.provider.UserObsBasicCredentialProvider
flink.hadoop.fs.dew.csms.secretName=CredentialName
flink.hadoop.fs.dew.endpoint=ENDPOINT
flink.hadoop.fs.dew.csms.version=VERSION_ID
flink.hadoop.fs.dew.csms.cache.time.second=CACHE_TIME
flink.dli.job.agency.name=USER_AGENCY_NAME

参数说明

表1 参数说明

参数

是否必选

默认值

数据类型

参数说明

flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.access.key

String

USER_BUCKET_NAME为用户的桶名,需要进行替换为用户的使用的OBS桶名。

参数的值为用户定义在CSMS通用凭证中的键key, 其Key对应的value为用户的AK(Access Key Id),需要具备访问OBS对应桶的权限。

flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.secret.key

String

USER_BUCKET_NAME为用户的桶名,需要进行替换为用户的使用的OBS桶名。

参数的值为用户定义在CSMS通用凭证中的键key, 其Key对应的value为用户的SK(Secret Access Key),需要具备访问OBS对应桶的权限。

flink.hadoop.fs.obs.security.provider

String

OBS AKSK认证机制,使用DEW服务中的CSMS凭证管理,获取OBS的AK、SK。

默认取值为com.dli.provider.UserObsBasicCredentialProvider

flink.hadoop.fs.dew.endpoint

String

指定要使用的DEW服务所在的endpoint信息。

获取地区和终端节点

配置示例:flink.hadoop.fs.dew.endpoint=kms.cn-xxxx.myhuaweicloud.com

flink.hadoop.fs.dew.projectId

String

DEW所在的项目ID, 默认是Flink作业所在的项目ID。

获取项目ID

flink.hadoop.fs.dew.csms.secretName

String

在DEW服务的凭据管理中新建的通用凭据的名称。

配置示例:flink.hadoop.fs.dew.csms.secretName=secretInfo

flink.hadoop.fs.dew.csms.version

最新的version

String

在DEW服务的凭据管理中新建的通用凭据的版本号(凭据的版本标识符)。

若不指定,则默认获取该通用凭证的最新版本号。

配置示例:flink.hadoop.fs.dew.csms.version=v1

flink.hadoop.fs.dew.csms.cache.time.second

3600

Long

Flink作业访问获取CSMS通用凭证后,缓存的时间。

单位为秒。默认值为3600秒。

flink.dli.job.agency.name

-

String

自定义委托名称。

样例代码

本章节JAVA样例代码演示将DataGen数据处理后写入到OBS,具体参数配置请根据实际环境修改。

  1. 创建DLI访问DEW的委托并完成委托授权。详细步骤请参考自定义DLI委托权限
  2. 在DEW创建通用凭证。具体操作请参考:创建通用凭据
    1. 登录DEW管理控制台
    2. 选择“凭据管理”,进入“凭据管理”页面。
    3. 单击“创建凭据”。配置凭据基本信息
  3. DLI Flink jar作业编辑界面设置作业参数。
    • 类名
      com.dli.demo.dew.DataGen2FileSystemSink
    • 参数
      --checkpoint.path obs://test/flink/jobs/checkpoint/120891/ 
      --output.path obs://dli/flink.db/79914/DataGen2FileSystemSink
    • 优化参数:
      flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.access.key=USER_AK_CSMS_KEY
      flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.secret.key=USER_SK_CSMS_KEY
      flink.hadoop.fs.obs.security.provider=com.dli.provider.UserObsBasicCredentialProvider
      flink.hadoop.fs.dew.csms.secretName=obsAksK
      flink.hadoop.fs.dew.endpoint=kmsendpoint
      flink.hadoop.fs.dew.csms.version=v6
      flink.hadoop.fs.dew.csms.cache.time.second=3600
      flink.dli.job.agency.name=***
  4. Flink Jar作业示例。
    • 环境准备

      已安装和配置IntelliJ IDEA等开发工具以及安装JDK和Maven。

      pom文件配置中依赖包

       <properties>
              <flink.version>1.15.0</flink.version>
          </properties>
      
          <dependencies>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-statebackend-rocksdb</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
      
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-streaming-java</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
      
              <!-- fastjson -->
              <dependency>
                  <artifactId>fastjson</artifactId>
                  <version>2.0.15</version>
              </dependency>
          </dependencies>
    • 示例代码
      package com.huawei.dli.demo.dew;
      
      import org.apache.flink.api.common.serialization.SimpleStringEncoder;
      import org.apache.flink.api.java.utils.ParameterTool;
      import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
      import org.apache.flink.core.fs.Path;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.CheckpointConfig;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
      import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
      import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.time.LocalDateTime;
      import java.time.ZoneOffset;
      import java.time.format.DateTimeFormatter;
      import java.util.Random;
      
      public class DataGen2FileSystemSink {
          private static final Logger LOG = LoggerFactory.getLogger(DataGen2FileSystemSink.class);
      
          public static void main(String[] args) {
              ParameterTool params = ParameterTool.fromArgs(args);
              LOG.info("Params: " + params.toString());
              try {
                  StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
      
                  // set checkpoint
                  String checkpointPath = params.get("checkpoint.path", "obs://bucket/checkpoint/jobId_jobName/");
                  LocalDateTime localDateTime = LocalDateTime.ofEpochSecond(System.currentTimeMillis() / 1000,
                      0, ZoneOffset.ofHours(8));
                  String dt = localDateTime.format(DateTimeFormatter.ofPattern("yyyyMMdd_HH:mm:ss"));
                  checkpointPath = checkpointPath + dt;
      
                  streamEnv.setStateBackend(new EmbeddedRocksDBStateBackend());
                  streamEnv.getCheckpointConfig().setCheckpointStorage(checkpointPath);
                  streamEnv.getCheckpointConfig().setExternalizedCheckpointCleanup(
                      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
                  streamEnv.enableCheckpointing(30 * 1000);
      
                  DataStream<String> stream = streamEnv.addSource(new DataGen())
                      .setParallelism(1)
                      .disableChaining();
      
                  String outputPath = params.get("output.path", "obs://bucket/outputPath/jobId_jobName");
      
                  // Sink OBS
                  final StreamingFileSink<String> sinkForRow = StreamingFileSink
                      .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                      .withRollingPolicy(OnCheckpointRollingPolicy.build())
                      .build();
      
                  stream.addSink(sinkForRow);
      
                  streamEnv.execute("sinkForRow");
              } catch (Exception e) {
                  LOG.error(e.getMessage(), e);
              }
          }
      }
      
      class DataGen implements ParallelSourceFunction<String> {
      
          private boolean isRunning = true;
      
          private Random random = new Random();
      
          @Override
          public void run(SourceContext<String> ctx) throws Exception {
              while (isRunning) {
                  JSONObject jsonObject = new JSONObject();
                  jsonObject.put("id", random.nextLong());
                  jsonObject.put("name", "Molly" + random.nextInt());
                  jsonObject.put("address", "hangzhou" + random.nextInt());
                  jsonObject.put("birthday", System.currentTimeMillis());
                  jsonObject.put("city", "hangzhou" + random.nextInt());
                  jsonObject.put("number", random.nextInt());
                  ctx.collect(jsonObject.toJSONString());
                  Thread.sleep(1000);
              }
          }
      
          @Override
          public void cancel() {
              isRunning = false;
          }
      }