配置Spark Native引擎
配置场景
Spark Native引擎是通过使用向量化的C++加速库,实现对Spark算子性能加速的一种技术方案。传统的SparkSQL是基于行式数据,通过JVM的codegen来实现查询加速的,由于JVM对生成的java代码存在各种约束,比如方法长度,参数个数等,以及行式数据对内存带宽的利用率不足,因此存在性能提升空间。使用成熟的向量化的c++加速库后,数据采用向量化格式存在内存中,可以提高带宽利用率,并通过批量的数处理获得加速效果。
通过开启Spark Native引擎特性,获得SparkSQL的性能加速。配置Spark Native引擎后不支持Spark大SQL查询和删除。
使用约束
- 本章节仅适用于MRS 3.6.0.1及之后版本。
- 支持spark-sql以及spark-beeline任务。
- 支持MRS自研reader以及开源reader,由spark.gluten.sql.columnar.backend.ch.mrs.reader(默认值为true)来控制。
- 不支持Spark大SQL防御特性。
- 数据类型
名称
mrs reader是否支持
开源reader是否支持
BooleanType、ByteType、CharType、DateType、DoubleType、DecimalType、FloatType、IntegerType、LongType、StringType、ShortType、TimestampType、VarcharType
√
√
BinaryType、DayTimeIntervalType、TimestampNTZType、YearMonthIntervalType
x
√
ArrayType、MapType、StructType
x
√
- 数据格式
名称
mrs reader是否支持
开源reader是否支持
parquet、orc
√
√
csv、json、txt
x
√
- 算子
名称
是否支持
AdaptiveSparkPlanExec、AQEShuffleReadExec、BroadcastHashJoinExec、CoalesceExec、ExpandExec、FileSourceScan、FilterExec、GenerateExec、GlobalLimitExec、HashAggregateExec、HiveTableScanExec、InMemoryTableScanExec、LimitExec、LocalTableScanExec、ObjectHashAggregateExec、ProjectExec、RangeExec、ReusedSubqueryExec、ReusedExchangeExec、RowToColumnarExec、SortExec、SubqueryExec、SubqueryBroadcastExec、SortMergeJoinExec、ShuffleExchangeExec、ShuffledHashJoinExec、SubqueryAdaptiveBroadcastExec、TakeOrderedAndProjectExec、UnionExec、WholeStageCodeGenExec、WindowExec、BroadcastExchangeExec、BroadcastNestedLoopJoinExec
√
AppendColumnsExec、CollectLimitExec、CollectMetricsExec、CoGroupExec、DataWritingCommandExec、EvalPythonExec、ExternalRDDScanExec、MapElementsExec、ParallelInsertUnionExec、SampleExec、V2CommandExec
x
- 函数 仅支持表格中包含的函数:
类型
函数名称
逻辑运算
is_not_null,is_null,gte,gt,lte,lt,equal,and,or,not,xor,extract,cast,alias,nullif
时间
get_timestamp,quarter,to_unix_timestamp,unix_timestamp,date_format,from_unixtime,date_add,date_sub,datediff, second,add_months,trunc,date_trunc,floor_datetime
数学运算
subtract,multiply,add,divide,positive,negative,modulus,pmod,abs,ceil,floor,round,bround,exp,power,cos,cosh,sin,sinh,tan,tanh,acos,asin,atan,atan2,asinh,acosh,atanh,bitwise_not,bitwise_and,bitwise_or,bitwise_xor,sqrt,cbrt,degrees,e,pi,hex,unhex,hypot,sign,log10,log1p,log2,log,radians,greatest,least,shiftleft,shiftright,check_overflow,factorial,rand,isnan
字符串
like,not_like,starts_with,ends_with,contains,substring,lower,upper,trim,ltrim,rtrim,concat,strpos,char_length,replace,regexp_replace,regexp_extract,regexp_extract_all,chr,rlike,ascii,split,concat_ws,base64,unbase64,lpad,rpad,reverse,md5,translate,repeat,position,locate,space
哈希
sha1,sha2,crc32,murmur3hash,xxhash64
表数据生成
explode,posexplode
In相关
In
null相关
coalesce
array相关
array,size,get_array_item,element_at,range
map相关
map,get_map_value,map_keys,map_values,map_from_arrays
tuple相关
get_struct_field,named_struct
json相关
get_json_object,to_json,from_json,json_tuple,json_array_length,make_decimal,unscaled_value
- 文件系统
名称
是否支持
OBS、HDFS、Local FS、OBS与HDFS混合部署
√
- 平台
名称
是否支持
X86、ARM、X86与ARM混合部署
√
- 使用方式
名称
是否支持
spark-sql yarn模式
√
spark-sql local模式
√
- 时区格式
名称
是否支持
基于区域的IDs,例如 Asia/Beijing
√
基于秒级时间间隔的格式,如SET TIME ZONE INTERVAL interval_literal
x
- Native Shuffle(spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager),支持的压缩算法(spark.io.compression.codec)。
名称
是否支持
lz4
√
lzf
x
snappy
x
zstd
√
gzip
x
bzip2
x
- MemartsCC支持度:
场景
开源reader是否支持
MRS reader是否支持
安全集群MemartsCC
√
x
普通集群MemartsCC
√
√
服务端配置参数
- 在Manager系统中,选择“集群 > 服务 > Spark > 配置”,单击“全部配置”,选择“Spark(服务) > Native”,配置如下参数:
参数
说明
默认值
spark.plugins
Spark用到的插件,参数值设置为org.apache.gluten.GlutenPlugin。
说明:如果已经配置了spark.plugins,则可以将
org.apache.gluten.GlutenPlugin加到其中,用逗号","隔开。
空
spark.gluten.memory.unify.enabled
是否开启Spark Native统一内存管理。设置为true,Native加速需要使用统一内存管理。
true
spark.executorEnv.LD_PRELOAD
executor的LD_PRELOAD环境变量,开启Native时必须配置。
x86_64选择“${PWD}/native/${CPU_TYPE}/libch.so ${PWD}/native/${CPU_TYPE}/lib_gsasl.so ${PWD}/native/${CPU_TYPE}/lib_lemmagen.so”。
aarch64 J选择“${PWD}/native/${CPU_TYPE}/libch.so ${PWD}/native/${CPU_TYPE}/lib_gsasl.so ${PWD}/native/${CPU_TYPE}/lib_lemmagen.so ${JAVA_HOME_21}/lib/libjsig.so”。
无
spark.yarn.appMasterEnv.LD_PRELOAD
为YARN的AM设置环境变量LD_PRELOAD,开启Native时必须配置。
x86_64选择“${PWD}/native/${CPU_TYPE}/libch.so ${PWD}/native/${CPU_TYPE}/lib_gsasl.so ${PWD}/native/${CPU_TYPE}/lib_lemmagen.so”。
aarch64 J选择“${PWD}/native/${CPU_TYPE}/libch.so ${PWD}/native/${CPU_TYPE}/lib_gsasl.so ${PWD}/native/${CPU_TYPE}/lib_lemmagen.so ${JAVA_HOME_21}/lib/libjsig.so”。
无
spark.shuffle.manager
Shuffle管理器。设置为“org.apache.spark.shuffle.sort.ColumnarShuffleManager”。
sort
spark.sql.orc.impl
当需要开启Native读写ORC表时,需要设置为native。
hive
- 单击“保存”,保存已经修改的配置,并重启过期的实例。
- 选择“概览 > 更多 > 下载客户端”,安装并使用该客户端。
客户端配置参数
- 如果已经按照服务端配置参数配置,并下载安装新的客户端,可跳过该章节。
- 在Spark客户端的“{客户端安装目录}/Spark/spark/conf/spark-defaults.conf”配置文件中进行设置,修改如下参数:
参数
说明
默认值
spark.plugins
Spark用到的插件,参数值设置为org.apache.gluten.GlutenPlugin。
说明:如果已经配置了spark.plugins,则可以将org.apache.gluten.GlutenPlugin加到其中,用逗号","隔开。
空
spark.gluten.memory.unify.enabled
是否开启Spark Native统一内存管理。设置为true,Native加速需要使用统一内存管理。
true
spark.executorEnv.LD_PRELOAD
executor的LD_PRELOAD环境变量,开启Native时必须配置。
x86_64配置“${PWD}/native/${CPU_TYPE}/libch.so ${PWD}/native/${CPU_TYPE}/lib_gsasl.so ${PWD}/native/${CPU_TYPE}/lib_lemmagen.so”。
aarch64配置“${PWD}/native/${CPU_TYPE}/libch.so ${PWD}/native/${CPU_TYPE}/lib_gsasl.so ${PWD}/native/${CPU_TYPE}/lib_lemmagen.so ${JAVA_HOME_21}/lib/libjsig.so”。
无
spark.yarn.appMasterEnv.LD_PRELOAD
为YARN的AM设置环境变量LD_PRELOAD,开启Native时必须配置。
spark-submit命令设置了“--deploy-mode cluster”时,需要配置。与spark.executorEnv.LD_PRELOAD的值保持一致即可。
x86_64配置“${PWD}/native/${CPU_TYPE}/libch.so ${PWD}/native/${CPU_TYPE}/lib_gsasl.so ${PWD}/native/${CPU_TYPE}/lib_lemmagen.so”。
aarch64配置“${PWD}/native/${CPU_TYPE}/libch.so ${PWD}/native/${CPU_TYPE}/lib_gsasl.so ${PWD}/native/${CPU_TYPE}/lib_lemmagen.so ${JAVA_HOME_21}/lib/libjsig.so”。
无
spark.shuffle.manager
Shuffle管理器。设置为org.apache.spark.shuffle.sort.ColumnarShuffleManager
sort
spark.sql.orc.impl
当需要开启Native读写ORC表时,需要设置为native。
hive
- 在客户端目录下执行“source bigdata_env”刷新环境变量
- MRS reader对接MemArtscc需要在Spark客户端的“{客户端安装目录}/Spark/spark/conf/spark-defaults.conf”配置文件中进行设置,新增如下参数:
需要在集群中安装MemArtscc服务,Spark配置对接MemArtsCC,并且开启Native后,当spark.gluten.sql.columnar.backend.ch.mrs.reader=true时,Spark Native对接MemArtsCC生效。
表1 参数说明 参数
说明
默认值
spark.gluten.sql.columnar.backend.ch.memartscc.enable
Spark native的MemArtsCC客户端开关。
false
fs.obs.memartscc.config.init_ip
MemArtsCC worker服务端IP(仅支持配置单个IP)。
127.0.0.1
fs.obs.memartscc.config.init_port
MemArtsCC worker服务端端口。
21840
fs.obs.memartscc.config.zk_endpoint
ZooKeeper服务端IP端口列表。
127.0.0.1:2181
fs.obs.memartscc.config.zk_root_node
MemArtsCC在ZooKeeper的根节点路径。
/memartccnew
fs.obs.memartscc.config.zk_cluster_name
MemArtsCC在ZooKeeper注册的集群名。
cc01
fs.obs.memartscc.config.log_level
MemArtsCC客户端日志级别。
INFO
fs.obs.memartscc.config.client_type
MemArtsCC客户端类型。
obsa
使用说明
可通过检查执行计划的方式验证Native Engine是否开启,执行如下语句:
spark-sql -e "explain select * from database.data_source"
当执行计划中出现CHNativeColumnarToRow字样,即可认为Native Engine已经开启。

