How to add commands to SparkSQL that Hive supports but SparkSQL does not yet support
ANALYZE is used as the worked example throughout.
For details on how ANALYZE is used in Hive, see: https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables
A quick introduction to ANALYZE in Hive
Suppose a table has 4 partitions:
Partition1: (ds='2008-04-08', hr=11)
Partition2: (ds='2008-04-08', hr=12)
Partition3: (ds='2008-04-09', hr=11)
Partition4: (ds='2008-04-09', hr=12)
ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS;
Statistics are gathered for: Partition3 (ds='2008-04-09', hr=11) only
ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS;
Statistics are gathered for: Partitions 3 and 4
ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS;
Statistics are gathered for: all 4 partitions
ANALYZE TABLE Table1 COMPUTE STATISTICS;
For a non-partitioned table, the command above is used.
Note: if Table1 is a partitioned table, a partition spec must be supplied when running ANALYZE; otherwise the Semantic Analyzer reports an error.
How to add code to SparkSQL to support Hive's ANALYZE feature
The newly added ANALYZE code is everything involving AnalyzeTable and TOK_ANALYZE in the three files below; the neighboring DropTable and DescribeCommand code is pre-existing and shown only for context.
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
private[hive] case class AnalyzeTable(tableName: String) extends Command

private[hive] object HiveQl {
  // ...
  val tree = getAst(sql)
  if (nativeCommands contains tree.getText) {
    NativeCommand(sql)
  } else {
    // ...

    case Token("TOK_DROPTABLE",
           Token("TOK_TABNAME", tableNameParts) ::
           ifExists) =>
      val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".")
      DropTable(tableName, ifExists.nonEmpty)

    // Support "ANALYZE TABLE tableName COMPUTE STATISTICS noscan"
    case Token("TOK_ANALYZE",
           Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
           isNoscan) =>
      // Reference: https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables
      if (partitionSpec.nonEmpty) {
        // Analyzing specific partitions is delegated to Hive as a native command.
        NativePlaceholder
      } else if (isNoscan.isEmpty) {
        // Without "noscan" the statement is likewise delegated to Hive as a native command.
        NativePlaceholder
      } else {
        val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".")
        AnalyzeTable(tableName)
      }
    // ...
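To make the pattern match above easier to read, here is a small self-contained sketch (not Spark source; this Token class is a hypothetical stand-in for the AST node wrapper HiveQl matches against) showing how the TOK_ANALYZE tree is destructured:

object TokenSketch extends App {
  // Minimal stand-in for the parser's AST node type; illustration only.
  case class Token(text: String, children: List[Token] = Nil)

  // Hand-built tree for: ANALYZE TABLE db.src COMPUTE STATISTICS noscan
  val ast = Token("TOK_ANALYZE", List(
    Token("TOK_TAB", List(
      Token("TOK_TABNAME", List(Token("db"), Token("src"))))),
    Token("noscan")))

  ast match {
    case Token("TOK_ANALYZE",
           Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
           isNoscan) =>
      val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".")
      // Prints: table=db.src partitioned=false noscan=true
      println(s"table=$tableName partitioned=${partitionSpec.nonEmpty} noscan=${isNoscan.nonEmpty}")
    case _ =>
      println("not an ANALYZE statement")
  }
}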
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
case DropTable(tableName, ifExists) =>
  execution.DropTable(tableName, ifExists) :: Nil

case AnalyzeTable(tableName) =>
  execution.AnalyzeTable(tableName) :: Nil

case describe: logical.DescribeCommand =>
  val resolvedTable = context.executePlan(describe.table).analyzed
  resolvedTable match {
    // ...
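For orientation, a planning strategy is essentially a function from a logical plan node to a list of physical plan candidates, where an empty list means "this strategy does not handle that node". A self-contained toy version of the mapping above (all class names here are hypothetical, not Spark's actual types):

object StrategySketch extends App {
  // Toy logical and physical plan node types, for illustration only.
  sealed trait LogicalPlan
  case class LogicalAnalyzeTable(tableName: String) extends LogicalPlan
  case class LogicalDropTable(tableName: String, ifExists: Boolean) extends LogicalPlan

  sealed trait PhysicalPlan
  case class PhysicalAnalyzeTable(tableName: String) extends PhysicalPlan

  // A strategy maps one logical node to zero or more physical candidates.
  type Strategy = LogicalPlan => Seq[PhysicalPlan]

  val commandStrategy: Strategy = {
    case LogicalAnalyzeTable(name) => PhysicalAnalyzeTable(name) :: Nil
    case _                         => Nil // not this strategy's node
  }

  println(commandStrategy(LogicalAnalyzeTable("src")))     // List(PhysicalAnalyzeTable(src))
  println(commandStrategy(LogicalDropTable("src", false))) // List()
}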
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
/**
 * :: DeveloperApi ::
 * Analyzes the given table in the current database to generate statistics, which will be
 * used in query optimizations.
 *
 * Right now, it only supports Hive tables and it only updates the size of a Hive table
 * in the Hive metastore.
 */
@DeveloperApi
case class AnalyzeTable(tableName: String) extends LeafNode with Command {

  def hiveContext = sqlContext.asInstanceOf[HiveContext]

  def output = Seq.empty

  override protected[sql] lazy val sideEffectResult = {
    hiveContext.analyze(tableName)
    Seq.empty[Any]
  }

  override def execute(): RDD[Row] = {
    sideEffectResult
    sparkContext.emptyRDD[Row]
  }
}
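The real work is delegated to hiveContext.analyze(tableName): conceptually, it measures the table's data size on disk and records it in the Hive metastore as the totalSize table parameter, which the optimizer later reads. A rough sketch of that size computation using Hadoop's FileSystem API (an illustration, not Spark's actual analyze implementation):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object AnalyzeSketch {
  // Sum the bytes of every file under the table's storage location.
  // The real analyze() then writes this value back to the Hive metastore
  // as the table's `totalSize` parameter.
  def tableSizeOnDisk(conf: Configuration, tableLocation: String): Long = {
    val path = new Path(tableLocation)
    val fs = FileSystem.get(path.toUri, conf)
    // getContentSummary recursively totals file lengths under the path.
    fs.getContentSummary(path).getLength
  }
}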
How to use ANALYZE in SparkSQL after the change:
ANALYZE TABLE tableName COMPUTE STATISTICS noscan
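The same statement can also be issued programmatically. A minimal usage sketch, e.g. from the spark-shell, assuming an existing SparkContext sc and a Hive table named src (both are placeholders):

import org.apache.spark.sql.hive.HiveContext

// Assumes `sc: SparkContext` already exists and `src` is a Hive table.
val hiveContext = new HiveContext(sc)

// Runs the new AnalyzeTable command: with "noscan", only the table size is
// computed and written to the Hive metastore as `totalSize`.
hiveContext.hql("ANALYZE TABLE src COMPUTE STATISTICS noscan")

Once totalSize is recorded, the optimizer can use it, for example, to automatically broadcast small tables in joins (governed by spark.sql.autoBroadcastJoinThreshold).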