Updated on 2022-07-11 GMT+08:00

Function

The functions of the sample project are as follows:

  • The MultiComponentMapper class, which extends the Mapper abstract class, collects name information from the HDFS source file and then queries and combines the matching data from HBase and Hive (a hypothetical input line and the resulting map output are sketched after this list).
  • The MultiComponentReducer class, which extends the Reducer abstract class, takes the last piece of mapped data for each key and outputs it to both HBase and HDFS.
  • The main() function creates a MapReduce job and submits it to the Hadoop cluster.
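
For illustration only, the following sketch (not part of the sample code) shows a hypothetical line from the HDFS source file and the key-value pair the map task emits for it. The assumed line layout and all values shown are placeholders; the real output depends on the data stored in HBase and Hive.

  // Hypothetical source line in data.txt (assumed format: name,gender,stayTime):
  //   ZhangSan,male,25
  // Key-value pair emitted by the map task for this line, assuming the HBase cell
  // cf:cid of row "1" in table1 holds "123" and Hive sums stayTime to 25 for ZhangSan:
  //   ZhangSan    hbase:123, hive:25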

Example Code

For details about the code, see the com.huawei.bigdata.mapreduce.examples.MultiComponentExample class.

Example code of the map() function of the MultiComponentMapper class, which extends the Mapper abstract class.

  private static class MultiComponentMapper extends Mapper<Object, Text, Text, Text> {

    Configuration conf;

    @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {

      conf = context.getConfiguration();

      // Components that depend on ZooKeeper require the JAAS and krb5 configuration.
      // Note: there is no need to log in again here; the credentials obtained in the main() function are used.
      String krb5 = "krb5.conf";
      String jaas = "jaas_mr.conf";
      // These files are uploaded in the main() function.
      File jaasFile = new File(jaas);
      File krb5File = new File(krb5);
      System.setProperty("java.security.auth.login.config", jaasFile.getCanonicalPath());
      System.setProperty("java.security.krb5.conf", krb5File.getCanonicalPath());
      System.setProperty("zookeeper.sasl.client", "true");

      LOG.info("UGI :" + UserGroupInformation.getCurrentUser());

      String name = "";
      String line = value.toString();
      if (line.contains("male")) {
        // The name is the substring before the first comma of the line that was read.
        name = line.substring(0, line.indexOf(","));
      }
      // 1. read from HBase
      String hbaseData = readHBase();

      // 2. read from Hive
      String hiveData = readHive(name);

      // The Map task outputs a key-value pair.
      context.write(new Text(name), new Text("hbase:" + hbaseData + ", hive:" + hiveData));
    }

Example code of the readHBase function.

    private String readHBase() {
      String tableName = "table1";
      String columnFamily = "cf";
      String hbaseKey = "1";
      String hbaseValue;

      Configuration hbaseConfig = HBaseConfiguration.create(conf);
      org.apache.hadoop.hbase.client.Connection conn = null;
      try {
        // Create an HBase connection.
        conn = ConnectionFactory.createConnection(hbaseConfig);
        // Obtain the table object.
        Table table = conn.getTable(TableName.valueOf(tableName));
        // Instantiate a Get object.
        Get get = new Get(hbaseKey.getBytes());
        // Submit a get request.
        Result result = table.get(get);
        hbaseValue = Bytes.toString(result.getValue(columnFamily.getBytes(), "cid".getBytes()));

        return hbaseValue;

      } catch (IOException e) {
        LOG.warn("Exception occur ", e);
      } finally {
        if (conn != null) {
          try {
            conn.close();
          } catch (Exception e1) {
            LOG.error("Failed to close the connection ", e1);
          }
        }
      }

      return "";
    }

Example code of the readHive function.

    private String readHive(String name) throws IOException  {
      // Load the configuration file.
      Properties clientInfo = null;
      String userdir = System.getProperty("user.dir") + "/";
      InputStream fileInputStream = null;
      try {
        clientInfo = new Properties();
        String hiveclientProp = userdir + "hiveclient.properties";
        File propertiesFile = new File(hiveclientProp);
        fileInputStream = new FileInputStream(propertiesFile);
        clientInfo.load(fileInputStream);
      } catch (Exception e) {
        throw new IOException(e);
      } finally {
        if (fileInputStream != null) {
          fileInputStream.close();
        }
      }
      String zkQuorum = clientInfo.getProperty("zk.quorum");
      String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
      String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
      // Note:
      // MapReduce tasks can access Hive only through JDBC.
      // Hive submits another MapReduce job to execute the query,
      // so invoking Hive from within a MapReduce job is not recommended.
      final String driver = "org.apache.hive.jdbc.HiveDriver";

      String sql = "select name,sum(stayTime) as " + "stayTime from person where name = '" + name + "' group by name";

      StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/");
      // in map or reduce, use 'auth=delegationToken'
      sBuilder
          .append(";serviceDiscoveryMode=")
          .append(serviceDiscoveryMode)
          .append(";zooKeeperNamespace=")
          .append(zooKeeperNamespace)
          .append(";auth=delegationToken;");
      String url = sBuilder.toString();
      Connection connection = null;
      PreparedStatement statement = null;
      ResultSet resultSet = null;
      try {
        Class.forName(driver);
        connection = DriverManager.getConnection(url, "", "");
        statement = connection.prepareStatement(sql);
        resultSet = statement.executeQuery();

        if (resultSet.next()) {
          return resultSet.getString(1);
        }
      } catch (ClassNotFoundException e) {
        LOG.warn("Exception occur ", e);
      } catch (SQLException e) {
        LOG.warn("Exception occur ", e);
      } finally {
        if (resultSet != null) {
          try {
            resultSet.close();
          } catch (SQLException e) {
            // handle exception
          }
        }
        if (statement != null) {
          try {
            statement.close();
          } catch (SQLException e) {
            // handle exception
          }
        }
        if (connection != null) {
          try {
            connection.close();
          } catch (SQLException e) {
            // handle exception
          }
        }
      }

      return "";
    }
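
For reference only, with placeholder values substituted into the string built above, the JDBC URL used inside the map and reduce tasks would look roughly as follows; the host names and namespace here are illustrative assumptions, and the real values come from hiveclient.properties.

    // Illustrative URL only; the zk-node* hosts and the namespace are placeholder values.
    // jdbc:hive2://zk-node1:2181,zk-node2:2181,zk-node3:2181/;serviceDiscoveryMode=zooKeeper;
    //     zooKeeperNamespace=hiveserver2;auth=delegationToken;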

Example code of the reduce() function of the MultiComponentReducer class, which extends the Reducer abstract class.

  private static class MultiComponentReducer extends Reducer<Text, Text, Text, Text> {
    Configuration conf;

    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      conf = context.getConfiguration();

      // Components that depend on ZooKeeper require the JAAS and krb5 configuration.
      // Note: there is no need to log in again here; the credentials obtained in the main() function are used.
      String krb5 = "krb5.conf";
      String jaas = "jaas_mr.conf";
      // These files are uploaded in the main() function.
      File jaasFile = new File(jaas);
      File krb5File = new File(krb5);
      System.setProperty("java.security.auth.login.config", jaasFile.getCanonicalPath());
      System.setProperty("java.security.krb5.conf", krb5File.getCanonicalPath());
      System.setProperty("zookeeper.sasl.client", "true");

      Text finalValue = new Text("");
      // just pick the last value as the data to save
      for (Text value : values) {
        finalValue = value;
      }

      // write data to HBase
      writeHBase(key.toString(), finalValue.toString());

      // save result to HDFS
      context.write(key, finalValue);
    }

Example code of the writeHBase function.

    private void writeHBase(String rowKey, String data) {
      String tableName = "table1";
      String columnFamily = "cf";

      try {
        LOG.info("UGI read :" + UserGroupInformation.getCurrentUser());
      } catch (IOException e1) {
        // handle exception
      }

      Configuration hbaseConfig = HBaseConfiguration.create(conf);
      org.apache.hadoop.hbase.client.Connection conn = null;
      try {
        // Create an HBase connection.
        conn = ConnectionFactory.createConnection(hbaseConfig);
        // Obtain the table object.
        Table table = conn.getTable(TableName.valueOf(tableName));

        // create a Put to HBase
        List<Put> list = new ArrayList<Put>();
        byte[] row = Bytes.toBytes("row" + rowKey);
        Put put = new Put(row);
        byte[] family = Bytes.toBytes(columnFamily);
        byte[] qualifier = Bytes.toBytes("value");
        byte[] value = Bytes.toBytes(data);
        put.addColumn(family, qualifier, value);
        list.add(put);
        // execute Put
        table.put(list);
      } catch (IOException e) {
        LOG.warn("Exception occur ", e);
      } finally {
        if (conn != null) {
          try {
            conn.close();
          } catch (Exception e1) {
            LOG.error("Failed to close the connection ", e1);
          }
        }
      }

    }

Example code: the main() function creates a job, configures the dependencies and permissions, and submits the job to the Hadoop cluster.

    public static void main(String[] args) throws Exception {
      // Load the hiveclient.properties configuration file.
      Properties clientInfo = null;
      try {
        clientInfo = new Properties();
        clientInfo.load(MultiComponentExample.class.getClassLoader().getResourceAsStream("hiveclient.properties"));
      } catch (Exception e) {
        throw new IOException(e);
      }
      String zkQuorum = clientInfo.getProperty("zk.quorum");
      String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
      String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
      String principal = clientInfo.getProperty("principal");
      String auth = clientInfo.getProperty("auth");
      String sasl_qop = clientInfo.getProperty("sasl.qop");
      String hbaseKeytab = MultiComponentExample.class.getClassLoader().getResource("user.keytab").getPath();
      String hbaseJaas = MultiComponentExample.class.getClassLoader().getResource("jaas_mr.conf").getPath();
      String hiveClientProperties = MultiComponentExample.class.getClassLoader().getResource("hiveclient.properties").getPath();
      // A list of files, separated by commas.
      String files = "file://" + KEYTAB + "," + "file://" + KRB + "," + "file://" + JAAS;
      files = files + "," + "file://" + hbaseKeytab;
      files = files + "," + "file://" + hbaseJaas;
      files = files + "," + "file://" + hiveClientProperties;
      // This setting makes the job upload these files to HDFS.
      config.set("tmpfiles", files);

      // Clean up the control files before the job is submitted.
      MultiComponentExample.cleanupBeforeRun();

      // Security login
      LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, PRINCIPAL, hbaseKeytab);
      LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);
      LoginUtil.login(PRINCIPAL, KEYTAB, KRB, config);

      // Find the dependency JARs for Hive.
      Class hiveDriverClass = Class.forName("org.apache.hive.jdbc.HiveDriver");
      Class thriftClass = Class.forName("org.apache.thrift.TException");
      Class serviceThriftCLIClass = Class.forName("org.apache.hive.service.rpc.thrift.TCLIService");
      Class hiveConfClass = Class.forName("org.apache.hadoop.hive.conf.HiveConf");
      Class hiveTransClass = Class.forName("org.apache.thrift.transport.HiveTSaslServerTransport");
      Class hiveMetaClass = Class.forName("org.apache.hadoop.hive.metastore.api.MetaException");
      Class hiveShimClass = Class.forName("org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge23");
      Class thriftCLIClass = Class.forName("org.apache.hive.service.cli.thrift.ThriftCLIService");

      // add dependency jars to Job
      JarFinderUtil.addDependencyJars(config, hiveDriverClass, serviceThriftCLIClass, thriftCLIClass, thriftClass,
                hiveConfClass, hiveTransClass, hiveMetaClass, hiveShimClass);

      // add hive config file
      config.addResource("hive-site.xml");
      // add hbase config file
      Configuration conf = HBaseConfiguration.create(config);

      // Initialize the job object.
      Job job = Job.getInstance(conf);
      job.setJarByClass(MultiComponentExample.class);

      // Set the mapper and reducer classes.
      job.setMapperClass(MultiComponentMapper.class);
      job.setReducerClass(MultiComponentReducer.class);

      // Set the job input and output paths.
      FileInputFormat.addInputPath(job, new Path(baseDir, INPUT_DIR_NAME + File.separator + "data.txt"));
      FileOutputFormat.setOutputPath(job, new Path(baseDir, OUTPUT_DIR_NAME));

      // Set the output type of the job.
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);

      // Use this HBase utility class to add the HBase dependency JARs to the MR job.
      TableMapReduceUtil.addDependencyJars(job);

      // This is mandatory when accessing an HBase cluster in security mode.
      // HBase adds security credentials to the job; they are used in the map and reduce tasks.
      TableMapReduceUtil.initCredentials(job);

      // create Hive security credentials

      StringBuilder sBuilder = new StringBuilder("jdbc:hive2://").append(zkQuorum).append("/");
      sBuilder.append(";serviceDiscoveryMode=").append(serviceDiscoveryMode).append(";zooKeeperNamespace=")
          .append(zooKeeperNamespace)
          .append(";sasl.qop=")
          .append(sasl_qop)
          .append(";auth=")
          .append(auth)
          .append(";principal=")
          .append(principal)
          .append(";");
      String url = sBuilder.toString();
      Connection connection = DriverManager.getConnection(url, "", "");
      String tokenStr = ((HiveConnection) connection)
          .getDelegationToken(UserGroupInformation.getCurrentUser().getShortUserName(), PRINCIPAL);
      connection.close();
      Token<DelegationTokenIdentifier> hive2Token = new Token<DelegationTokenIdentifier>();
      hive2Token.decodeFromUrlString(tokenStr);
      // add Hive security credentials to Job
      job.getCredentials().addToken(new Text("hive.server2.delegation.token"), hive2Token);
      job.getCredentials().addToken(new Text(HiveAuthConstants.HS2_CLIENT_TOKEN), hive2Token);

      // Submit the job to a remote environment for execution.
      System.exit(job.waitForCompletion(true) ? 0 : 1);

    }
NOTE:

Replace all zkQuorum values with the actual information about the ZooKeeper cluster nodes.
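
For reference, a hiveclient.properties file loaded by this sample might look like the following. The property keys are the ones read by readHive() and main(); every value shown is a placeholder and must be replaced with the actual information for your cluster.

  # hiveclient.properties - placeholder values only; replace with your cluster's information
  zk.quorum=zk-node1:2181,zk-node2:2181,zk-node3:2181
  zooKeeperNamespace=hiveserver2
  serviceDiscoveryMode=zooKeeper
  principal=hive/hadoop.<system domain name>@<system domain name>
  auth=KERBEROS
  sasl.qop=auth-conf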
