环境与版本
- OS:centos 7
- JDK版本:1.8
- Spark版本:2.1.0
- Scala版本:2.11
- IDE:intellij idea 14.1.4
WholeStageCodeGen简介
Spark2.0集成了第二代Tungsten engine,经过我们的测试,性能相对spark1.6有明显的提升,而其中一个重要的特性就是WholeStageCodeGen,在databricks的官博上有详细讲解这个新特性的文章:
https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html
简而言之,利用WholeStageCodeGen技术,可以将一次计算过程中的多个operators作为一个整体,生成与手写代码性能相近的代码。
源码学习
示例
先通过一个简单的示例,用远程调试的方式对整个执行步骤进行跟踪,并关注其中与WholeStageCodeGen相关的部分。
示例代码为:
demo_table是通过读取一个parq文件并注册的临时表,数据包含两列:a和b,如果用explain打印出执行计划:
|
|
执行过程
由于整个执行过程是由DataFrame(DataSet)触发的,所以直奔主题,直接查看DataSet类的collect方法,源码如下:
从这里发现DataSet将真正的collect操作交给了queryExecution.executedPlan处理(如果再深入分析QueryExecution类的源码,会发现这里涉及了一大堆的lazy成员,此处对queryExecution.executedPlan的引用直接触发了物理执行计划的生成,这里仅将物理执行计划的生成过程看成一个黑盒,将重点放在代码生成部分),queryExecution.executedPlan是一个SparkPlan的具体子类的对象,顺着SparkPlan的executeCollect方法再跟下去:
该方法也很清楚,再顺着getByteArrayRdd方法看下去:
该方法主要工作是把UnsafeRows转成byte数组并压缩,唯一可能和WholeStageCodeGen有关系的只有可能是对execute()方法的调用,于是再进入execute():
|
|
该方法只是单纯调用SparkPlan具体子类的doExecute()方法。那么问题就来了,这里调用的是哪个子类的doExecute()方法?换句话说上面提到的queryExecution.executedPlan是什么类型?通过调试,发现queryExecution.executedPlan是WholeStageCodeGenExec类型的对象,该类型扩展了UnaryExecNode特质,包含child对象,整个物理执行计划的对象树如下:
|
|
对照上面用explain打印出来的执行计划,发现整个执行计划被WholeStageCodeGenExec“包装”了,继续看WholeStageCodeGenExec的doExecute()方法:
|
|
WholeStageCodeGenExec的doExecute()方法顺序做了这么几件事:
- 调用doCodeGen()生成代码以及相关上下文信息(具体生成代码的步骤先忽略,下面再说明)
- 尝试编译生成的代码(从CodeGenerator的源码可以看出,编译使用的是Janino库),如果编译失败,则回退到不使用WholeStageCodeGenExec的“传统流程”。
- 真正编译代码并通过反射实例化一个GeneratedClass类型的对象(由于CodeGenerator内部有cache,实际的编译和实例化只会进行一次),通过GeneratedClass的generate方法得到继承自BufferedRowIterator的对象,并利用该对象完成实际的执行流程。
如果将cleanedSource打印出来:
从生成的代码中可以看出generate方法返回了继承自BufferedRowIterator的GeneratedIterator对象,GeneratedIterator类重写了processNext方法,如果仔细阅读这部分代码,会发现该方法中完成了Filter和Project两个步骤的工作,实现了WholeStageCodegenExec类在其注释中说明的——“compile a subtree of plans that support codegen together into single Java function”。
doCodeGen()
前面在看到WholeStageCodeGenExec的doExecute()时,忽略了doCodeGen()方法的细节,这里再结合示例深入看下代码生成的具体实现。该方法代码如下:
从WholeStageCodeGenExec的角度说明doCodeGen的处理流程就是:编写了代码框架(对应代码中的source变量),具体的处理逻辑由child对象的doProduce()方法生成,由于ProjectExec和FilterExec的doProduce()的实现都是简单地将生成代码的工作委托给其child对象完成,所有最终核心逻辑代码其实是由FileSourceScanExec得doProduce()生成的。