Obtaining Temporary Credentials from a Flink Job's Agency for Accessing Other Cloud Services
Function
DLI provides a common interface to obtain temporary credentials from Flink job's agencies set by users during job launch. The interface encapsulates temporary credentials obtained from the job agency in the com.huaweicloud.sdk.core.auth.BasicCredentials class.
- Encapsulate temporary credentials obtained from the agency in the return value of getCredentials() of the com.huaweicloud.sdk.core.auth.ICredentialProvider interface.
- The return type is com.huaweicloud.sdk.core.auth.BasicCredentials.
- Only AKs, SKs, and security tokens can be obtained.
- After obtaining the AK, SK, and security token, query temporary credentials by referring to Using CSMS to Change Hard-coded Database Account Passwords.
Notes and Constraints
- Agency authorization for accessing temporary credentials is only supported in Flink 1.15.
- When creating a Flink job, select version 1.15.
- The information of the agency that allows DLI to access DEW has been configured for the job. flink.dli.job.agency.name indicates the custom agency name.
For details about how to create a custom agency, see Customizing DLI Agency Permissions.
Note that double quotes ("") or single quotes ('') are not required when configuring parameters.
- The Flink 1.15 basic image has built-in huaweicloud-sdk-core 3.1.62.
Preparing the Environment
Development tools such as IntelliJ IDEA and other development tools, JDK, and Maven have been installed and configured.
Dependency package in POM file configurations
<dependency> <groupId>com.huaweicloud.sdk</groupId> <artifactId>huaweicloud-sdk-core</artifactId> <version>3.1.62</version> <scope>provided</scope> </dependency>
Sample Code
This section's Java sample code demonstrates how to obtain BasicCredentials and retrieve a temporary agency's AK, SK, and security token.
- Obtaining job agency credentials using Flink UDFs
package com.huawei.dli.demo; import static com.huawei.dli.demo.utils.DLIJobAgencyCredentialUtils.getICredentialProvider; import com.huaweicloud.sdk.core.auth.BasicCredentials; import com.huaweicloud.sdk.core.auth.ICredentialProvider; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.ScalarFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class GetUserAgencyCredentialUDF extends ScalarFunction { private static final Logger LOG = LoggerFactory.getLogger(GetUserAgencyCredentialUDF.class); ICredentialProvider credentialProvider; @Override public void open(FunctionContext context) throws Exception { credentialProvider = getICredentialProvider(); } public String eval(String value) { BasicCredentials basicCredentials = (BasicCredentials) credentialProvider.getCredentials(); String ak = basicCredentials.getAk(); String sk = basicCredentials.getSk(); String securityToken = basicCredentials.getSecurityToken(); LOG.info(">>> ak " + ak + " sk " + sk.length() + " token " + securityToken.length()); return value + "_demo"; } }
- Obtaining job agency credentials for Flink Jar jobs
package com.huawei.dli.demo; import static com.huawei.dli.demo.utils.DLIJobAgencyCredentialUtils.getICredentialProvider; import com.huaweicloud.sdk.core.auth.BasicCredentials; import com.huaweicloud.sdk.core.auth.ICredentialProvider; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class GetUserCredentialsFlinkStream { private static final Logger LOG = LoggerFactory.getLogger(GetUserCredentialsFlinkStream.class); public static void main(String[] args) throws Exception { StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> stream = streamEnv.addSource(new DataGen()).disableChaining(); ICredentialProvider credentialProvider = getICredentialProvider(); BasicCredentials basicCredentials = (BasicCredentials) credentialProvider.getCredentials(); String ak = basicCredentials.getAk(); String sk = basicCredentials.getSk(); String securityToken = basicCredentials.getSecurityToken(); LOG.info(">>" + " ak " + ak + " sk " + sk.length() + " token " + securityToken.length()); stream.print(); streamEnv.execute("GetUserCredentialsFlinkStream"); } }
- Tool class for obtaining job agencies
package com.huawei.dli.demo.utils; import com.huaweicloud.sdk.core.auth.ICredentialProvider; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.ArrayList; import java.util.List; import java.util.ServiceLoader; public class DLIJobAgencyCredentialUtils { public static ICredentialProvider getICredentialProvider() { List<ICredentialProvider> credentialProviders = new ArrayList<>(); ServiceLoader.load(ICredentialProvider.class, StreamExecutionEnvironment.class.getClassLoader()) .iterator() .forEachRemaining(credentialProviders::add); if (credentialProviders.size() != 1) { throw new RuntimeException("Failed to obtain temporary user credential"); } return credentialProviders.get(0); } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot