Java Example Code
Development Description
This example applies only to MRS HBase.
- Prerequisites
A datasource connection has been created and bound to a queue on the DLI management console. For details, see Enhanced Datasource Connections.
Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.
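One way to avoid hard-coded credentials is to read them from an environment variable at startup. A minimal sketch in plain Java; the variable name used here is illustrative and not a DLI convention:

```java
// Minimal sketch: load a credential from an environment variable instead of
// hard-coding it in source. The variable name is illustrative only.
public class CredentialLoader {
    static String loadCredential(String envVar) {
        String value = System.getenv(envVar);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Environment variable " + envVar + " is not set");
        }
        return value;
    }
}
```

The same approach applies to values read from a configuration file: resolve the secret at runtime, and never commit it to the code repository.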
- Code implementation
- Import dependencies.
- Maven dependency involved
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>
- Import dependency packages.
import org.apache.spark.sql.SparkSession;
- Create a session.
SparkSession sparkSession = SparkSession.builder().appName("datasource-HBase-MRS").getOrCreate();
- Connecting to data sources through SQL APIs
- For clusters with Kerberos authentication disabled
- Create a table to connect to an MRS HBase data source and set connection parameters.
sparkSession.sql("CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS('ZKHost'='10.0.0.63:2181','TableName'='hbtest','RowKey'='id:5','Cols'='location:info.location,city:detail.city') ");
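In the OPTIONS clause above, 'Cols' maps each Spark SQL column to an HBase column in the form sparkColumn:columnFamily.qualifier, with entries separated by commas. A quick sketch of how such a mapping string breaks down (plain Java, no Spark required; the parser below is illustrative, not part of the DLI connector):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative breakdown of the 'Cols' option format:
// each comma-separated entry is sparkColumn:columnFamily.qualifier.
public class ColsOption {
    static Map<String, String> parse(String cols) {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (String entry : cols.split(",")) {
            String[] parts = entry.split(":", 2);
            mapping.put(parts[0], parts[1]); // sparkColumn -> family.qualifier
        }
        return mapping;
    }
}
```

For the example above, 'location:info.location,city:detail.city' maps the Spark column location to qualifier location in column family info, and city to qualifier city in family detail.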
- Insert data.
sparkSession.sql("insert into testhbase values('12345','abc','xxx')");
- Query data.
sparkSession.sql("select * from testhbase").show();
- For clusters with Kerberos authentication enabled
- Create a table to connect to an MRS HBase data source and set connection parameters.
sparkSession.sql("CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS('ZKHost'='10.0.0.63:2181','TableName'='hbtest','RowKey'='id:5','Cols'='location:info.location,city:detail.city','krb5conf'='./krb5.conf','keytab'='./user.keytab','principal'='krbtest') ");
If Kerberos authentication is enabled, you need to set three more parameters, as listed in Table 1. For details about how to obtain the krb5.conf and keytab files, see Completing Configurations for Enabling Kerberos Authentication.
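When the krb5.conf and keytab paths are resolved at runtime rather than hard-coded (as the complete Kerberos example later in this page does), the three extra OPTIONS entries can be assembled programmatically. A minimal sketch, assuming the file names and principal shown above:

```java
// Sketch: build the Kerberos-related OPTIONS fragment from a runtime directory.
// The directory layout (krb5.conf and user.keytab side by side) and the
// principal name are the illustrative values used in this example.
public class KerberosOptions {
    static String build(String dir, String principal) {
        return "'krb5conf'='" + dir + "/krb5.conf',"
             + "'keytab'='" + dir + "/user.keytab',"
             + "'principal'='" + principal + "'";
    }
}
```

The returned fragment can be concatenated into the OPTIONS clause of the CREATE TABLE statement.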
- Insert data.
sparkSession.sql("insert into testhbase values('95274','abc','Hongkong')");
- Query data.
sparkSession.sql("select * from testhbase").show();
- Submitting a Spark job
- Generate a JAR package based on the code file and upload the package to DLI.
For details about console operations, see Creating a Package. For details about API operations, see Uploading a Package Group.
- (Optional) Add the krb5.conf and user.keytab files to the dependency files of the job when creating a Spark job in an MRS cluster with Kerberos authentication enabled. Skip this step if Kerberos authentication is not enabled for the cluster. Figure 1 shows how to add the files.
- In the Spark job editor, select the corresponding dependency module and execute the Spark job.
For details about console operations, see Creating a Spark Job. For details about API operations, see Creating a Batch Processing Job.
- If the Spark version is 2.3.2 (will be offline soon) or 2.4.5, specify the Module to sys.datasource.hbase when you submit a job.
- If the Spark version is 3.1.1, you do not need to select a module. Configure Spark parameters (--conf).
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/hbase/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/hbase/*
- For details about how to submit a job on the console, see the description of the table "Parameters for selecting dependency resources" in Creating a Spark Job.
- For details about how to submit a job through an API, see the description of the modules parameter in Table 2 "Request parameters" in Creating a Batch Processing Job.
Complete Example Code
- Connecting to data sources through SQL APIs
- Complete example code for the cluster with Kerberos authentication disabled
import org.apache.spark.sql.SparkSession;

public class java_mrs_hbase {
    public static void main(String[] args) {
        // Create a SparkSession.
        SparkSession sparkSession = SparkSession.builder().appName("datasource-HBase-MRS").getOrCreate();
        sparkSession.sql("CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS('ZKHost'='10.0.0.63:2181','TableName'='hbtest','RowKey'='id:5','Cols'='location:info.location,city:detail.city') ");
        //*****************************SQL model***********************************
        sparkSession.sql("insert into testhbase values('95274','abc','Hongkong')");
        sparkSession.sql("select * from testhbase").show();
        sparkSession.close();
    }
}
- Complete example code for the cluster with Kerberos authentication enabled
import org.apache.spark.SparkContext;
import org.apache.spark.SparkFiles;
import org.apache.spark.sql.SparkSession;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class Test_HBase_SparkSql_Kerberos {
    private static void copyFile(File src, File dst) throws IOException {
        InputStream input = null;
        OutputStream output = null;
        try {
            input = new FileInputStream(src);
            output = new FileOutputStream(dst);
            byte[] buf = new byte[1024];
            int bytesRead;
            while ((bytesRead = input.read(buf)) > 0) {
                output.write(buf, 0, bytesRead);
            }
        } finally {
            // Guard against NullPointerException if a stream failed to open.
            if (input != null) {
                input.close();
            }
            if (output != null) {
                output.close();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        SparkSession sparkSession = SparkSession.builder().appName("Test_HBase_SparkSql_Kerberos").getOrCreate();
        SparkContext sc = sparkSession.sparkContext();
        sc.addFile("obs://xietest1/lzq/krb5.conf");
        sc.addFile("obs://xietest1/lzq/user.keytab");
        Thread.sleep(20);
        File krb5_startfile = new File(SparkFiles.get("krb5.conf"));
        File keytab_startfile = new File(SparkFiles.get("user.keytab"));
        String path_user = System.getProperty("user.dir");
        File keytab_endfile = new File(path_user + "/" + keytab_startfile.getName());
        File krb5_endfile = new File(path_user + "/" + krb5_startfile.getName());
        copyFile(krb5_startfile, krb5_endfile);
        copyFile(keytab_startfile, keytab_endfile);
        Thread.sleep(20);

        /**
         * Create an association table for the DLI association HBase table.
         */
        sparkSession.sql("CREATE TABLE testhbase(id string,booleanf boolean,shortf short,intf int,longf long,floatf float,doublef double) "
                + "using hbase OPTIONS("
                + "'ZKHost'='10.0.0.146:2181',"
                + "'TableName'='hbtest',"
                + "'RowKey'='id:100',"
                + "'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF2.longf,floatf:CF1.floatf,doublef:CF2.doublef',"
                + "'krb5conf'='" + path_user + "/krb5.conf',"
                + "'keytab'='" + path_user + "/user.keytab',"
                + "'principal'='krbtest') ");
        //*****************************SQL model***********************************
        sparkSession.sql("insert into testhbase values('newtest',true,1,2,3,4,5)");
        sparkSession.sql("select * from testhbase").show();
        sparkSession.close();
    }
}