SQL Optimization for Multi-level Nesting and Hybrid Join
Scenario
This section provides suggestions for optimizing SQL statements that combine multi-level nesting with hybrid joins.
Prerequisites
The following is an example of a complex query statement:
select s_name, count(1) as numwait
from (
  select s_name
  from (
    select s_name, t2.l_orderkey, l_suppkey, count_suppkey, max_suppkey
    from test2 t2
    right outer join (
      select s_name, l_orderkey, l_suppkey
      from (
        select s_name, t1.l_orderkey, l_suppkey, count_suppkey, max_suppkey
        from test1 t1
        join (
          select s_name, l_orderkey, l_suppkey
          from orders o
          join (
            select s_name, l_orderkey, l_suppkey
            from nation n
            join supplier s
              on s.s_nationkey = n.n_nationkey
              and n.n_name = 'SAUDI ARABIA'
            join lineitem l
              on s.s_suppkey = l.l_suppkey
            where l.l_receiptdate > l.l_commitdate
              and l.l_orderkey is not null
          ) l1
            on o.o_orderkey = l1.l_orderkey
            and o.o_orderstatus = 'F'
        ) l2
          on l2.l_orderkey = t1.l_orderkey
      ) a
      where (count_suppkey > 1)
        or ((count_suppkey=1) and (l_suppkey <> max_suppkey))
    ) l3
      on l3.l_orderkey = t2.l_orderkey
  ) b
  where (count_suppkey is null)
    or ((count_suppkey=1) and (l_suppkey = max_suppkey))
) c
group by s_name
order by numwait desc, s_name
limit 100;
Procedure
- Analyze the business requirements.
Determine whether the SQL statements can be simplified, for example, by combining tables to reduce the number of nesting levels and the number of joins.
- If the SQL statements cannot be simplified, configure the driver memory.
- When running the SQL statements, specify the driver-memory parameter. An example command is as follows:
/spark-sql --master=local[4] --driver-memory=512M -f /tpch.sql
- Before running the SQL statements, change the driver memory size as an administrator.
- Log in to FusionInsight Manager and choose Cluster > Name of the desired cluster > Services > Spark2x > Configurations.
- On the displayed page, click All Configurations and search for SPARK_DRIVER_MEMORY.
- Modify the SPARK_DRIVER_MEMORY parameter value to increase the memory size. The parameter value consists of two parts: memory size (an integer) and the unit (M or G), for example, 512M.
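The value format described above (an integer followed by M or G) can be checked before the value is passed to the driver-memory parameter. The following is a minimal sketch, not part of the product: the function name is_valid_driver_memory and the variable DRIVER_MEMORY are hypothetical, and the script only prints the command from the example above instead of running it.

```shell
#!/bin/sh
# Hypothetical helper (not part of Spark or FusionInsight): succeeds only if
# the argument is a positive integer followed by M or G, which is the format
# this section describes for SPARK_DRIVER_MEMORY.
is_valid_driver_memory() {
    printf '%s\n' "$1" | grep -Eq '^[1-9][0-9]*[MG]$'
}

# Hypothetical variable; defaults to the value used in the example command.
DRIVER_MEMORY="${DRIVER_MEMORY:-512M}"

if is_valid_driver_memory "$DRIVER_MEMORY"; then
    # Dry run: print the command from this section rather than executing it.
    echo "/spark-sql --master=local[4] --driver-memory=$DRIVER_MEMORY -f /tpch.sql"
else
    echo "Invalid driver memory value: $DRIVER_MEMORY" >&2
fi
```

Setting DRIVER_MEMORY=4G before running the script would print the same command with the larger value. The check is deliberately stricter than necessary: it accepts only the two units named in this section.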
Reference
If the driver memory is insufficient, the following error may be reported during the query:
2018-02-11 09:13:14,683 | WARN | Executor task launch worker for task 5 | Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0. | org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch.spill(RowBasedKeyValueBatch.java:173)
2018-02-11 09:13:14,682 | WARN | Executor task launch worker for task 3 | Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0. | org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch.spill(RowBasedKeyValueBatch.java:173)
2018-02-11 09:13:14,704 | ERROR | Executor task launch worker for task 2 | Exception in task 2.0 in stage 1.0 (TID 2) | org.apache.spark.internal.Logging$class.logError(Logging.scala:91)
java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 0
    at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:100)
    at org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:791)
    at org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:208)
    at org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:223)
    at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.<init>(UnsafeFixedWidthAggregationMap.java:104)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.createHashMap(HashAggregateExec.scala:307)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:381)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
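To check whether a log file contains the error shown above, the message text can be searched for directly. The following is a minimal sketch under stated assumptions: the function name has_memory_acquire_error and the log path are hypothetical placeholders, and only the message text is taken from the log above.

```shell
#!/bin/sh
# Hypothetical helper: succeeds if the given log file contains the
# "Unable to acquire ... bytes of memory" OutOfMemoryError shown above.
has_memory_acquire_error() {
    grep -Eq 'java\.lang\.OutOfMemoryError: Unable to acquire [0-9]+ bytes of memory' "$1"
}

# Placeholder path (an assumption); replace it with the actual log file.
LOG_FILE="${LOG_FILE:-/path/to/spark.log}"

if [ -f "$LOG_FILE" ] && has_memory_acquire_error "$LOG_FILE"; then
    echo "Memory acquisition error found in $LOG_FILE; consider increasing the driver memory."
fi
```

The pattern matches any byte count, so it is not tied to the 262144 value that appears in this particular log.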