文档首页 > > 开发指南> 使用Spark作业跨源访问数据源> 对接OpenTSDB>

scala样例代码

scala样例代码

分享
更新时间:2020/12/21 GMT+08:00

开发说明

支持对接CloudTable的OpenTSDB和MRS的OpenTSDB。

  • 前提条件

    在DLI管理控制台上已完成创建跨源连接。具体操作请参考《数据湖探索用户指南》。

  • 构造依赖信息,创建SparkSession
    1. 导入依赖。
      涉及到mvn依赖
      1
      2
      3
      4
      5
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
      </dependency>
      
      import相关依赖包
      1
      2
      3
      4
      import scala.collection.mutable
      import org.apache.spark.sql.{Row, SparkSession}
      import org.apache.spark.rdd.RDD
      import org.apache.spark.sql.types._
      
    2. 创建会话。
      1
      val sparkSession = SparkSession.builder().getOrCreate()
      
    3. 创建DLI关联跨源访问 OpenTSDB的关联表。
      1
      2
      3
      4
      sparkSession.sql("create table opentsdb_test using opentsdb options(
      	'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',	
              'metric'='ctopentsdb',
      	'tags'='city,location')")
      
      表1 创建表参数

      参数

      说明

      host

      OpenTSDB连接地址。

      • 访问CloudTable OpenTSDB,填写OpenTSDB链接地址,获取方式请参考“图 CloudTable OpenTSDB链接地址信息”。
      • 访问MRS OpenTSDB,若使用增强型跨源连接,填写OpenTSDB所在节点IP与端口,格式为"IP:PORT",OpenTSDB存在多个节点时,用分号隔开,获取方式请参考“图 MRS集群OpenTSDB IP信息”和“图 MRS集群OpenTSDB 端口信息”。若使用经典型跨源,填写经典型跨源返回的连接地址,管理控制台操作请参考《数据湖探索用户指南》。

      metric

      所创建的dli表对应的OpenTSDB中的指标名称。

      tags

      metric对应的标签,用于归类、过滤、快速检索等操作,可以是1到8个,以“,”分隔,包括对应metric下的所有tagk的值。

      图1 CloudTable OpenTSDB链接地址信息
      图2 MRS集群OpenTSDB IP信息
      图3 MRS集群OpenTSDB 端口信息
  • 通过SQL API访问
    1. 插入数据
      1
      sparkSession.sql("insert into opentsdb_test values('futian', 'abc', '1970-01-02 18:17:36', 30.0)")
      
    2. 查询数据
      1
      sparkSession.sql("select * from opentsdb_test").show()
      

      返回结果:

  • 通过DataFrame API访问
    1. 构造schema
      1
      2
      3
      4
      5
      val attrTag1Location = new StructField("location", StringType)
      val attrTag2Name = new StructField("name", StringType)
      val attrTimestamp = new StructField("timestamp", LongType)
      val attrValue = new StructField("value", DoubleType)
      val attrs = Array(attrTag1Location, attrTag2Name, attrTimestamp, attrValue)
      
    2. 根据schema的类型构造数据
      1
      2
      val mutableRow: Seq[Any] = Seq("beijing", "abc", 123456L, 30.0)
      val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)
      
    3. 导入数据到OpenTSDB
      1
      sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("opentsdb_test")
      
    4. 读取OpenTSDB上的数据
      1
      2
      3
      4
      5
      val map = new mutable.HashMap[String, String]()
      map("metric") = "ctopentsdb"
      map("tags") = "city,location"
      map("Host") = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"
      sparkSession.read.format("opentsdb").options(map.toMap).load().show()
      

      返回结果:

  • 提交Spark作业
    1. 将写好的代码生成jar包,上传至DLI中。控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。
    2. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
      • 提交作业时,需要指定Module模块,名称为:sys.datasource.opentsdb。
      • 通过控制台提交作业请参考《数据湖探索用户指南》中的“表6-选择依赖资源参数说明”。
      • 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。

完整示例代码

  • Maven依赖
    1
    2
    3
    4
    5
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.2</version>
    </dependency>
    
  • 通过SQL API访问
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    import org.apache.spark.sql.SparkSession
    
    object Test_OpenTSDB_CT {
      def main(args: Array[String]): Unit = {
        // Create a SparkSession session.
        val sparkSession = SparkSession.builder().getOrCreate()
    
        // Create a data table for DLI association OpenTSDB
        sparkSession.sql("create table opentsdb_test using opentsdb options(
    	'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',
    	'metric'='ctopentsdb',
    	'tags'='city,location')")
    
        //*****************************SQL module***********************************
        sparkSession.sql("insert into opentsdb_test values('futian', 'abc', '1970-01-02 18:17:36', 30.0)")
        sparkSession.sql("select * from opentsdb_test").show()
    
        sparkSession.close()
      }
    }
    
  • 通过DataFrame API访问
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    import scala.collection.mutable
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.types._
    
    object Test_OpenTSDB_CT {
      def main(args: Array[String]): Unit = {
        // Create a SparkSession session.
        val sparkSession = SparkSession.builder().getOrCreate()
    
        // Create a data table for DLI association OpenTSDB
        sparkSession.sql("create table opentsdb_test using opentsdb options(
    	'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',
    	'metric'='ctopentsdb',
    	'tags'='city,location')")
    
        //*****************************DataFrame model***********************************
        // Setting schema
        val attrTag1Location = new StructField("location", StringType)
        val attrTag2Name = new StructField("name", StringType)
        val attrTimestamp = new StructField("timestamp", LongType)
        val attrValue = new StructField("value", DoubleType)
        val attrs = Array(attrTag1Location, attrTag2Name, attrTimestamp,attrValue)
    
        // Populate data according to the type of schema
        val mutableRow: Seq[Any] = Seq("beijing", "abc", 123456L, 30.0)
        val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1)
    
        //Import the constructed data into OpenTSDB
        sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("opentsdb_test")
    
        //Read data on OpenTSDB
        val map = new mutable.HashMap[String, String]()
        map("metric") = "ctopentsdb"
        map("tags") = "city,location"
        map("Host") = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"
        sparkSession.read.format("opentsdb").options(map.toMap).load().show()
    
        sparkSession.close()
      }
    }
    
分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!非常感谢您的反馈,我们会继续努力做到更好!
反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区论坛频道来与我们联系探讨

智能客服提问云社区提问