文档首页/
MapReduce服务 MRS/
用户指南(吉隆坡区域)/
MRS集群组件操作指导/
使用Spark2x/
Spark2x性能调优/
SQL和DataFrame调优/
多级嵌套子查询以及混合Join的SQL调优
更新时间:2023-03-17 GMT+08:00
多级嵌套子查询以及混合Join的SQL调优
操作场景
本章节介绍在多级嵌套以及混合Join SQL查询的调优建议。
前提条件
例如有一个复杂的查询样例如下:
select s_name, count(1) as numwait from ( select s_name from ( select s_name, t2.l_orderkey, l_suppkey, count_suppkey, max_suppkey from test2 t2 right outer join ( select s_name, l_orderkey, l_suppkey from ( select s_name, t1.l_orderkey, l_suppkey, count_suppkey, max_suppkey from test1 t1 join ( select s_name, l_orderkey, l_suppkey from orders o join ( select s_name, l_orderkey, l_suppkey from nation n join supplier s on s.s_nationkey = n.n_nationkey and n.n_name = 'SAUDI ARABIA' join lineitem l on s.s_suppkey = l.l_suppkey where l.l_receiptdate > l.l_commitdate and l.l_orderkey is not null ) l1 on o.o_orderkey = l1.l_orderkey and o.o_orderstatus = 'F' ) l2 on l2.l_orderkey = t1.l_orderkey ) a where (count_suppkey > 1) or ((count_suppkey=1) and (l_suppkey <> max_suppkey)) ) l3 on l3.l_orderkey = t2.l_orderkey ) b where (count_suppkey is null) or ((count_suppkey=1) and (l_suppkey = max_suppkey)) ) c group by s_name order by numwait desc, s_name limit 100;
操作步骤
- 分析业务。
从业务入手分析是否可以简化SQL,例如可以通过合并表去减少嵌套的层级和Join的次数。
- 如果业务需求对应的SQL无法简化,则需要配置DRIVER内存:
- 执行SQL语句时,需要添加参数“--driver-memory”,设置内存大小,例如:
/spark-sql --master=local[4] --driver-memory=512M -f /tpch.sql
- 在执行SQL语句前,请使用管理员用户修改内存大小配置。
- 登录FusionInsight Manager,选择 。
- 单击“全部配置”,并搜索“SPARK_DRIVER_MEMORY”。
- 修改参数值适当增加内存大小。仅支持整数值,且需要输入单位M或者G。例如输入512M。
参考信息
DRIVER内存不足时,查询操作可能遇到以下错误提示信息:
2018-02-11 09:13:14,683 | WARN | Executor task launch worker for task 5 | Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0. | org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch.spill(RowBasedKeyValueBatch.java:173) 2018-02-11 09:13:14,682 | WARN | Executor task launch worker for task 3 | Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0. | org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch.spill(RowBasedKeyValueBatch.java:173) 2018-02-11 09:13:14,704 | ERROR | Executor task launch worker for task 2 | Exception in task 2.0 in stage 1.0 (TID 2) | org.apache.spark.internal.Logging$class.logError(Logging.scala:91) java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 0 at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:100) at org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:791) at org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:208) at org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:223) at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.<init>(UnsafeFixedWidthAggregationMap.java:104) at org.apache.spark.sql.execution.aggregate.HashAggregateExec.createHashMap(HashAggregateExec.scala:307) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:381) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
父主题: SQL和DataFrame调优