如何为SparkSQL添加hive中支持的而SparkSQL暂未支持的命令

以ANALYZE为例描述

ANALYZE在Hive中的使用方法详见:https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables

 

ANALYZE在Hive中使用简单介绍

一张表有4个分区:

Partition1: (ds=2008-04-08, hr=11)
Partition2: (ds=2008-04-08, hr=12)
Partition3: (ds=2008-04-09, hr=11)
Partition4: (ds=2008-04-09, hr=12)

 

ANALYZE TABLE Table1 PARTITION(ds=2008-04-09, hr=11) COMPUTE STATISTICS;

结果是:partition3 (ds=‘2008-04-09‘, hr=11) 

 

ANALYZE TABLE Table1 PARTITION(ds=2008-04-09, hr) COMPUTE STATISTICS;

结果是: partitions 3和4

 

ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS;

结果是: 全部的4个partitions

 

ANALYZE TABLE Table1 COMPUTE STATISTICS;

对于非分区表可以使用如上命令。

注意:如果Table1是分区表,在使用ANALYZE是必须要指定分区,否则Semantic Analyzer会报错。

 

如何在SparkSQL中添加代码支持Hive中的ANALYZE功能

红色代码是为ANALYZE新添加的功能

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

private[hive] case class AnalyzeTable(tableName: String) extends Command

private[hive] object HiveQl {
  val tree = getAst(sql)
  if (nativeCommands contains tree.getText) {
    NativeCommand(sql)
  } else {
    ifExists) =>
    val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".")
    DropTable(tableName, ifExists.nonEmpty)
    
    // Support "ANALYZE TABLE tableNmae COMPUTE STATISTICS noscan"     case Token("TOK_ANALYZE", Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) :: isNoscan) =>

    // Reference: https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables     if (partitionSpec.nonEmpty) {   // Analyze partitions will be treated as a Hive native command.   NativePlaceholder     } else if (isNoscan.isEmpty) {   // If users do not specify "noscan", it will be treated as a Hive native command.   NativePlaceholder     } else {   val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".")       AnalyzeTable(tableName)
    } }

 

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

case DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil
case AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil
case describe: logical.DescribeCommand =>
    val resolvedTable = context.executePlan(describe.table).analyzed
    resolvedTable match {

 

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala

/**
 * :: DeveloperApi ::
 * Analyzes the given table in the current database to generate statistics, which will be
 * used in query optimizations.
 *
 * Right now, it only supports Hive tables and it only updates the size of a Hive table
 * in the Hive metastore.
 */
@DeveloperApi
case class AnalyzeTable(tableName: String) extends LeafNode with Command {

  def hiveContext = sqlContext.asInstanceOf[HiveContext]

  def output = Seq.empty

  override protected[sql] lazy val sideEffectResult = {
    hiveContext.analyze(tableName)
    Seq.empty[Any]
  }

  override def execute(): RDD[Row] = {
    sideEffectResult
    sparkContext.emptyRDD[Row]
  }
}

 

添加ANALYZE后的SparkSQL使用方法:

ANALYZE TABLE tableNmae COMPUTE STATISTICS noscan

 

如何为SparkSQL添加hive中支持的而SparkSQL暂未支持的命令,古老的榕树,5-wow.com

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。