Updated: 2024-04-07 GMT+08:00

Obtaining Temporary Credentials of a Flink Job Agency

Function Description

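DLI provides a general-purpose interface for obtaining the temporary credentials of the agency that a user specifies when starting a Flink job. The interface wraps the temporary credentials of the job's agency in the com.huaweicloud.sdk.core.auth.BasicCredentials class.

  • The agency's temporary credentials are returned by the getCredentials() method of the com.huaweicloud.sdk.core.auth.ICredentialProvider interface.
  • The return type is com.huaweicloud.sdk.core.auth.BasicCredentials.
  • Only the AK, SK, and SecurityToken can be obtained.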
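
Because the returned object exposes the AK, SK, and SecurityToken as plain strings, code that logs them may want to mask the values first. The following is a minimal sketch under stated assumptions, not part of the DLI interface: the class CredentialLogMasker and its methods are hypothetical helpers, and the strings in main() are placeholders standing in for the values returned by the BasicCredentials getters.

```java
/** Hypothetical helper (not part of DLI or the SDK): masks secrets before logging. */
final class CredentialLogMasker {

    private CredentialLogMasker() {
    }

    /**
     * Keeps the first {@code visible} characters and replaces the rest with '*'.
     * Returns "<null>" for a null input so callers never hit a NullPointerException.
     */
    static String mask(String secret, int visible) {
        if (secret == null) {
            return "<null>";
        }
        int keep = Math.max(0, Math.min(visible, secret.length()));
        StringBuilder sb = new StringBuilder(secret.length());
        sb.append(secret, 0, keep);
        for (int i = keep; i < secret.length(); i++) {
            sb.append('*');
        }
        return sb.toString();
    }

    /** Builds a log-safe summary: masked AK, and only the lengths of the SK and token. */
    static String describe(String ak, String sk, String securityToken) {
        return "ak=" + mask(ak, 4)
            + " skLength=" + (sk == null ? 0 : sk.length())
            + " tokenLength=" + (securityToken == null ? 0 : securityToken.length());
    }

    public static void main(String[] args) {
        // Placeholder values only; in a job they would come from getAk(), getSk(), getSecurityToken().
        System.out.println(describe("EXAMPLEAK123", "example-sk", "example-token"));
    }
}
```

In a job, describe() would be called with the three getter results instead of the placeholders, so that neither the SK nor the token is ever written to the log in full.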

Constraints

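  • Temporary credentials can be obtained through agency authorization only in Flink 1.15:
    • When creating the job, set its Flink version to 1.15.
    • The job must be configured with the agency that allows DLI to access DEW: flink.dli.job.agency.name=Custom agency name

      For details about custom agencies, see "Customizing DLI Agency Permissions".

      Do not enclose the parameter value in double quotes ("") or single quotes ('').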

  • The Flink 1.15 base image has huaweicloud-sdk-core 3.1.62 built in.
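
The rule about not quoting the parameter value can be illustrated with a short sketch. How DLI itself parses the parameter is not described here, so this example uses java.util.Properties as a stand-in for a typical key=value parser; the class AgencyNameConfigCheck and the agency name my_agency are hypothetical. It shows that quotes around a value are typically kept as part of the value, so the resulting agency name would no longer match.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical check: shows that quotes become part of a key=value setting. */
final class AgencyNameConfigCheck {

    static final String KEY = "flink.dli.job.agency.name";

    private AgencyNameConfigCheck() {
    }

    /** Parses one "key=value" line with java.util.Properties and returns the raw value. */
    static String parseAgencyName(String line) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(line));
        } catch (IOException e) {
            // A StringReader does not fail in practice; rethrow unchecked to keep callers simple.
            throw new UncheckedIOException(e);
        }
        return props.getProperty(KEY);
    }

    /** Returns true when the value is wrapped in a matching pair of single or double quotes. */
    static boolean isQuoted(String value) {
        if (value == null || value.length() < 2) {
            return false;
        }
        char first = value.charAt(0);
        char last = value.charAt(value.length() - 1);
        return (first == '"' || first == '\'') && first == last;
    }

    public static void main(String[] args) {
        String good = parseAgencyName(KEY + "=my_agency");     // hypothetical agency name
        String bad = parseAgencyName(KEY + "=\"my_agency\"");  // the quotes stay in the value
        System.out.println(good + " quoted=" + isQuoted(good));
        System.out.println(bad + " quoted=" + isQuoted(bad));
    }
}
```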

Preparing the Environment

A development tool such as IntelliJ IDEA has been installed and configured, and JDK and Maven have been installed.

Dependency to add to the pom file:

        <dependency>
            <groupId>com.huaweicloud.sdk</groupId>
            <artifactId>huaweicloud-sdk-core</artifactId>
            <version>3.1.62</version>
            <scope>provided</scope>
        </dependency>
   

Sample Code

The Java sample code in this section shows how to obtain BasicCredentials and how to read the AK, SK, and SecurityToken of the agency's temporary credentials.

  • Obtaining the job agency credentials in a Flink UDF
    package com.huawei.dli.demo;
    
    import static com.huawei.dli.demo.utils.DLIJobAgencyCredentialUtils.getICredentialProvider;
    
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.auth.ICredentialProvider;
    
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class GetUserAgencyCredentialUDF extends ScalarFunction {
        private static final Logger LOG = LoggerFactory.getLogger(GetUserAgencyCredentialUDF.class);
    
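        // Provider of the job agency's temporary credentials; initialized once in open().
        private transient ICredentialProvider credentialProvider;
    
        @Override
        public void open(FunctionContext context) throws Exception {
            credentialProvider = getICredentialProvider();
        }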
    
        public String eval(String value) {
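            // getCredentials() returns BasicCredentials for the job agency, so the cast is expected to succeed.
            BasicCredentials basicCredentials = (BasicCredentials) credentialProvider.getCredentials();
    
            String ak = basicCredentials.getAk();
            String sk = basicCredentials.getSk();
            String securityToken = basicCredentials.getSecurityToken();
            // The AK is logged in full; the SK and token are logged as lengths only.
            LOG.info(">>> ak {} sk {} token {}", ak, sk.length(), securityToken.length());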
            return value + "_demo";
        }
    }
  • Obtaining the job agency credentials in a Flink Jar job
    package com.huawei.dli.demo;
    
    import static com.huawei.dli.demo.utils.DLIJobAgencyCredentialUtils.getICredentialProvider;
    
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.auth.ICredentialProvider;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class GetUserCredentialsFlinkStream {
        private static final Logger LOG = LoggerFactory.getLogger(GetUserCredentialsFlinkStream.class);
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    
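            // DataGen is not defined in this sample; supply your own SourceFunction<String> implementation.
            DataStream<String> stream = streamEnv.addSource(new DataGen()).disableChaining();
            ICredentialProvider credentialProvider = getICredentialProvider();
    
            // getCredentials() returns BasicCredentials for the job agency, so the cast is expected to succeed.
            BasicCredentials basicCredentials = (BasicCredentials) credentialProvider.getCredentials();
            String ak = basicCredentials.getAk();
            String sk = basicCredentials.getSk();
            String securityToken = basicCredentials.getSecurityToken();
            // The AK is logged in full; the SK and token are logged as lengths only.
            LOG.info(">> ak {} sk {} token {}", ak, sk.length(), securityToken.length());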
    
            stream.print();
            streamEnv.execute("GetUserCredentialsFlinkStream");
        }
    }
  • Utility class for obtaining the job agency credential provider
    package com.huawei.dli.demo.utils;
    
    import com.huaweicloud.sdk.core.auth.ICredentialProvider;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.ServiceLoader;
    
    public class DLIJobAgencyCredentialUtils {
    
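        /**
         * Loads the ICredentialProvider implementation through the Java ServiceLoader mechanism,
         * using the class loader that loaded StreamExecutionEnvironment.
         *
         * @throws RuntimeException if the number of providers found is not exactly one
         */
        public static ICredentialProvider getICredentialProvider() {
            List<ICredentialProvider> credentialProviders = new ArrayList<>();
            ServiceLoader.load(ICredentialProvider.class, StreamExecutionEnvironment.class.getClassLoader())
                .iterator()
                .forEachRemaining(credentialProviders::add);
    
            // Exactly one provider is expected; zero or several providers is treated as a failure.
            if (credentialProviders.size() != 1) {
                throw new RuntimeException("Failed to obtain temporary user credential");
            }
            return credentialProviders.get(0);
        }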
    }
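
The utility class above fails with the same message whether no provider or several providers are found. As a possible refinement, the following sketch isolates the "exactly one" check into a generic helper that reports how many candidates were found. The class SingleProviderSelector and the method requireExactlyOne are hypothetical names, and plain strings stand in for real providers so that the example runs without the SDK or Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Hypothetical helper: selects the single element of an iterator or fails with a count. */
final class SingleProviderSelector {

    private SingleProviderSelector() {
    }

    /** Drains the iterator and returns its only element, or throws with the number found. */
    static <T> T requireExactlyOne(Iterator<T> candidates, String description) {
        List<T> found = new ArrayList<>();
        candidates.forEachRemaining(found::add);
        if (found.size() != 1) {
            throw new IllegalStateException(
                "Expected exactly one " + description + " but found " + found.size());
        }
        return found.get(0);
    }

    public static void main(String[] args) {
        // Stand-in for ServiceLoader.load(...).iterator(); strings replace real providers.
        Iterator<String> one = Collections.singletonList("provider-A").iterator();
        System.out.println(requireExactlyOne(one, "credential provider"));

        try {
            requireExactlyOne(Collections.<String>emptyList().iterator(), "credential provider");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the utility class, the iterator returned by ServiceLoader.load(...) could be passed to such a helper in place of the strings used here, which would make it easier to tell a missing provider from a duplicated one.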