Spark SQL External Data Sources JDBC简易实现
在spark1.2版本中最令我期待的功能是External Data Sources,通过该API可以直接将External Data Sources注册成一个临时表,该表可以和已经存在的表等通过sql进行查询操作。External Data Sources API代码存放于org.apache.spark.sql包中。
具体的分析可参见OopsOutOfMemory的两篇精彩博文:
http://blog.csdn.net/oopsoom/article/details/42061077
http://blog.csdn.net/oopsoom/article/details/42064075
自己尝试实现了一个简易的读取关系型数据库的外部数据源,代码参见:https://github.com/luogankun/spark-jdbc
支持MySQL/Oracle/DB2,以及几种简单的数据类型,暂时还不支持PrunedScan、PrunedFilteredScan,仅支持TableScan,后续在接着完善。
使用步骤:
1、编译spark-jdbc代码
sbt package
2、添加jar包到spark-env.sh
export SPARK_CLASSPATH=/home/spark/software/source/spark_package/spark-jdbc/target/scala-2.10/spark-jdbc_2.10-0.1.jar:$SPARK_CLASSPATH export SPARK_CLASSPATH=/home/spark/lib/ojdbc14-10.2.0.3.jar:$SPARK_CLASSPATH export SPARK_CLASSPATH=/home/spark/lib/db2jcc-9.7.jar:$SPARK_CLASSPATH export SPARK_CLASSPATH=/home/spark/lib/mysql-connector-java-3.0.10.jar:$SPARK_CLASSPATH
3、启动spark-sql
CREATE TEMPORARY TABLE jdbc_table USING com.luogankun.spark.jdbc OPTIONS ( sparksql_table_schema ‘(TBL_ID int, TBL_NAME string, TBL_TYPE string)‘, jdbc_table_name ‘TBLS‘, jdbc_table_schema ‘(TBL_ID , TBL_NAME , TBL_TYPE)‘, url ‘jdbc:mysql://hadoop000:3306/hive‘, user ‘root‘, password ‘root‘, num_partitions ‘6‘, where "TBL_ID > 766 AND TBL_NAME=‘order_created_4_partition‘" );
参数说明:
sparksql_table_schema:spark sql表字段名称与类型
jdbc_table_name:关系型数据库表名
jdbc_table_schema: 关系型数据库表字段名称
url :关系型数据库url
user :关系型数据库用户名
password: 关系型数据库密码
num_partitions:partitions数目,默认是5,可省略
where:过滤条件,可省略
select TBL_ID,TBL_NAME,TBL_TYPE from jdbc_table;
在测试过程中遇到的问题:
如上的代码在连接MySQL数据库操作时没有问题,但是在操作Oracle或者DB2数据库时,报错如下:
09:56:48,302 [Executor task launch worker-0] ERROR Logging$class : Error in TaskCompletionListener java.lang.AbstractMethodError: oracle.jdbc.driver.OracleResultSetImpl.isClosed()Z at org.apache.spark.rdd.JdbcRDD$$anon$1.close(JdbcRDD.scala:99) at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63) at org.apache.spark.rdd.JdbcRDD$$anon$1$$anonfun$1.apply(JdbcRDD.scala:71) at org.apache.spark.rdd.JdbcRDD$$anon$1$$anonfun$1.apply(JdbcRDD.scala:71) at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:85) at org.apache.spark.TaskContext$$anonfun$markTaskCompleted$1.apply(TaskContext.scala:110) at org.apache.spark.TaskContext$$anonfun$markTaskCompleted$1.apply(TaskContext.scala:108) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.TaskContext.markTaskCompleted(TaskContext.scala:108) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:64) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) 09:56:48,302 [Executor task launch worker-1] ERROR Logging$class : Error in TaskCompletionListener
跟了下JdbcRDD源代码发现,问题在于:
我在本案例中使用的oracle的驱动是ojdbc14-10.2.0.3.jar,查阅了些资料说是Oracle的实现类没有该方法;
该issues详见: https://issues.apache.org/jira/browse/SPARK-5239
解决办法:
1、升级驱动包;
2、暂时屏蔽掉这两个isClosed的判断方法(https://github.com/apache/spark/pull/4033)
后续将会继续完善实现PrunedScan、PrunedFilteredScan,现在的实现确实很“丑陋”,凑合着先能使用吧。
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。