数据一致性校验工具
数据一致性验证工具基于Shell脚本,通过调用Spark-shell启动Scala应用程序。该Scala程序实现了核心的比较逻辑,对比Spark执行查询和通过Spark Native方式执行查询时生成的表数据。Spark和Spark Native的查询结果
- 按照客户端配置参数开启Spark Native。
- 进入“{客户端安装目录}/Spark/spark/tool”目录,执行sh md5compare.sh --help可查看使用方法:
md5compare.sh --dbname <name> --query-path <path> --spark_tbl_location <path> --native_tbl_location <path> --spark_tbl_name <name> --native_tbl_name <name> --clear_tbl_data <boolean> --driver_mem <MEM> --executor_mem <MEM> --executor_num <NUM> --executor_core <NUM>
表2 参数说明 参数
是否必填
参数说明
--dbname <数据库名>
必填
执行 SQL 查询时使用的目标数据库
--query-path < 文件路径 >
必填
SQL 查询脚本的路径,该文件仅支持单条SQL
--spark_tbl_location < 目录路径 >
可选
Spark 查询结果的外部表存储目录,默认值为 /tmp/spark
--native_tbl_location < 目录路径 >
可选
原生查询结果的外部表存储目录,默认值为 /tmp/native
--spark_tbl_name < 表名 >
可选
存储 Spark 查询结果的临时表名称,默认值为 spark_data
--native_tbl_name < 表名 >
可选
存储原生查询结果的临时表名称,默认值为 native_data
--clear_tbl_data < 布尔值 >
可选
对比结束后是否删除临时表及外部表存储路径,默认值为 true
--driver_mem < 内存值 >
可选
Driver 进程使用的内存大小,默认值为 4G
--executor_mem < 内存值 >
可选
每个 Executor 进程使用的内存大小,默认值为 2G
--executor_num < 数值 >
可选
启动的 Executor 进程总数量,默认值为 4
--executor_core < 数值 >
可选
为每个 Executor 进程分配的 CPU 核心数,默认值为 1
- 执行示例:
sh md5compare.sh --dbname tpcds_hive_spark2x_2 --query-path /opt/query/q1.sql --spark_tbl_location /tmp/spark --native_tbl_location /tmp/native --spark_tbl_name spark_data --native_tbl_name native_data --clear_tbl_data true --driver_mem 8G --executor_mem 4G --executor_num 2 --executor_core 2
针对Spark和Native Engine在处理decimal,float以及double时可能存在的小数部分的差异,一致性校验工具统一取到小数点后一位做比较。例如:3.14159,一致性工具在比较时取3.1做比较。
差异说明
开启native engine后,执行结果和spark runtime存在细微差异:
- 由于spark runtime和native engine在处理decimal的舍入规则不同,造成decimal结果最后一位可能存在差异。
- 读取json格式的binary类型数据时,由于Spark在写入时进行了base64编码:如 '0101' 写入json文件后为'MDEwMQ==',Spark runtime读取会base64解码,但native engine读取时未base64解码,需要unbase64()函数解码。
- Native Engine不支持SET TIME ZONE INTERVAL语法。
常见问题
- 服务端配置: 登录Manager页面,选择“集群 > 服务 > Spark > 配置”,单击“全部配置”,选择“Spark(服务) > Native”,配置如下参数:
参数
说明
默认值
spark.gluten.sql.columnar.backend.ch.shuffle.preferSpill
当shuffle内存达到门限时,是否溢写到磁盘
修改配置为true
false
spark.gluten.sql.columnar.backend.ch.spillThreshold
shuffle内存门限值, 当shuffle内存超过该值,会溢写磁盘
建议设置为128M
0
spark.gluten.sql.columnar.maxBatchSize
shuffle算子一个批次处理的行数, 影响Spark Native引擎中shuffle算子的内存占用
可设置为1024
4096
- 客户端配置:
在Spark客户端节点的“{客户端安装目录}/Spark/spark/conf/spark-defaults.conf”配置文件中进行设置,新增如下参数:
spark.gluten.sql.columnar.backend.ch.shuffle.preferSpill=true spark.gluten.sql.columnar.backend.ch.spillThreshold=128M spark.gluten.sql.columnar.maxBatchSize=1024
问题二:执行SQL任务时,如果Fallback Node过多,性能比Spark更差,可考虑使用动态开关关闭Spark native引擎。
set spark.gluten.enabled=false;
调优指南
- 文件被切分成多个分片后,最大分片的大小(默认64MB):
spark.gluten.sql.columnar.backend.ch.file.split.size=268435456
- 内存超过executor限制是否抛出异常(设置为false可允许native超限使用内存以提升性能,但有被OOM killer结束executor进程的风险)。
spark.gluten.sql.columnar.backend.ch.throwIfMemoryExceed = false
- 是否允许var_sample, corvar_sample函数的精度损失:
spark.gluten.sql.columnar.precision.loss.allowed = true
- 是否开启decimal溢出检查,关闭可提升decimal计算性能:
spark.gluten.sql.columnar.backend.ch.runtime_settings.decimal_check_overflow=false
即时编译
spark.gluten.sql.columnar.backend.ch.runtime_config.compile_expressions=1