更新时间:2024-10-26 GMT+08:00
分享

通过Spark连接实例

本小节主要介绍使用Scala语言连接GeminiDB Cassandra的基本操作。

前提条件

  • 已成功创建GeminiDB Cassandra实例,且实例状态正常。创建GeminiDB Cassandra实例的方法请参见购买实例
  • 已创建弹性云服务器,创建弹性云服务器的方法,请参见《弹性云服务器快速入门》中“创建弹性云服务器”章节。
  • 弹性云服务器上已经安装Spark环境。

操作步骤

  1. 获取GeminiDB Cassandra实例的内网IP地址、端口。

    内网IP地址和端口的获取方法请参见查看IP地址和端口

  2. 登录弹性云服务器,具体操作请参见《弹性云服务器快速入门》中“登录弹性云服务器”。
  3. 编辑连接GeminiDB Cassandra实例的代码。

    • 如果是spark 2.x 连接GeminiDB Cassandra,建议使用的版本如下:

      spark:2.5.1

      scala:2.12

      spark-cassandra-connector:2.5.1

      使用如下样例代码连接数据库即可:
          /**
           * 认证用的用户名和密码直接写到代码中有很大的安全风险,建议在配置文件或者环境变量中存放(密码应密文存放、使用时解密),确保安全;
           * 本示例以用户名和密码保存在环境变量中为例,运行本示例前请先在本地环境中设置环境变量(环境变量名称请根据自身情况进行设置)USERNAME_ENV和PASSWORD_ENV。
           */
          val username: String = System.getenv().asScala.mkString("USERNAME_ENV")
          val password: String = System.getenv().asScala.mkString("PASSWORD_ENV")
          val sparkSession = SparkSession
            .builder()
            .appName("Spark Cassandra basic example")
            .master("local")
            .config("spark.cassandra.connection.host", "26.84.42.111")
            .config("spark.cassandra.connection.port", "9042")
            .config("spark.cassandra.auth.username", username)
            .config("spark.cassandra.auth.password", password)
            .getOrCreate()

      如果连接过程中有报错信息,请参考使用Spark连接Cassandra失败

    • 如果是spark 3.x 连接GeminiDB Cassandra,建议使用版本如下:

      spark:3.2.4

      scala:2.12.15

      java:1.8

      spark-cassandra-connector:3.1.0

      1. 建议首先重写一个新的CassandraConnectionFactory(修改loadBalancingPolicy为 DefaultLoadBalancingPolicy),代码如下:
        package sample
        import java.io.IOException
        import java.net.{MalformedURLException, URL}
        import java.nio.file.{Files, Paths}
        import java.time.Duration
        
        import com.datastax.bdp.spark.ContinuousPagingScanner
        import com.datastax.dse.driver.api.core.DseProtocolVersion
        import com.datastax.dse.driver.api.core.config.DseDriverOption
        import com.datastax.oss.driver.api.core.CqlSession
        import com.datastax.oss.driver.api.core.config.DefaultDriverOption._
        import com.datastax.oss.driver.api.core.config.{DriverConfigLoader, ProgrammaticDriverConfigLoaderBuilder => PDCLB}
        import com.datastax.oss.driver.internal.core.connection.ExponentialReconnectionPolicy
        import com.datastax.oss.driver.internal.core.loadbalancing.DefaultLoadBalancingPolicy
        import com.datastax.oss.driver.internal.core.ssl.DefaultSslEngineFactory
        import com.datastax.spark.connector.rdd.ReadConf
        import com.datastax.spark.connector.util.{ConfigParameter, DeprecatedConfigParameter, ReflectionUtil}
        import org.apache.spark.{SparkConf, SparkEnv, SparkFiles}
        import org.slf4j.LoggerFactory
        
        import scala.jdk.CollectionConverters._
        import com.datastax.spark.connector.cql.{CassandraConnectionFactory, CassandraConnector, CassandraConnectorConf, CloudBasedContactInfo, DefaultScanner, IpBasedContactInfo, LocalNodeFirstLoadBalancingPolicy, MultipleRetryPolicy, MultiplexingSchemaListener, ProfileFileBasedContactInfo, Scanner}
        
        class ConnectionFactory extends CassandraConnectionFactory {
          @transient
          lazy private val logger = LoggerFactory.getLogger("com.datastax.spark.connector.cql.CassandraConnectionFactory")
        
          def connectorConfigBuilder(conf: CassandraConnectorConf, initBuilder: PDCLB) = {
        
            def basicProperties(builder: PDCLB): PDCLB = {
              val localCoreThreadCount = Math.max(1, Runtime.getRuntime.availableProcessors() - 1)
              builder
                .withInt(CONNECTION_POOL_LOCAL_SIZE, conf.localConnectionsPerExecutor.getOrElse(localCoreThreadCount)) // moved from CassandraConnector
                .withInt(CONNECTION_POOL_REMOTE_SIZE, conf.remoteConnectionsPerExecutor.getOrElse(1)) // moved from CassandraConnector
                .withInt(CONNECTION_INIT_QUERY_TIMEOUT, conf.connectTimeoutMillis)
                .withDuration(CONTROL_CONNECTION_TIMEOUT, Duration.ofMillis(conf.connectTimeoutMillis))
                .withDuration(METADATA_SCHEMA_REQUEST_TIMEOUT, Duration.ofMillis(conf.connectTimeoutMillis))
                .withInt(REQUEST_TIMEOUT, conf.readTimeoutMillis)
                .withClass(RETRY_POLICY_CLASS, classOf[MultipleRetryPolicy])
                .withClass(RECONNECTION_POLICY_CLASS, classOf[ExponentialReconnectionPolicy])
                .withDuration(RECONNECTION_BASE_DELAY, Duration.ofMillis(conf.minReconnectionDelayMillis))
                .withDuration(RECONNECTION_MAX_DELAY, Duration.ofMillis(conf.maxReconnectionDelayMillis))
                .withInt(NETTY_ADMIN_SHUTDOWN_QUIET_PERIOD, conf.quietPeriodBeforeCloseMillis / 1000)
                .withInt(NETTY_ADMIN_SHUTDOWN_TIMEOUT, conf.timeoutBeforeCloseMillis / 1000)
                .withInt(NETTY_IO_SHUTDOWN_QUIET_PERIOD, conf.quietPeriodBeforeCloseMillis / 1000)
                .withInt(NETTY_IO_SHUTDOWN_TIMEOUT, conf.timeoutBeforeCloseMillis / 1000)
                .withBoolean(NETTY_DAEMON, true)
                .withBoolean(RESOLVE_CONTACT_POINTS, conf.resolveContactPoints)
                .withInt(MultipleRetryPolicy.MaxRetryCount, conf.queryRetryCount)
                .withDuration(DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_FIRST_PAGE, Duration.ofMillis(conf.readTimeoutMillis))
                .withDuration(DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_OTHER_PAGES, Duration.ofMillis(conf.readTimeoutMillis))
            }
        
            // compression option cannot be set to NONE (default)
            def compressionProperties(b: PDCLB): PDCLB =
              Option(conf.compression)
                .filter(_.toLowerCase != "none")
                .fold(b)(c => b.withString(PROTOCOL_COMPRESSION, c.toLowerCase))
        
            def localDCProperty(b: PDCLB): PDCLB =
              conf.localDC.map(b.withString(LOAD_BALANCING_LOCAL_DATACENTER, _)).getOrElse(b)
        
            // add ssl properties if ssl is enabled
            def ipBasedConnectionProperties(ipConf: IpBasedContactInfo) = (builder: PDCLB) => {
              builder
                .withStringList(CONTACT_POINTS, ipConf.hosts.map(h => s"${h.getHostString}:${h.getPort}").toList.asJava)
                .withClass(LOAD_BALANCING_POLICY_CLASS, classOf[DefaultLoadBalancingPolicy])
        
              def clientAuthEnabled(value: Option[String]) =
                if (ipConf.cassandraSSLConf.clientAuthEnabled) value else None
        
              if (ipConf.cassandraSSLConf.enabled) {
                Seq(
                  SSL_TRUSTSTORE_PATH -> ipConf.cassandraSSLConf.trustStorePath,
                  SSL_TRUSTSTORE_PASSWORD -> ipConf.cassandraSSLConf.trustStorePassword,
                  SSL_KEYSTORE_PATH -> clientAuthEnabled(ipConf.cassandraSSLConf.keyStorePath),
                  SSL_KEYSTORE_PASSWORD -> clientAuthEnabled(ipConf.cassandraSSLConf.keyStorePassword))
                  .foldLeft(builder) { case (b, (name, value)) =>
                    value.map(b.withString(name, _)).getOrElse(b)
                  }
                  .withClass(SSL_ENGINE_FACTORY_CLASS, classOf[DefaultSslEngineFactory])
                  .withStringList(SSL_CIPHER_SUITES, ipConf.cassandraSSLConf.enabledAlgorithms.toList.asJava)
                  .withBoolean(SSL_HOSTNAME_VALIDATION, false) // TODO: this needs to be configurable by users. Set to false for our integration tests
              } else {
                builder
              }
            }
        
            val universalProperties: Seq[PDCLB => PDCLB] =
              Seq( basicProperties, compressionProperties, localDCProperty)
        
            val appliedProperties: Seq[PDCLB => PDCLB] = conf.contactInfo match {
              case ipConf: IpBasedContactInfo => universalProperties :+ ipBasedConnectionProperties(ipConf)
              case other => universalProperties
            }
        
            appliedProperties.foldLeft(initBuilder){ case (builder, properties) => properties(builder)}
          }
        
          /** Creates and configures native Cassandra connection */
          override def createSession(conf: CassandraConnectorConf): CqlSession = {
            val configLoaderBuilder = DriverConfigLoader.programmaticBuilder()
            val configLoader = connectorConfigBuilder(conf, configLoaderBuilder).build()
        
            val initialBuilder = CqlSession.builder()
        
            val builderWithContactInfo =  conf.contactInfo match {
              case ipConf: IpBasedContactInfo =>
                ipConf.authConf.authProvider.fold(initialBuilder)(initialBuilder.withAuthProvider)
                  .withConfigLoader(configLoader)
              case CloudBasedContactInfo(path, authConf) =>
                authConf.authProvider.fold(initialBuilder)(initialBuilder.withAuthProvider)
                  .withCloudSecureConnectBundle(maybeGetLocalFile(path))
                  .withConfigLoader(configLoader)
              case ProfileFileBasedContactInfo(path) =>
                //Ignore all programmatic config for now ... //todo maybe allow programmatic config here by changing the profile?
                logger.warn(s"Ignoring all programmatic configuration, only using configuration from $path")
                initialBuilder.withConfigLoader(DriverConfigLoader.fromUrl(maybeGetLocalFile(path)))
            }
        
            val appName = Option(SparkEnv.get).map(env => env.conf.getAppId).getOrElse("NoAppID")
            builderWithContactInfo
              .withApplicationName(s"Spark-Cassandra-Connector-$appName")
              .withSchemaChangeListener(new MultiplexingSchemaListener())
              .build()
          }
        
          /**
           * Checks the Spark Temp work directory for the file in question, returning
           * it if exists, returning a generic URL from the string if not
           */
          def maybeGetLocalFile(path: String): URL = {
            val localPath = Paths.get(SparkFiles.get(path))
            if (Files.exists(localPath)) {
              logger.info(s"Found the $path locally at $localPath, using this local file.")
              localPath.toUri.toURL
            } else {
              try {
                new URL(path)
              } catch {
                case e: MalformedURLException =>
                  throw new IOException(s"The provided path $path is not a valid URL nor an existing locally path. Provide an " +
                    s"URL accessible to all executors or a path existing on all executors (you may use `spark.files` to " +
                    s"distribute a file to each executor).", e)
              }
            }
          }
        
          def continuousPagingEnabled(session: CqlSession): Boolean = {
            val confEnabled = SparkEnv.get.conf.getBoolean(CassandraConnectionFactory.continuousPagingParam.name, CassandraConnectionFactory.continuousPagingParam.default)
            val pv = session.getContext.getProtocolVersion
            if (pv.getCode > DseProtocolVersion.DSE_V1.getCode && confEnabled) {
              logger.debug(s"Scan Method Being Set to Continuous Paging")
              true
            } else {
              logger.debug(s"Scan Mode Disabled or Connecting to Non-DSE Cassandra Cluster")
              false
            }
          }
        
          override def getScanner(
                                   readConf: ReadConf,
                                   connConf: CassandraConnectorConf,
                                   columnNames: scala.IndexedSeq[String]): Scanner = {
        
            val isContinuousPagingEnabled =
              new CassandraConnector(connConf).withSessionDo { continuousPagingEnabled }
        
            if (isContinuousPagingEnabled) {
              logger.debug("Using ContinousPagingScanner")
              ContinuousPagingScanner(readConf, connConf, columnNames)
            } else {
              logger.debug("Not Connected to DSE 5.1 or Greater Falling back to Non-Continuous Paging")
              new DefaultScanner(readConf, connConf, columnNames)
            }
          }
        }
      2. 连接GeminiDB Cassandra的代码如下:
        /**
         * 认证用的用户名和密码直接写到代码中有很大的安全风险,建议在配置文件或者环境变量中存放(密码应密文存放、使用时解密),确保安全;
         * 本示例以用户名和密码保存在环境变量中为例,运行本示例前请先在本地环境中设置环境变量(环境变量名称请根据自身情况进行设置)USERNAME_ENV和PASSWORD_ENV。
         */
        val username: String = System.getenv().asScala.mkString("USERNAME_ENV")
        val password: String = System.getenv().asScala.mkString("PASSWORD_ENV")
        val sparkSession = SparkSession
          .builder()
          .appName("Spark Cassandra basic example")
          .master("local")
          .config("spark.cassandra.connection.host", host)
          .config("spark.cassandra.connection.port", port)
          .config("spark.cassandra.auth.username", username)
          .config("spark.cassandra.auth.password", password)
          .config("spark.cassandra.connection.factory", "sample.ConnectionFactory")  //指定为自己定义的ConnectionFactory
          .getOrCreate()

  4. 运行示例代码,确认结果是否正常。

相关文档