Help Center/ Data Lake Insight/ Developer Guide/ Flink Job Agencies/ Obtaining Temporary Credentials from a Flink Job's Agency for Accessing Other Cloud Services
Updated on 2024-09-20 GMT+08:00

Obtaining Temporary Credentials from a Flink Job's Agency for Accessing Other Cloud Services

Function

DLI provides a common interface to obtain temporary credentials from Flink job's agencies set by users during job launch. The interface encapsulates temporary credentials obtained from the job agency in the com.huaweicloud.sdk.core.auth.BasicCredentials class.

  • Encapsulate temporary credentials obtained from the agency in the return value of getCredentials() of the com.huaweicloud.sdk.core.auth.ICredentialProvider interface.
  • The return type is com.huaweicloud.sdk.core.auth.BasicCredentials.
  • Only AKs, SKs, and security tokens can be obtained.
  • After obtaining the AK, SK, and security token, query temporary credentials by referring to Using CSMS to Change Hard-coded Database Account Passwords.

Notes and Constraints

  • Agency authorization for accessing temporary credentials is only supported in Flink 1.15.
    • When creating a Flink job, select version 1.15.
    • The information of the agency that allows DLI to access DEW has been configured for the job. flink.dli.job.agency.name indicates the custom agency name.

      For details about how to create a custom agency, see Customizing DLI Agency Permissions.

      Note that double quotes ("") or single quotes ('') are not required when configuring parameters.

  • The Flink 1.15 basic image has built-in huaweicloud-sdk-core 3.1.62.

Preparing the Environment

Development tools such as IntelliJ IDEA and other development tools, JDK, and Maven have been installed and configured.

Dependency package in POM file configurations

        <dependency>
            <groupId>com.huaweicloud.sdk</groupId>
            <artifactId>huaweicloud-sdk-core</artifactId>
            <version>3.1.62</version>
            <scope>provided</scope>
        </dependency>
   

Sample Code

This section's Java sample code demonstrates how to obtain BasicCredentials and retrieve a temporary agency's AK, SK, and security token.

  • Obtaining job agency credentials using Flink UDFs
    package com.huawei.dli.demo;
    
    import static com.huawei.dli.demo.utils.DLIJobAgencyCredentialUtils.getICredentialProvider;
    
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.auth.ICredentialProvider;
    
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class GetUserAgencyCredentialUDF extends ScalarFunction {
        private static final Logger LOG = LoggerFactory.getLogger(GetUserAgencyCredentialUDF.class);
    
        ICredentialProvider credentialProvider;
    
        @Override
        public void open(FunctionContext context) throws Exception {
            credentialProvider = getICredentialProvider();
        }
    
        public String eval(String value) {
            BasicCredentials basicCredentials = (BasicCredentials) credentialProvider.getCredentials();
    
            String ak = basicCredentials.getAk();
            String sk = basicCredentials.getSk();
            String securityToken = basicCredentials.getSecurityToken();
            LOG.info(">>> ak " + ak + " sk " + sk.length() + " token " + securityToken.length());
            return value + "_demo";
        }
    }
  • Obtaining job agency credentials for Flink Jar jobs
    package com.huawei.dli.demo;
    
    import static com.huawei.dli.demo.utils.DLIJobAgencyCredentialUtils.getICredentialProvider;
    
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.auth.ICredentialProvider;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class GetUserCredentialsFlinkStream {
        private static final Logger LOG = LoggerFactory.getLogger(GetUserCredentialsFlinkStream.class);
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStream<String> stream = streamEnv.addSource(new DataGen()).disableChaining();
            ICredentialProvider credentialProvider = getICredentialProvider();
    
            BasicCredentials basicCredentials = (BasicCredentials) credentialProvider.getCredentials();
            String ak = basicCredentials.getAk();
            String sk = basicCredentials.getSk();
            String securityToken = basicCredentials.getSecurityToken();
            LOG.info(">>" + " ak " + ak + " sk " + sk.length() + " token " + securityToken.length());
    
            stream.print();
            streamEnv.execute("GetUserCredentialsFlinkStream");
        }
    }
  • Tool class for obtaining job agencies
    package com.huawei.dli.demo.utils;
    
    import com.huaweicloud.sdk.core.auth.ICredentialProvider;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.ServiceLoader;
    
    public class DLIJobAgencyCredentialUtils {
    
        public static ICredentialProvider getICredentialProvider() {
            List<ICredentialProvider> credentialProviders = new ArrayList<>();
            ServiceLoader.load(ICredentialProvider.class, StreamExecutionEnvironment.class.getClassLoader())
                .iterator()
                .forEachRemaining(credentialProviders::add);
    
            if (credentialProviders.size() != 1) {
                throw new RuntimeException("Failed to obtain temporary user credential");
            }
            return credentialProviders.get(0);
        }
    }