文档首页/ 数据湖探索 DLI/ 开发指南/ Flink作业委托场景开发指导/ 获取Flink作业委托临时凭证用于访问其他云服务
更新时间:2024-09-19 GMT+08:00

获取Flink作业委托临时凭证用于访问其他云服务

功能描述

DLI提供了一个通用接口,可用于获取用户在启动Flink作业时设置的委托的临时凭证。该接口将获取到的该作业委托的临时凭证封装到com.huaweicloud.sdk.core.auth.BasicCredentials类中。

  • 获取到的委托的临时认证封装到com.huaweicloud.sdk.core.auth.ICredentialProvider接口的getCredentials()返回值中。
  • 返回类型为com.huaweicloud.sdk.core.auth.BasicCredentials。
  • 仅支持获取AK、SK、SecurityToken。
  • 获取到AK、SK、SecurityToken后,请参考如何使用凭据管理服务替换硬编码的数据库账号密码查询凭据。

约束限制

  • 仅支持Flink1.15版本使用委托授权访问临时凭证:
    • 在创建作业时,请配置作业使用Flink1.15版本
    • 已在作业中配置允许DLI访问DEW的委托信息。flink.dli.job.agency.name=自定义委托名称

      自定义委托请参考自定义DLI委托权限

      请注意配置参数不需要用"" 或 '' 包裹。

  • Flink1.15基础镜像内置了3.1.62版本的huaweicloud-sdk-core。

准备环境

已安装和配置IntelliJ IDEA等开发工具以及安装JDK和Maven。

pom文件配置中依赖包

        <dependency>
            <groupId>com.huaweicloud.sdk</groupId>
            <artifactId>huaweicloud-sdk-core</artifactId>
            <version>3.1.62</version>
            <scope>provided</scope>
        </dependency>
   

示例代码

本章节JAVA样例代码演示如何获取BasicCredentials,以及取临时委托的AK、SK、SecurityToken。

  • Flink UDF 获取作业委托凭证
    package com.huawei.dli.demo;
    
    import static com.huawei.dli.demo.utils.DLIJobAgencyCredentialUtils.getICredentialProvider;
    
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.auth.ICredentialProvider;
    
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class GetUserAgencyCredentialUDF extends ScalarFunction {
        private static final Logger LOG = LoggerFactory.getLogger(GetUserAgencyCredentialUDF.class);
    
        ICredentialProvider credentialProvider;
    
        @Override
        public void open(FunctionContext context) throws Exception {
            credentialProvider = getICredentialProvider();
        }
    
        public String eval(String value) {
            BasicCredentials basicCredentials = (BasicCredentials) credentialProvider.getCredentials();
    
            String ak = basicCredentials.getAk();
            String sk = basicCredentials.getSk();
            String securityToken = basicCredentials.getSecurityToken();
            LOG.info(">>> ak " + ak + " sk " + sk.length() + " token " + securityToken.length());
            return value + "_demo";
        }
    }
  • Flink Jar作业获取作业委托凭证
    package com.huawei.dli.demo;
    
    import static com.huawei.dli.demo.utils.DLIJobAgencyCredentialUtils.getICredentialProvider;
    
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.auth.ICredentialProvider;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class GetUserCredentialsFlinkStream {
        private static final Logger LOG = LoggerFactory.getLogger(GetUserCredentialsFlinkStream.class);
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStream<String> stream = streamEnv.addSource(new DataGen()).disableChaining();
            ICredentialProvider credentialProvider = getICredentialProvider();
    
            BasicCredentials basicCredentials = (BasicCredentials) credentialProvider.getCredentials();
            String ak = basicCredentials.getAk();
            String sk = basicCredentials.getSk();
            String securityToken = basicCredentials.getSecurityToken();
            LOG.info(">>" + " ak " + ak + " sk " + sk.length() + " token " + securityToken.length());
    
            stream.print();
            streamEnv.execute("GetUserCredentialsFlinkStream");
        }
    }
  • 获取作业委托的工具类
    package com.huawei.dli.demo.utils;
    
    import com.huaweicloud.sdk.core.auth.ICredentialProvider;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.ServiceLoader;
    
    public class DLIJobAgencyCredentialUtils {
    
        public static ICredentialProvider getICredentialProvider() {
            List<ICredentialProvider> credentialProviders = new ArrayList<>();
            ServiceLoader.load(ICredentialProvider.class, StreamExecutionEnvironment.class.getClassLoader())
                .iterator()
                .forEachRemaining(credentialProviders::add);
    
            if (credentialProviders.size() != 1) {
                throw new RuntimeException("Failed to obtain temporary user credential");
            }
            return credentialProviders.get(0);
        }
    }