Connecting to an Instance Through Spark
This section describes the basic operations for connecting to a GeminiDB Cassandra instance using Scala.
Prerequisites
Procedure
- Obtain the private IP address and port of the GeminiDB Cassandra instance.
For details about how to obtain them, see Viewing the IP Address and Port.
- Log in to the Elastic Cloud Server (ECS). For details, see "Logging In to an ECS" in the Elastic Cloud Server Getting Started guide.
- Edit the code for connecting to the GeminiDB Cassandra instance.
- If Spark 2.x is used to connect to GeminiDB Cassandra, the following versions are recommended:
spark: 2.4.x
scala: 2.12
spark-cassandra-connector: 2.5.1
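If the project is built with sbt, the dependencies can be declared as in the following sketch. The Spark patch version (2.4.8) and Scala patch version (2.12.10) are assumptions, not part of the recommendation above; align them with your own environment:

// build.sbt (sketch; patch versions are assumptions)
scalaVersion := "2.12.10"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.8" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.5.1"
)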
Use the following sample code to connect to the database:

import org.apache.spark.sql.SparkSession

/**
 * Hard-coding the username and password used for authentication into the code poses a serious security risk.
 * You are advised to store them in a configuration file or in environment variables (store the password as
 * ciphertext and decrypt it when used) to keep them secure.
 * In this example, the username and password are read from environment variables. Before running the example,
 * set the environment variables USERNAME_ENV and PASSWORD_ENV in your local environment (name them as needed).
 */
val username: String = System.getenv("USERNAME_ENV")
val password: String = System.getenv("PASSWORD_ENV")
val sparkSession = SparkSession
  .builder()
  .appName("Spark Cassandra basic example")
  .master("local")
  .config("spark.cassandra.connection.host", "26.84.42.111") // private IP address obtained earlier
  .config("spark.cassandra.connection.port", "9042")
  .config("spark.cassandra.auth.username", username)
  .config("spark.cassandra.auth.password", password)
  .getOrCreate()
If an error is reported during the connection, see Failed to Connect to Cassandra Using Spark.
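After the session is created, a quick way to verify the connection is to load a table through the connector's DataFrame API. This is a minimal sketch; the keyspace and table names (test_ks, test_tbl) are hypothetical placeholders for a table that exists in your instance:

// Hypothetical keyspace and table names; replace them with an existing table.
val df = sparkSession.read
  .format("org.apache.spark.sql.cassandra")
  .option("keyspace", "test_ks")
  .option("table", "test_tbl")
  .load()
df.show(10) // prints the first rows if the connection works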
- If Spark 3.x is used to connect to GeminiDB Cassandra, the following versions are recommended:
spark: 3.2.4
scala: 2.12.15
java: 1.8
spark-cassandra-connector: 3.1.0
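For a Spark 3.x project, the sbt coordinates would look similar. This is again a sketch, using the versions recommended above:

// build.sbt (sketch; versions follow the recommendations above)
scalaVersion := "2.12.15"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.2.4" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "3.1.0"
)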
- You are advised to first write a new CassandraConnectionFactory (changing the loadBalancingPolicy to DefaultLoadBalancingPolicy), as shown in the following code:
package sample

import java.io.IOException
import java.net.{MalformedURLException, URL}
import java.nio.file.{Files, Paths}
import java.time.Duration

import com.datastax.bdp.spark.ContinuousPagingScanner
import com.datastax.dse.driver.api.core.DseProtocolVersion
import com.datastax.dse.driver.api.core.config.DseDriverOption
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.config.DefaultDriverOption._
import com.datastax.oss.driver.api.core.config.{DriverConfigLoader, ProgrammaticDriverConfigLoaderBuilder => PDCLB}
import com.datastax.oss.driver.internal.core.connection.ExponentialReconnectionPolicy
import com.datastax.oss.driver.internal.core.loadbalancing.DefaultLoadBalancingPolicy
import com.datastax.oss.driver.internal.core.ssl.DefaultSslEngineFactory
import com.datastax.spark.connector.rdd.ReadConf
import com.datastax.spark.connector.util.{ConfigParameter, DeprecatedConfigParameter, ReflectionUtil}
import org.apache.spark.{SparkConf, SparkEnv, SparkFiles}
import org.slf4j.LoggerFactory

import scala.jdk.CollectionConverters._

import com.datastax.spark.connector.cql.{CassandraConnectionFactory, CassandraConnector, CassandraConnectorConf, CloudBasedContactInfo, DefaultScanner, IpBasedContactInfo, LocalNodeFirstLoadBalancingPolicy, MultipleRetryPolicy, MultiplexingSchemaListener, ProfileFileBasedContactInfo, Scanner}

class ConnectionFactory extends CassandraConnectionFactory {
  @transient
  lazy private val logger = LoggerFactory.getLogger("com.datastax.spark.connector.cql.CassandraConnectionFactory")

  def connectorConfigBuilder(conf: CassandraConnectorConf, initBuilder: PDCLB) = {

    def basicProperties(builder: PDCLB): PDCLB = {
      val localCoreThreadCount = Math.max(1, Runtime.getRuntime.availableProcessors() - 1)
      builder
        .withInt(CONNECTION_POOL_LOCAL_SIZE, conf.localConnectionsPerExecutor.getOrElse(localCoreThreadCount)) // moved from CassandraConnector
        .withInt(CONNECTION_POOL_REMOTE_SIZE, conf.remoteConnectionsPerExecutor.getOrElse(1)) // moved from CassandraConnector
        .withInt(CONNECTION_INIT_QUERY_TIMEOUT, conf.connectTimeoutMillis)
        .withDuration(CONTROL_CONNECTION_TIMEOUT, Duration.ofMillis(conf.connectTimeoutMillis))
        .withDuration(METADATA_SCHEMA_REQUEST_TIMEOUT, Duration.ofMillis(conf.connectTimeoutMillis))
        .withInt(REQUEST_TIMEOUT, conf.readTimeoutMillis)
        .withClass(RETRY_POLICY_CLASS, classOf[MultipleRetryPolicy])
        .withClass(RECONNECTION_POLICY_CLASS, classOf[ExponentialReconnectionPolicy])
        .withDuration(RECONNECTION_BASE_DELAY, Duration.ofMillis(conf.minReconnectionDelayMillis))
        .withDuration(RECONNECTION_MAX_DELAY, Duration.ofMillis(conf.maxReconnectionDelayMillis))
        .withInt(NETTY_ADMIN_SHUTDOWN_QUIET_PERIOD, conf.quietPeriodBeforeCloseMillis / 1000)
        .withInt(NETTY_ADMIN_SHUTDOWN_TIMEOUT, conf.timeoutBeforeCloseMillis / 1000)
        .withInt(NETTY_IO_SHUTDOWN_QUIET_PERIOD, conf.quietPeriodBeforeCloseMillis / 1000)
        .withInt(NETTY_IO_SHUTDOWN_TIMEOUT, conf.timeoutBeforeCloseMillis / 1000)
        .withBoolean(NETTY_DAEMON, true)
        .withBoolean(RESOLVE_CONTACT_POINTS, conf.resolveContactPoints)
        .withInt(MultipleRetryPolicy.MaxRetryCount, conf.queryRetryCount)
        .withDuration(DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_FIRST_PAGE, Duration.ofMillis(conf.readTimeoutMillis))
        .withDuration(DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_OTHER_PAGES, Duration.ofMillis(conf.readTimeoutMillis))
    }

    // compression option cannot be set to NONE (default)
    def compressionProperties(b: PDCLB): PDCLB =
      Option(conf.compression)
        .filter(_.toLowerCase != "none")
        .fold(b)(c => b.withString(PROTOCOL_COMPRESSION, c.toLowerCase))

    def localDCProperty(b: PDCLB): PDCLB =
      conf.localDC.map(b.withString(LOAD_BALANCING_LOCAL_DATACENTER, _)).getOrElse(b)

    // add ssl properties if ssl is enabled
    def ipBasedConnectionProperties(ipConf: IpBasedContactInfo) = (builder: PDCLB) => {
      builder
        .withStringList(CONTACT_POINTS, ipConf.hosts.map(h => s"${h.getHostString}:${h.getPort}").toList.asJava)
        .withClass(LOAD_BALANCING_POLICY_CLASS, classOf[DefaultLoadBalancingPolicy])

      def clientAuthEnabled(value: Option[String]) =
        if (ipConf.cassandraSSLConf.clientAuthEnabled) value else None

      if (ipConf.cassandraSSLConf.enabled) {
        Seq(
          SSL_TRUSTSTORE_PATH -> ipConf.cassandraSSLConf.trustStorePath,
          SSL_TRUSTSTORE_PASSWORD -> ipConf.cassandraSSLConf.trustStorePassword,
          SSL_KEYSTORE_PATH -> clientAuthEnabled(ipConf.cassandraSSLConf.keyStorePath),
          SSL_KEYSTORE_PASSWORD -> clientAuthEnabled(ipConf.cassandraSSLConf.keyStorePassword))
          .foldLeft(builder) { case (b, (name, value)) =>
            value.map(b.withString(name, _)).getOrElse(b)
          }
          .withClass(SSL_ENGINE_FACTORY_CLASS, classOf[DefaultSslEngineFactory])
          .withStringList(SSL_CIPHER_SUITES, ipConf.cassandraSSLConf.enabledAlgorithms.toList.asJava)
          .withBoolean(SSL_HOSTNAME_VALIDATION, false) // TODO: this needs to be configurable by users. Set to false for our integration tests
      } else {
        builder
      }
    }

    val universalProperties: Seq[PDCLB => PDCLB] = Seq(basicProperties, compressionProperties, localDCProperty)

    val appliedProperties: Seq[PDCLB => PDCLB] = conf.contactInfo match {
      case ipConf: IpBasedContactInfo => universalProperties :+ ipBasedConnectionProperties(ipConf)
      case other => universalProperties
    }

    appliedProperties.foldLeft(initBuilder) { case (builder, properties) => properties(builder) }
  }

  /** Creates and configures native Cassandra connection */
  override def createSession(conf: CassandraConnectorConf): CqlSession = {
    val configLoaderBuilder = DriverConfigLoader.programmaticBuilder()
    val configLoader = connectorConfigBuilder(conf, configLoaderBuilder).build()

    val initialBuilder = CqlSession.builder()

    val builderWithContactInfo = conf.contactInfo match {
      case ipConf: IpBasedContactInfo =>
        ipConf.authConf.authProvider.fold(initialBuilder)(initialBuilder.withAuthProvider)
          .withConfigLoader(configLoader)
      case CloudBasedContactInfo(path, authConf) =>
        authConf.authProvider.fold(initialBuilder)(initialBuilder.withAuthProvider)
          .withCloudSecureConnectBundle(maybeGetLocalFile(path))
          .withConfigLoader(configLoader)
      case ProfileFileBasedContactInfo(path) =>
        // Ignore all programmatic config for now ...
        // todo maybe allow programmatic config here by changing the profile?
        logger.warn(s"Ignoring all programmatic configuration, only using configuration from $path")
        initialBuilder.withConfigLoader(DriverConfigLoader.fromUrl(maybeGetLocalFile(path)))
    }

    val appName = Option(SparkEnv.get).map(env => env.conf.getAppId).getOrElse("NoAppID")
    builderWithContactInfo
      .withApplicationName(s"Spark-Cassandra-Connector-$appName")
      .withSchemaChangeListener(new MultiplexingSchemaListener())
      .build()
  }

  /**
   * Checks the Spark Temp work directory for the file in question, returning
   * it if exists, returning a generic URL from the string if not
   */
  def maybeGetLocalFile(path: String): URL = {
    val localPath = Paths.get(SparkFiles.get(path))
    if (Files.exists(localPath)) {
      logger.info(s"Found the $path locally at $localPath, using this local file.")
      localPath.toUri.toURL
    } else {
      try {
        new URL(path)
      } catch {
        case e: MalformedURLException =>
          throw new IOException(s"The provided path $path is not a valid URL nor an existing locally path. Provide an " +
            s"URL accessible to all executors or a path existing on all executors (you may use `spark.files` to " +
            s"distribute a file to each executor).", e)
      }
    }
  }

  def continuousPagingEnabled(session: CqlSession): Boolean = {
    val confEnabled = SparkEnv.get.conf.getBoolean(
      CassandraConnectionFactory.continuousPagingParam.name,
      CassandraConnectionFactory.continuousPagingParam.default)
    val pv = session.getContext.getProtocolVersion
    if (pv.getCode > DseProtocolVersion.DSE_V1.getCode && confEnabled) {
      logger.debug(s"Scan Method Being Set to Continuous Paging")
      true
    } else {
      logger.debug(s"Scan Mode Disabled or Connecting to Non-DSE Cassandra Cluster")
      false
    }
  }

  override def getScanner(
      readConf: ReadConf,
      connConf: CassandraConnectorConf,
      columnNames: scala.IndexedSeq[String]): Scanner = {
    val isContinuousPagingEnabled =
      new CassandraConnector(connConf).withSessionDo { continuousPagingEnabled }

    if (isContinuousPagingEnabled) {
      logger.debug("Using ContinousPagingScanner")
      ContinuousPagingScanner(readConf, connConf, columnNames)
    } else {
      logger.debug("Not Connected to DSE 5.1 or Greater Falling back to Non-Continuous Paging")
      new DefaultScanner(readConf, connConf, columnNames)
    }
  }
}
- The code for connecting to GeminiDB Cassandra is as follows:
import org.apache.spark.sql.SparkSession

/**
 * Hard-coding the username and password used for authentication into the code poses a serious security risk.
 * You are advised to store them in a configuration file or in environment variables (store the password as
 * ciphertext and decrypt it when used) to keep them secure.
 * In this example, the username and password are read from environment variables. Before running the example,
 * set the environment variables USERNAME_ENV and PASSWORD_ENV in your local environment (name them as needed).
 */
val username: String = System.getenv("USERNAME_ENV")
val password: String = System.getenv("PASSWORD_ENV")
val sparkSession = SparkSession
  .builder()
  .appName("Spark Cassandra basic example")
  .master("local")
  .config("spark.cassandra.connection.host", host)
  .config("spark.cassandra.connection.port", port)
  .config("spark.cassandra.auth.username", username)
  .config("spark.cassandra.auth.password", password)
  .config("spark.cassandra.connection.factory", "sample.ConnectionFactory") // use the custom ConnectionFactory defined above
  .getOrCreate()
- Run the sample code and verify that the results are as expected; the sketch below shows one possible end-to-end check.
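The following sketch writes a few rows and reads them back through the connector. The keyspace test_ks and table kv (key text PRIMARY KEY, value int) are hypothetical names and must be created in your instance beforehand:

import org.apache.spark.sql.SaveMode

// Hypothetical table: CREATE TABLE test_ks.kv (key text PRIMARY KEY, value int)
val data = sparkSession.createDataFrame(Seq(("k1", 1), ("k2", 2))).toDF("key", "value")
data.write
  .format("org.apache.spark.sql.cassandra")
  .option("keyspace", "test_ks")
  .option("table", "kv")
  .mode(SaveMode.Append)
  .save()

sparkSession.read
  .format("org.apache.spark.sql.cassandra")
  .option("keyspace", "test_ks")
  .option("table", "kv")
  .load()
  .show() // the rows written above should appear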