获取Flink作业委托临时凭证用于访问其他云服务
功能描述
DLI提供了一个通用接口,可用于获取用户在启动Flink作业时设置的委托的临时凭证。该接口将获取到的该作业委托的临时凭证封装到com.huaweicloud.sdk.core.auth.BasicCredentials类中。
- 获取到的委托的临时认证封装到com.huaweicloud.sdk.core.auth.ICredentialProvider接口的getCredentials()返回值中。
- 返回类型为com.huaweicloud.sdk.core.auth.BasicCredentials。
- 仅支持获取AK、SK、SecurityToken。
- 获取到AK、SK、SecurityToken后,请参考如何使用凭据管理服务替换硬编码的数据库账号密码查询凭据。
约束限制
- 仅支持Flink1.15版本使用委托授权访问临时凭证:
- 在创建作业时,请配置作业使用Flink1.15版本
- 已在作业中配置允许DLI访问DEW的委托信息。flink.dli.job.agency.name=自定义委托名称。
自定义委托请参考自定义DLI委托权限。
请注意配置参数不需要用"" 或 '' 包裹。
- Flink1.15基础镜内已内置了3.1.62版本的huaweicloud-sdk-core,无需重复安装。
准备环境
已安装和配置IntelliJ IDEA等开发工具以及安装JDK和Maven。
Maven工程的pom.xml文件配置请参考JAVA样例代码中“pom文件配置”说明。
pom文件配置中依赖包
1 2 3 4 5 6 7 |
<dependency> <groupId>com.huaweicloud.sdk</groupId> <artifactId>huaweicloud-sdk-core</artifactId> <version>3.1.62</version> <scope>provided</scope> </dependency> |
示例代码
本章节JAVA样例代码演示如何获取BasicCredentials,以及取临时委托的AK、SK、SecurityToken。
- Flink UDF 获取作业委托凭证
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
package com.huawei.dli.demo; import static com.huawei.dli.demo.utils.DLIJobAgencyCredentialUtils.getICredentialProvider; import com.huaweicloud.sdk.core.auth.BasicCredentials; import com.huaweicloud.sdk.core.auth.ICredentialProvider; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.ScalarFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class GetUserAgencyCredentialUDF extends ScalarFunction { private static final Logger LOG = LoggerFactory.getLogger(GetUserAgencyCredentialUDF.class); ICredentialProvider credentialProvider; @Override public void open(FunctionContext context) throws Exception { credentialProvider = getICredentialProvider(); } public String eval(String value) { BasicCredentials basicCredentials = (BasicCredentials) credentialProvider.getCredentials(); String ak = basicCredentials.getAk(); String sk = basicCredentials.getSk(); String securityToken = basicCredentials.getSecurityToken(); LOG.info(">>> ak " + ak + " sk " + sk.length() + " token " + securityToken.length()); return value + "_demo"; } }
- Flink Jar作业获取作业委托凭证
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
package com.huawei.dli.demo; import static com.huawei.dli.demo.utils.DLIJobAgencyCredentialUtils.getICredentialProvider; import com.huaweicloud.sdk.core.auth.BasicCredentials; import com.huaweicloud.sdk.core.auth.ICredentialProvider; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; public class GetUserCredentialsFlinkStream { private static final Logger LOG = LoggerFactory.getLogger(GetUserCredentialsFlinkStream.class); public static void main(String[] args) throws Exception { StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> stream = streamEnv.addSource(new DataGen()).disableChaining(); ICredentialProvider credentialProvider = getICredentialProvider(); BasicCredentials basicCredentials = (BasicCredentials) credentialProvider.getCredentials(); String ak = basicCredentials.getAk(); String sk = basicCredentials.getSk(); String securityToken = basicCredentials.getSecurityToken(); LOG.info(">>" + " ak " + ak + " sk " + sk.length() + " token " + securityToken.length()); stream.print(); streamEnv.execute("GetUserCredentialsFlinkStream"); } private static class DataGen implements ParallelSourceFunction<String> { private boolean isRunning = true; private int count = 0; public void run(SourceContext<String> ctx) throws Exception { while (isRunning) { for (long i = 0; i < 10; i++) { ctx.collect("data-" + count); count++; } Thread.sleep(1000); } } public void cancel() { isRunning = false; } } }
- 获取作业委托的工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
package com.huawei.dli.demo.utils; import com.huaweicloud.sdk.core.auth.ICredentialProvider; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.ArrayList; import java.util.List; import java.util.ServiceLoader; public class DLIJobAgencyCredentialUtils { public static ICredentialProvider getICredentialProvider() { List<ICredentialProvider> credentialProviders = new ArrayList<>(); ServiceLoader.load(ICredentialProvider.class, StreamExecutionEnvironment.class.getClassLoader()) .iterator() .forEachRemaining(credentialProviders::add); if (credentialProviders.size() != 1) { throw new RuntimeException("Failed to obtain temporary user credential"); } return credentialProviders.get(0); } }