更新时间:2024-07-04 GMT+08:00

scala样例代码

开发说明

mongo只支持增强型跨源。

DDS即文档数据库服务,兼容MongoDB协议。

在DLI管理控制台上已完成创建增强跨源连接,并绑定队列。具体操作请参考《数据湖探索用户指南》。

认证用的password硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。

  • 构造依赖信息,创建SparkSession
    1. 导入依赖。
      涉及到mvn依赖
      1
      2
      3
      4
      5
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
      </dependency>
      
      import相关依赖包
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
      创建session
      val sparkSession = SparkSession.builder().appName("datasource-mongo").getOrCreate()
  • 通过SQL API访问
    1. 创建DLI跨源访问 mongo的关联表
      sparkSession.sql(
        "create table test_dds(id string, name string, age int) using mongo options(
          'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
          'uri' = 'mongodb://username:pwd@host:8635/db',
          'database' = 'test',
          'collection' = 'test',
          'user' = 'rwuser',
          'password' = '######')")
      表1 创建表参数

      参数

      说明

      url

      • url的格式为:

        "IP:PORT[,IP:PORT]/[DATABASE][.COLLECTION][AUTH_PROPERTIES]"

        例如:

        "192.168.4.62:8635/test?authSource=admin"
      • url需要在mongo(DDS)的连接地址的截取得到。

        获取到的mongo的连接地址格式为:"协议头://用户名:密码@访问地址:访问端口/数据库名?authSource=admin"

        例如:

        mongodb://rwuser:****@192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin

      uri

      uri的格式为:mongodb://username:pwd@host:8635/db

      其中以下参数需要修改为实际值:

      • “username”为创建的mongo(DDS)数据库用户名。
      • “pwd”为创建的mongo(DDS)数据库用户名对应的密码。
      • “host”为创建的mongo(DDS)数据库实例IP。
      • “db”为创建的mongo(DDS)数据库名称。

      mongo(DDS)数据库用户创建详见:创建DDS数据库账户

      database

      DDS的数据库名,如果在"url"中同时指定了数据库名,则"url"中的数据库名不生效。

      collection

      DDS中的collection名,如果在"url"中同时指定了collection,则"url"中的collection不生效。

      说明:

      如果在DDS中已存在collection,则建表可以不指定schema信息,DLI会根据collection中的数据自动生成schema信息。

      user

      访问DDS集群用户名。

      password

      访问DDS集群密码。

      图1 mongo的链接地址信息
    2. 插入数据
      sparkSession.sql("insert into test_dds values('3', 'Ann',23)")
    3. 查询数据
      sparkSession.sql("select * from test_dds").show()

      操作结果

  • 通过DataFrame API访问
    1. 设置连接参数
      val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
      val uri = "mongodb://username:pwd@host:8635/db"
      val user = "rwuser"
      val database = "test"
      val collection = "test"
      val password = "######"
    2. 构造schema
      1
      val schema = StructType(List(StructField("id", StringType), StructField("name", StringType), StructField("age", IntegerType)))
      
    3. 构造DataFrame
      val rdd = spark.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32)))
      val dataFrame = spark.createDataFrame(rdd, schema)
    4. 导入数据到mongo
      1
      2
      3
      4
      5
      6
      7
      8
      9
      dataFrame.write.format("mongo")
        .option("url", url)
        .option("uri", uri)
        .option("database", database)
        .option("collection", collection)
        .option("user", user)
        .option("password", password)
        .mode(SaveMode.Overwrite)
        .save()
      

      保存类型:Overwrite、Append、ErrorIfExis、Ignore 四种。

    5. 读取mongo上的数据
      1
      2
      3
      4
      5
      6
      7
      8
      val jdbcDF = spark.read.format("mongo").schema(schema)
        .option("url", url)
        .option("uri", uri)
        .option("database", database)
        .option("collection", collection)
        .option("user", user)
        .option("password", password)
        .load()
      

      操作结果

  • 提交Spark作业
    1. 将写好的代码生成jar包,上传至DLI中。

      控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。

    2. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。

      控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
      • 如果选择spark版本为2.3.2(即将下线)或2.4.5提交作业时,需要指定Module模块,名称为:sys.datasource.mongo。
      • 如果选择Spark版本为3.1.1时,无需选择Module模块, 需在 'Spark参数(--conf)' 配置

        spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/mongo/*

        spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/mongo/*

      • 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说明”表说明
      • 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。

完整示例代码

  • Maven依赖
    1
    2
    3
    4
    5
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.2</version>
    </dependency>
    
  • 通过SQL API访问
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import org.apache.spark.sql.SparkSession
    
    object TestMongoSql {
      def main(args: Array[String]): Unit = {
        val sparkSession = SparkSession.builder().getOrCreate()
        sparkSession.sql(
          "create table test_dds(id string, name string, age int) using mongo options(
            'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
            'uri' = 'mongodb://username:pwd@host:8635/db',
            'database' = 'test',
            'collection' = 'test',
            'user' = 'rwuser',
            'password' = '######')")
        sparkSession.sql("insert into test_dds values('3', 'Ann',23)")
        sparkSession.sql("select * from test_dds").show()
        sparkSession.close()
      }
    }
    
  • 通过DataFrame API访问
    import org.apache.spark.sql.{Row, SaveMode, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    
    object Test_Mongo_SparkSql {
      def main(args: Array[String]): Unit = {
      //  Create a SparkSession session.  
      val spark = SparkSession.builder().appName("mongodbTest").getOrCreate()
    
      // Set the connection configuration parameters.  
      val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
      val uri = "mongodb://username:pwd@host:8635/db"
      val user = "rwuser"
      val database = "test"
      val collection = "test"
      val password = "######"
    
      // Setting up the schema
      val schema = StructType(List(StructField("id", StringType), StructField("name", StringType), StructField("age", IntegerType)))
    
      // Setting up the DataFrame
      val rdd = spark.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32)))
      val dataFrame = spark.createDataFrame(rdd, schema)
    
    
      // Write data to mongo
      dataFrame.write.format("mongo")
        .option("url", url)
        .option("uri", uri)
        .option("database", database)
        .option("collection", collection)
        .option("user", user)
        .option("password", password)
        .mode(SaveMode.Overwrite)
        .save()
    
      // Reading data from mongo
      val jdbcDF = spark.read.format("mongo").schema(schema)
        .option("url", url)
        .option("uri", uri)
        .option("database", database)
        .option("collection", collection)
        .option("user", user)
        .option("password", password)
        .load()
      jdbcDF.show()
    
      spark.close()
     }
    }