A Tour of SparkSQL

1. Prepare the data file employee.txt

1001,Gong Shaocheng,1
1002,Li Dachao,1
1003,Qiu Xin,1
1004,Cheng Jiangzhong,2
1005,Wo Binggang,3

Put the data into HDFS:

[root@jfp3-1 spark-studio]# hdfs dfs -put employee.txt /user/spark_studio

 

2. Start the Spark shell

[root@jfp3-1 spark-1.0.0-bin-hadoop2]# ./bin/spark-shell --master spark://192.168.0.71:7077

3. Write the script

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

case class Employee(employeeId: String, name: String, departmentId: Int)

// Create an RDD of Employee objects and register it as a table.
val employees = sc.textFile("hdfs://jfp3-1:8020/user/spark_studio/employee.txt").map(_.split(",")).map(p => Employee(p(0), p(1), p(2).trim.toInt))
employees.registerAsTable("employee")

// SQL statements can be run by using the sql methods provided by sqlContext.
val fsis = sql("SELECT name FROM employee WHERE departmentId = 1")

// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
fsis.map(t => "Name: " + t(0)).collect().foreach(println)
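
To see what the script computes without a cluster, the same parse-and-filter logic can be sketched with plain Scala collections. This is only an illustration, not Spark code: the object name EmployeeSketch and its helper methods are hypothetical, the sample lines are copied from employee.txt, and employeeId is kept as a String as in the run shown in step 4.

```scala
// Plain Scala, no Spark required: mirrors the split/map/filter steps of the script.
case class Employee(employeeId: String, name: String, departmentId: Int)

object EmployeeSketch {
  // The five sample records from employee.txt.
  val lines: List[String] = List(
    "1001,Gong Shaocheng,1",
    "1002,Li Dachao,1",
    "1003,Qiu Xin,1",
    "1004,Cheng Jiangzhong,2",
    "1005,Wo Binggang,3"
  )

  // Split one comma-separated line into an Employee.
  def parse(line: String): Employee = {
    val p = line.split(",")
    Employee(p(0), p(1), p(2).trim.toInt)
  }

  // Equivalent of: SELECT name FROM employee WHERE departmentId = <dept>
  def namesInDepartment(employees: Seq[Employee], dept: Int): Seq[String] =
    employees.filter(_.departmentId == dept).map(_.name)

  def main(args: Array[String]): Unit =
    namesInDepartment(lines.map(parse), 1).foreach(n => println("Name: " + n))
}
```

Running main prints one "Name: ..." line for each of the three employees in department 1, which is the result the SQL query in the script is expected to return.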

4. Run

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@17124319

scala> import sqlContext._
import sqlContext._

scala> case class Employee(employeeId: String, name: String, departmentId: Int)
defined class Employee

scala> val employees = sc.textFile("hdfs://jfp3-1:8020/user/spark_studio/employee.txt").map(_.split(",")).map(p => Employee(p(0), p(1), p(2).trim.toInt))
14/06/18 09:54:25 INFO MemoryStore: ensureFreeSpace(138763) called with curMem=0, maxMem=309225062
14/06/18 09:54:25 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 135.5 KB, free 294.8 MB)
employees: org.apache.spark.rdd.RDD[Employee] = MappedRDD[3] at map at <console>:19

scala> employees.registerAsTable("employee")

scala> val fsis = sql("SELECT name FROM employee WHERE departmentId = 1")
14/06/18 09:54:44 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/06/18 09:54:44 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/06/18 09:54:44 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
14/06/18 09:54:44 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
fsis: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[6] at RDD at SchemaRDD.scala:98
== Query Plan ==
Project [name#1:1]
 Filter (departmentId#2:2 = 1)
  ExistingRdd [employeeId#0,name#1,departmentId#2], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:174

scala> fsis.map(t => "Name: " + t(0)).collect().foreach(println)
14/06/18 09:55:27 INFO FileInputFormat: Total input paths to process : 1
14/06/18 09:55:27 INFO SparkContext: Starting job: collect at <console>:20
14/06/18 09:55:27 INFO DAGScheduler: Got job 0 (collect at <console>:20) with 2 output partitions (allowLocal=false)
14/06/18 09:55:27 INFO DAGScheduler: Final stage: Stage 0(collect at <console>:20)
14/06/18 09:55:27 INFO DAGScheduler: Parents of final stage: List()
14/06/18 09:55:27 INFO DAGScheduler: Missing parents: List()
14/06/18 09:55:27 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[9] at map at <console>:20), which has no missing parents
14/06/18 09:55:27 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[9] at map at <console>:20)
14/06/18 09:55:27 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/06/18 09:55:27 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 1: jfp3-2 (NODE_LOCAL)
14/06/18 09:55:27 INFO TaskSetManager: Serialized task 0.0:0 as 3508 bytes in 2 ms
14/06/18 09:55:27 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on executor 2: jfp3-3 (NODE_LOCAL)
14/06/18 09:55:27 INFO TaskSetManager: Serialized task 0.0:1 as 3508 bytes in 0 ms
14/06/18 09:55:28 INFO TaskSetManager: Finished TID 1 in 1266 ms on jfp3-3 (progress: 1/2)
14/06/18 09:55:28 INFO TaskSetManager: Finished TID 0 in 1276 ms on jfp3-2 (progress: 2/2)
14/06/18 09:55:28 INFO DAGScheduler: Completed ResultTask(0, 1)
14/06/18 09:55:28 INFO DAGScheduler: Completed ResultTask(0, 0)
14/06/18 09:55:28 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
14/06/18 09:55:28 INFO DAGScheduler: Stage 0 (collect at <console>:20) finished in 1.284 s
14/06/18 09:55:28 INFO SparkContext: Job finished: collect at <console>:20, took 1.386154401 s
Name: Gong Shaocheng
Name: Li Dachao
Name: Qiu Xin

5. Save the data in Parquet format and run SQL
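
The session log in this step starts by reading employee.parquet, so the file must already exist in HDFS; the command that wrote it is not shown. A minimal sketch of that missing step, assuming the spark-shell session from step 4 is still open (so `employees` and `import sqlContext._` are in scope) and the Spark 1.0 SchemaRDD API:

```scala
// Run inside the spark-shell session from step 4 (not standalone Scala).
// With `import sqlContext._` in scope, the RDD[Employee] is implicitly
// converted to a SchemaRDD, which provides saveAsParquetFile.
// Assumption: this is the write step the original session omitted.
employees.saveAsParquetFile("hdfs://jfp3-1:8020/user/spark_studio/employee.parquet")
```

The path is the one the log reads back with sqlContext.parquetFile, so the schema (employeeId, name, departmentId) is preserved across the round trip.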

scala> val parquetFile = sqlContext.parquetFile("hdfs://jfp3-1:8020/user/spark_studio/employee.parquet")
14/06/18 10:24:36 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/06/18 10:24:36 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/06/18 10:24:36 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
14/06/18 10:24:36 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
parquetFile: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[13] at RDD at SchemaRDD.scala:98
== Query Plan ==
ParquetTableScan [employeeId#9,name#10,departmentId#11], (ParquetRelation hdfs://jfp3-1:8020/user/spark_studio/employee.parquet), None


scala> parquetFile.registerAsTable("parquetFile")


scala> val telcos = sql("SELECT name FROM parquetFile WHERE departmentId = 3")
14/06/18 10:24:37 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/06/18 10:24:37 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/06/18 10:24:37 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
14/06/18 10:24:37 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
14/06/18 10:24:37 INFO MemoryStore: ensureFreeSpace(180579) called with curMem=138763, maxMem=309225062
14/06/18 10:24:37 INFO MemoryStore: Block broadcast_1 stored as values to memory (estimated size 176.3 KB, free 294.6 MB)
telcos: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[14] at RDD at SchemaRDD.scala:98
== Query Plan ==
Project [name#10:0]
 Filter (departmentId#11:1 = 3)
  ParquetTableScan [name#10,departmentId#11], (ParquetRelation hdfs://jfp3-1:8020/user/spark_studio/employee.parquet), None

scala> telcos.collect().foreach(println)
14/06/18 10:24:40 INFO FileInputFormat: Total input paths to process : 2
14/06/18 10:24:40 INFO ParquetInputFormat: Total input paths to process : 2
14/06/18 10:24:40 INFO ParquetFileReader: reading summary file: hdfs://jfp3-1:8020/user/spark_studio/employee.parquet/_metadata
14/06/18 10:24:40 INFO deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
14/06/18 10:24:40 INFO deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
14/06/18 10:24:40 INFO SparkContext: Starting job: collect at <console>:20
14/06/18 10:24:40 INFO DAGScheduler: Got job 2 (collect at <console>:20) with 2 output partitions (allowLocal=false)
14/06/18 10:24:40 INFO DAGScheduler: Final stage: Stage 2(collect at <console>:20)
14/06/18 10:24:40 INFO DAGScheduler: Parents of final stage: List()
14/06/18 10:24:40 INFO DAGScheduler: Missing parents: List()
14/06/18 10:24:40 INFO DAGScheduler: Submitting Stage 2 (SchemaRDD[14] at RDD at SchemaRDD.scala:98
== Query Plan ==
Project [name#10:0]
 Filter (departmentId#11:1 = 3)
  ParquetTableScan [name#10,departmentId#11], (ParquetRelation hdfs://jfp3-1:8020/user/spark_studio/employee.parquet), None), which has no missing parents
14/06/18 10:24:40 INFO DAGScheduler: Submitting 2 missing tasks from Stage 2 (SchemaRDD[14] at RDD at SchemaRDD.scala:98
== Query Plan ==
Project [name#10:0]
 Filter (departmentId#11:1 = 3)
  ParquetTableScan [name#10,departmentId#11], (ParquetRelation hdfs://jfp3-1:8020/user/spark_studio/employee.parquet), None)
14/06/18 10:24:40 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
14/06/18 10:24:40 INFO TaskSetManager: Starting task 2.0:0 as TID 4 on executor 2: jfp3-3 (NODE_LOCAL)
14/06/18 10:24:40 INFO TaskSetManager: Serialized task 2.0:0 as 3116 bytes in 1 ms
14/06/18 10:24:40 INFO TaskSetManager: Starting task 2.0:1 as TID 5 on executor 0: jfp3-4 (NODE_LOCAL)
14/06/18 10:24:40 INFO TaskSetManager: Serialized task 2.0:1 as 3116 bytes in 1 ms
14/06/18 10:24:40 INFO DAGScheduler: Completed ResultTask(2, 0)
14/06/18 10:24:40 INFO TaskSetManager: Finished TID 4 in 200 ms on jfp3-3 (progress: 1/2)
14/06/18 10:24:42 INFO DAGScheduler: Completed ResultTask(2, 1)
14/06/18 10:24:42 INFO TaskSetManager: Finished TID 5 in 2162 ms on jfp3-4 (progress: 2/2)
14/06/18 10:24:42 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
14/06/18 10:24:42 INFO DAGScheduler: Stage 2 (collect at <console>:20) finished in 2.177 s
14/06/18 10:24:42 INFO SparkContext: Job finished: collect at <console>:20, took 2.210887848 s
[Wo Binggang]

 

6. DSL syntax support
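
The definition of `all` is not shown in the log for this step. Judging from the query plan printed there (a Filter on departmentId >= 1 beneath a Project of name), it was probably built with the language-integrated query DSL roughly as follows. This is a reconstruction, assuming the Spark 1.0 DSL and an `employees` RDD defined as in step 4:

```scala
// Run inside the spark-shell session (requires `import sqlContext._`).
// Reconstructed from the query plan, not taken from the original session.
// Same as: SELECT name FROM employee WHERE departmentId >= 1
val all = employees.where('departmentId >= 1).select('name)
```

Column names are written as Scala symbols ('departmentId, 'name), which the implicits imported from sqlContext turn into query expressions.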

scala> all.collect().foreach(println)
14/06/18 10:37:45 INFO SparkContext: Starting job: collect at <console>:24
14/06/18 10:37:45 INFO DAGScheduler: Got job 6 (collect at <console>:24) with 2 output partitions (allowLocal=false)
14/06/18 10:37:45 INFO DAGScheduler: Final stage: Stage 6(collect at <console>:24)
14/06/18 10:37:45 INFO DAGScheduler: Parents of final stage: List()
14/06/18 10:37:45 INFO DAGScheduler: Missing parents: List()
14/06/18 10:37:45 INFO DAGScheduler: Submitting Stage 6 (SchemaRDD[33] at RDD at SchemaRDD.scala:98
== Query Plan ==
Project [name#19:1]
 Filter (departmentId#20:2 >= 1)
  ExistingRdd [employeeId#18,name#19,departmentId#20], MapPartitionsRDD[30] at mapPartitions at basicOperators.scala:174), which has no missing parents
14/06/18 10:37:45 INFO DAGScheduler: Submitting 2 missing tasks from Stage 6 (SchemaRDD[33] at RDD at SchemaRDD.scala:98
== Query Plan ==
Project [name#19:1]
 Filter (departmentId#20:2 >= 1)
  ExistingRdd [employeeId#18,name#19,departmentId#20], MapPartitionsRDD[30] at mapPartitions at basicOperators.scala:174)
14/06/18 10:37:45 INFO TaskSchedulerImpl: Adding task set 6.0 with 2 tasks
14/06/18 10:37:45 INFO TaskSetManager: Starting task 6.0:0 as TID 200 on executor 2: jfp3-3 (NODE_LOCAL)
14/06/18 10:37:45 INFO TaskSetManager: Serialized task 6.0:0 as 3541 bytes in 0 ms
14/06/18 10:37:45 INFO TaskSetManager: Starting task 6.0:1 as TID 201 on executor 1: jfp3-2 (NODE_LOCAL)
14/06/18 10:37:45 INFO TaskSetManager: Serialized task 6.0:1 as 3541 bytes in 1 ms
14/06/18 10:37:45 INFO TaskSetManager: Finished TID 200 in 33 ms on jfp3-3 (progress: 1/2)
14/06/18 10:37:45 INFO DAGScheduler: Completed ResultTask(6, 0)
14/06/18 10:37:45 INFO DAGScheduler: Completed ResultTask(6, 1)
14/06/18 10:37:45 INFO TaskSetManager: Finished TID 201 in 37 ms on jfp3-2 (progress: 2/2)
14/06/18 10:37:45 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool 
14/06/18 10:37:45 INFO DAGScheduler: Stage 6 (collect at <console>:24) finished in 0.039 s
14/06/18 10:37:45 INFO SparkContext: Job finished: collect at <console>:24, took 0.052556716 s
[Gong Shaocheng]
[Li Dachao]
[Qiu Xin]
[Cheng Jiangzhong]
[Wo Binggang]

 
