sparkSQL1.1入门之二:sparkSQL运行架构
SELECT a1,a2,a3 FROM tableA Where condition
可以看得出来,该语句是由Projection(a1,a2,a3)、Data Source(tableA)、Filter(condition)组成,分别对应sql查询过程中的Result、Data Source、Operation,也就是说SQL语句按Result-->Data Source-->Operation的次序来描述的。那么,SQL语句在实际的运行过程中是怎么处理的呢?一般的数据库系统先将读入的SQL语句(Query)先进行解析(Parse),分辨出SQL语句中哪些词是关键词(如SELECT、FROM、WHERE),哪些是表达式、哪些是Projection、哪些是Data Source等等。这一步就可以判断SQL语句是否规范,不规范就报错,规范就继续下一步过程绑定(Bind),这个过程将SQL语句和数据库的数据字典(列、表、视图等等)进行绑定,如果相关的Projection、Data Source等等都是存在的话,就表示这个SQL语句是可以执行的;而在执行前,一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优计划(Optimize),最终执行该计划(Execute),并返回结果。当然在实际的执行过程中,是按Operation-->Data Source-->Result的次序来进行的,和SQL语句的次序刚好相反;在执行过程有时候甚至不需要读取物理表就可以返回结果,比如重新运行刚运行过的SQL语句,可能直接从数据库的缓冲池中获取返回结果。
- Tree的相关代码定义在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees
- Logical Plans、Expressions、Physical Operators都可以使用Tree表示
- Tree的具体操作是通过TreeNode来实现的
- sparkSQL定义了catalyst.trees的日志,通过这个日志可以形象的表示出树的结构
- TreeNode可以使用scala的集合操作方法(如foreach, map, flatMap, collect等)进行操作
- 有了TreeNode,通过Tree中各个TreeNode之间的关系,可以对Tree进行遍历操作,如使用transformDown、transformUp将Rule应用到给定的树段,然后用结果替代旧的树段;也可以使用transformChildrenDown、transformChildrenUp对一个给定的节点进行操作,通过迭代将Rule应用到该节点以及子节点。
- TreeNode可以细分成三种类型的Node:
- UnaryNode 一元节点,即只有一个子节点。如Limit、Filter操作
- BinaryNode 二元节点,即有左右子节点的二叉节点。如Jion、Union操作
- LeafNode 叶子节点,没有子节点的节点。主要用户命令类操作,如SetCommand
- Rule的相关代码定义在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules
- Rule在sparkSQL的analyzer、optimizer、SparkPlan等各个组件中都有应用到
- Rule是一个抽象类,具体的Rule实现是通过RuleExecutor完成
- Rule通过定义batch和batchs,可以简便的、模块化地对Tree进行transform操作
- Rule通过定义Once和FixedPoint,可以对Tree进行一次操作或多次操作(如对某些Tree进行多次迭代操作的时候,达到FixedPoint次数迭代或达到前后两次的树结构没变化才停止操作,具体参看RuleExecutor.apply)
/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */ def sql(sqlText: String): SchemaRDD = { if (dialect == "sql") { new SchemaRDD(this, parseSql(sqlText)) //parseSql(sqlText)对sql语句进行语法解析 } else { sys.error(s"Unsupported SQL dialect: $dialect") } }sqlContext.sql的返回结果是SchemaRDD,调用了new SchemaRDD(this, parseSql(sqlText)) 来对sql语句进行处理,处理之前先使用catalyst.SqlParser对sql语句进行语法解析,使之生成UnresolvedLogicalPlan。
/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */ protected[sql] val parser = new catalyst.SqlParser protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)类SchemaRDD继承自SchemaRDDLike
/**源自sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala */ class SchemaRDD( @transient val sqlContext: SQLContext, @transient val baseLogicalPlan: LogicalPlan) extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLikeSchemaRDDLike中调用sqlContext.executePlan(baseLogicalPlan)来执行catalyst.SqlParser解析后生成UnresolvedLogicalPlan,这里的baseLogicalPlan就是指UnresolvedLogicalPlan。
/**源自sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala */ private[sql] trait SchemaRDDLike { @transient val sqlContext: SQLContext @transient val baseLogicalPlan: LogicalPlan private[sql] def baseSchemaRDD: SchemaRDD lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)
/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */ protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = new this.QueryExecution { val logical = plan }
/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */ protected abstract class QueryExecution { def logical: LogicalPlan //对Unresolved LogicalPlan进行analyzer,生成resolved LogicalPlan lazy val analyzed = ExtractPythonUdfs(analyzer(logical)) //对resolved LogicalPlan进行optimizer,生成optimized LogicalPlan lazy val optimizedPlan = optimizer(analyzed) // 将optimized LogicalPlan转换成PhysicalPlan lazy val sparkPlan = { SparkPlan.currentContext.set(self) planner(optimizedPlan).next() } // PhysicalPlan执行前的准备工作,生成可执行的物理计划 lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) //执行可执行物理计划 lazy val toRdd: RDD[Row] = executedPlan.execute() ...... }sqlContext总的一个过程如下图所示:
- SQL语句经过SqlParse解析成UnresolvedLogicalPlan;
- 使用analyzer结合数据数据字典(catalog)进行绑定,生成resolvedLogicalPlan;
- 使用optimizer对resolvedLogicalPlan进行优化,生成optimizedLogicalPlan;
- 使用SparkPlan将LogicalPlan转换成PhysicalPlan;
- 使用prepareForExecution()将PhysicalPlan转换成可执行物理计划;
- 使用execute()执行可执行物理计划;
- 生成SchemaRDD。
/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */ override def sql(sqlText: String): SchemaRDD = { // 使用spark.sql.dialect定义采用的语法解析器 if (dialect == "sql") { super.sql(sqlText) //如果使用sql解析器,则使用sqlContext的sql方法 } else if (dialect == "hiveql") { //如果使用和hiveql解析器,则使用HiveQl.parseSql new SchemaRDD(this, HiveQl.parseSql(sqlText)) } else { sys.error(s"Unsupported SQL dialect: $dialect. Try 'sql' or 'hiveql'") } }hiveContext.sql首先根据用户的语法设置(spark.sql.dialect)决定具体的执行过程,如果dialect == "sql"则采用sqlContext的sql语法执行过程;如果是dialect == "hiveql",则采用hiveql语法执行过程。在这里我们主要看看hiveql语法执行过程。可以看出,hiveContext.sql调用了new SchemaRDD(this, HiveQl.parseSql(sqlText))对hiveql语句进行处理,处理之前先使用对语句进行语法解析。
/**源自src/main/scala/org/apache/spark/sql/hive/HiveQl.scala */ /** Returns a LogicalPlan for a given HiveQL string. */ def parseSql(sql: String): LogicalPlan = { try { if (条件) { //非hive命令的处理,如set、cache table、add jar等直接转化成command类型的LogicalPlan ..... } else { val tree = getAst(sql) if (nativeCommands contains tree.getText) { NativeCommand(sql) } else { nodeToPlan(tree) match { case NativePlaceholder => NativeCommand(sql) case other => other } } } } catch { //异常处理 ...... } }因为sparkSQL所支持的hiveql除了兼容hive语句外,还兼容一些sparkSQL本身的语句,所以在HiveQl.parseSql对hiveql语句语法解析的时候:
- 首先考虑一些非hive语句的处理,这些命令属于sparkSQL本身的命令语句,如设置sparkSQL运行参数的set命令、cache table、add jar等,将这些语句转换成command类型的LogicalPlan;
- 如果是hive语句,则调用getAst(sql)使用hive的ParseUtils将该语句先解析成AST树,然后根据AST树中的关键字进行转换:类似命令型的语句、DDL类型的语句转换成command类型的LogicalPlan;其他的转换通过nodeToPlan转换成LogicalPlan。
/**源自src/main/scala/org/apache/spark/sql/hive/HiveQl.scala */ /** * Returns the AST for the given SQL string. */ def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql))和sqlContext一样,类SchemaRDD继承自SchemaRDDLike ,SchemaRDDLike调用sqlContext.executePlan(baseLogicalPlan),不过hiveContext重写了executePlan()函数,
/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */ override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = new this.QueryExecution { val logical = plan }并使用了一个继承自sqlContext.QueryExecution的新的QueryExecution类:
/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */ protected[sql] abstract class QueryExecution extends super.QueryExecution { // TODO: Create mixin for the analyzer instead of overriding things here. override lazy val optimizedPlan = optimizer(ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))) override lazy val toRdd: RDD[Row] = executedPlan.execute().map(_.copy()) ...... }所以在hiveContext的运行过程基本和sqlContext一致,除了override的catalog、functionRegistry、analyzer、planner、optimizedPlan、toRdd。
/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */ /* A catalyst metadata catalog that points to the Hive Metastore. */ @transient override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog { override def lookupRelation( databaseName: Option[String], tableName: String, alias: Option[String] = None): LogicalPlan = { LowerCaseSchema(super.lookupRelation(databaseName, tableName, alias)) } }hiveContext的analyzer,使用了新的catalog和functionRegistry:
/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */ /* An analyzer that uses the Hive metastore. */ @transient override protected[sql] lazy val analyzer = new Analyzer(catalog, functionRegistry, caseSensitive = false)
/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */ @transient override protected[sql] val planner = hivePlanner
- SQL语句经过HiveQl.parseSql解析成UnresolvedLogicalPlan,在这个解析过程中对hiveql语句使用getAst()获取AST树,然后再进行解析;
- 使用analyzer结合数据hive源数据Metastore(新的catalog)进行绑定,生成resolvedLogicalPlan;
- 使用optimizer对resolvedLogicalPlan进行优化,生成optimizedLogicalPlan,优化前使用了ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))进行预处理;
- 使用hivePlanner将LogicalPlan转换成PhysicalPlan;
- 使用prepareForExecution()将PhysicalPlan转换成可执行物理计划;
- 使用execute()执行可执行物理计划;
- 执行后,使用map(_.copy)将结果导入SchemaRDD。
- core处理数据的输入输出,从不同的数据源获取数据(RDD、Parquet、json等),将查询结果输出成schemaRDD;
- catalyst处理查询语句的整个处理过程,包括解析、绑定、优化、物理计划等,说其是优化器,还不如说是查询引擎;
- hive对hive数据的处理
- hive-ThriftServer提供CLI和JDBC/ODBC接口
- sqlParse,完成sql语句的语法解析功能,目前只提供了一个简单的sql解析器;
- Analyzer,主要完成绑定工作,将不同来源的UnresolvedLogicalPlan 和数据元数据(如hive metastore、Schema catalog)进行绑定,生成resolvedLogicalPlan;
- optimizer对resolvedLogicalPlan进行优化,生成optimizedLogicalPlan;
- Planner将LogicalPlan转换成PhysicalPlan;
- CostModel,主要根据过去的性能统计数据,选择最佳的物理执行计划
- 先将sql语句通过解析生成Tree,然后在不同阶段使用不同的Rule应用到Tree上,通过转换完成各个组件的功能。
- Analyzer使用Analysis Rules,配合数据元数据(如hive metastore、Schema catalog),完善UnresolvedLogicalPlan的属性而转换成resolvedLogicalPlan;
- optimizer使用Optimization Rules,对resolvedLogicalPlan进行合并、列裁剪、过滤器下推等优化作业而转换成optimizedLogicalPlan;
- Planner使用Planning Strategies,对optimizedLogicalPlan
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。