Updated on 2024-07-04 GMT+08:00

Java Example Code

Development Description

This example applies only to MRS OpenTSDB.

  • Prerequisites

    A datasource connection has been created and bound to a queue on the DLI management console. For details, see Enhanced Datasource Connections.

    Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.

  • Code implementation
    1. Import dependencies.
      • Maven dependency involved
        1
        2
        3
        4
        5
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.3.2</version>
        </dependency>
        
      • Import dependency packages.
        1
        import org.apache.spark.sql.SparkSession;
        
    2. Create a session.
      1
      sparkSession = SparkSession.builder().appName("datasource-opentsdb").getOrCreate();
      
  • Connecting to data sources through SQL APIs
    • Create a table to connect to an MRS OpenTSDB data source and set connection parameters.
      1
      sparkSession.sql("create table opentsdb_new_test using opentsdb options('Host'='10.0.0.171:4242','metric'='ctopentsdb','tags'='city,location')");
      

      For details about the Host, metric, and tags parameters, see Table 1.

    • Insert data.
      1
      sparkSession.sql("insert into opentsdb_new_test values('Penglai', 'abc', '2021-06-30 18:00:00', 30.0)");
      
    • Query data.
      1
      sparkSession.sql("select * from opentsdb_new_test").show();
      

      Response

  • Submitting a Spark job
    1. Generate a JAR package based on the code file and upload the package to DLI.

      For details about console operations, see Creating a Package. For details about API operations, see Uploading a Package Group.

    2. In the Spark job editor, select the corresponding dependency module and execute the Spark job.

      For details about console operations, see Creating a Spark Job. For details about API operations, see Creating a Batch Processing Job.
      • If the Spark version is 2.3.2 (will be offline soon) or 2.4.5, specify the Module to sys.datasource.opentsdb when you submit a job.
      • If the Spark version is 3.1.1, you do not need to select a module. Configure Spark parameters (--conf).

        spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/opentsdb/*

        spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/opentsdb/*

      • For details about how to submit a job on the console, see the description of the Table 3 "Parameters for selecting dependency resources" in Creating a Spark Job.
      • For details about how to submit a job through an API, see the description of the modules parameter in Table 2 "Request parameters" in Creating a Batch Processing Job.

Complete Example Code

  • Maven dependency involved
    <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.3.2</version>
    </dependency>
  • Connecting to data sources through SQL APIs
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    import org.apache.spark.sql.SparkSession;
     
    public class java_mrs_opentsdb {
     
        private static SparkSession sparkSession = null;
     
        public static void main(String[] args) {
            //create a SparkSession session
            sparkSession = SparkSession.builder().appName("datasource-opentsdb").getOrCreate();
     
            sparkSession.sql("create table opentsdb_new_test using opentsdb options('Host'='10.0.0.171:4242','metric'='ctopentsdb','tags'='city,location')");
     
            //*****************************SQL module***********************************
            sparkSession.sql("insert into opentsdb_new_test values('Penglai', 'abc', '2021-06-30 18:00:00', 30.0)");
            System.out.println("Penglai new timestamp");
            sparkSession.sql("select * from opentsdb_new_test").show();
     
           sparkSession.close();
     
        }
    }