Spark SQL with Hive

The previous article was an introduction to Spark SQL, covering some basic concepts and the API, but it still seemed one step short of everyday use.

There were two reasons for retiring Shark:

1. Integration with Spark programs was subject to many restrictions.

2. Hive's optimizer was not designed for Spark; because the computation models differ, using Hive's optimizer to optimize Spark programs hit a bottleneck.

Here is a look at the basic architecture of Spark SQL:

[Figure: Spark SQL architecture diagram]

Spark 1.1 will support a Spark SQL CLI once it is released. The Spark SQL CLI connects to a Hive Thrift Server to provide functionality similar to the hive shell.

(PS: at the moment this lives in the branch-1.0-jdbc branch in git and has not been officially released. I spent an afternoon testing it and found it still has bugs, so wait patiently for the release!)

In the spirit of research, I wanted to integrate with an existing Hive environment and run Hive statements from the spark shell.

1. Building Spark with Hive support

There are two ways to build Spark with Hive support using sbt:

1. Prefix the sbt command with the relevant environment variables:

SPARK_HADOOP_VERSION=0.20.2-cdh3u5 SPARK_HIVE=true sbt/sbt assembly

2. Modify the project/SparkBuild.scala file:

val DEFAULT_HADOOP_VERSION = "0.20.2-cdh3u5"
val DEFAULT_HIVE = true

Then run sbt/sbt assembly.

2. Working with Hive from Spark SQL

Prerequisite: Hive itself must be working, and in spark-env.sh the Hive conf directory and the Hadoop conf directory need to be added to the CLASSPATH.

Start spark-shell:

[root@web01 spark]# bin/spark-shell --master spark://10.1.8.210:7077 --driver-class-path /app/hadoop/hive-0.11.0-bin/lib/mysql-connector-java-5.1.13-bin.jar:/app/hadoop/hive-0.11.0-bin/lib/hadoop-lzo-0.4.15.jar
Create a HiveContext and import it:

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@7766d31c

scala> import hiveContext._
import hiveContext._
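
For reference, here is roughly what the same setup looks like in a standalone Spark application rather than the shell. This is a minimal sketch, assuming the Hive-enabled assembly built in section 1 is on the classpath; the object name HiveQueryApp and the app name are just illustrations:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object HiveQueryApp {
  def main(args: Array[String]): Unit = {
    // In spark-shell the SparkContext is provided as `sc`; in an application we build it ourselves
    val conf = new SparkConf().setAppName("SparkSQLWithHive")  // hypothetical app name
    val sc = new SparkContext(conf)

    // HiveContext picks up hive-site.xml from the conf directories on the CLASSPATH (see the prerequisite above)
    val hiveContext = new HiveContext(sc)
    import hiveContext._

    // hql() builds the plan lazily; collect() actually runs it
    hql("show databases").collect().foreach(println)

    sc.stop()
  }
}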

hiveContext provides a function for running SQL: hql(string text).

Let's run show databases against Hive. Spark parses the HQL and generates a query plan, but the query is not executed at this point; it only runs when collect() is called.

scala> val show_databases = hql("show databases")
14/07/09 19:59:09 INFO storage.BlockManager: Removing broadcast 0
14/07/09 19:59:09 INFO storage.BlockManager: Removing block broadcast_0
14/07/09 19:59:09 INFO parse.ParseDriver: Parsing command: show databases
14/07/09 19:59:09 INFO parse.ParseDriver: Parse Completed
14/07/09 19:59:09 INFO analysis.Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/07/09 19:59:09 INFO analysis.Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/07/09 19:59:09 INFO analysis.Analyzer: Max iterations (2) reached for batch Check Analysis
14/07/09 19:59:09 INFO storage.MemoryStore: Block broadcast_0 of size 393044 dropped from memory (free 308713881)
14/07/09 19:59:09 INFO broadcast.HttpBroadcast: Deleted broadcast file: /tmp/spark-c29da0f8-c5e3-4fbf-adff-9aa77f9743b2/broadcast_0
14/07/09 19:59:09 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
14/07/09 19:59:09 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
14/07/09 19:59:09 INFO spark.ContextCleaner: Cleaned broadcast 0
14/07/09 19:59:09 INFO exec.ListSinkOperator: 0 finished. closing...
14/07/09 19:59:09 INFO exec.ListSinkOperator: 0 forwarded 0 rows
14/07/09 19:59:09 INFO parse.ParseDriver: Parsing command: show databases
14/07/09 19:59:09 INFO parse.ParseDriver: Parse Completed
14/07/09 19:59:09 INFO ql.Driver: Semantic Analysis Completed
14/07/09 19:59:09 INFO exec.ListSinkOperator: Initializing Self 0 OP
14/07/09 19:59:09 INFO exec.ListSinkOperator: Operator 0 OP initialized
14/07/09 19:59:09 INFO exec.ListSinkOperator: Initialization Done 0 OP
14/07/09 19:59:09 INFO ql.Driver: Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:database_name, type:string, comment:from deserializer)], properties:null)
14/07/09 19:59:09 INFO ql.Driver: Starting command: show databases
14/07/09 19:59:09 INFO metastore.HiveMetaStore: 0: get_all_databases
14/07/09 19:59:09 INFO HiveMetaStore.audit: ugi=root ip=unknown-ip-addr cmd=get_all_databases
14/07/09 19:59:09 INFO exec.DDLTask: results : 1
14/07/09 19:59:10 INFO ql.Driver: OK
14/07/09 19:59:10 INFO mapred.FileInputFormat: Total input paths to process : 1
show_databases: org.apache.spark.sql.SchemaRDD = SchemaRDD[16] at RDD at SchemaRDD.scala:100
== Query Plan ==
Execute the query plan:

scala> show_databases.collect()
14/07/09 20:00:44 INFO spark.SparkContext: Starting job: collect at SparkPlan.scala:52
14/07/09 20:00:44 INFO scheduler.DAGScheduler: Got job 2 (collect at SparkPlan.scala:52) with 1 output partitions (allowLocal=false)
14/07/09 20:00:44 INFO scheduler.DAGScheduler: Final stage: Stage 2(collect at SparkPlan.scala:52)
14/07/09 20:00:44 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/07/09 20:00:44 INFO scheduler.DAGScheduler: Missing parents: List()
14/07/09 20:00:44 INFO scheduler.DAGScheduler: Submitting Stage 2 (MappedRDD[20] at map at SparkPlan.scala:52), which has no missing parents
14/07/09 20:00:44 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 2 (MappedRDD[20] at map at SparkPlan.scala:52)
14/07/09 20:00:44 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
14/07/09 20:00:44 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID 9 on executor 0: web01.dw (PROCESS_LOCAL)
14/07/09 20:00:44 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as 1511 bytes in 0 ms
14/07/09 20:00:45 INFO scheduler.DAGScheduler: Completed ResultTask(2, 0)
14/07/09 20:00:45 INFO scheduler.TaskSetManager: Finished TID 9 in 12 ms on web01.dw (progress: 1/1)
14/07/09 20:00:45 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
14/07/09 20:00:45 INFO scheduler.DAGScheduler: Stage 2 (collect at SparkPlan.scala:52) finished in 0.014 s
14/07/09 20:00:45 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:52, took 0.020520428 s
res5: Array[org.apache.spark.sql.Row] = Array([default])
This returns the default database.

Similarly, run show tables:

scala> hql("show tables").collect()
14/07/09 20:01:28 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
14/07/09 20:01:28 INFO scheduler.DAGScheduler: Stage 3 (collect at SparkPlan.scala:52) finished in 0.013 s
14/07/09 20:01:28 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:52, took 0.019173851 s
res7: Array[org.apache.spark.sql.Row] = Array([item], [src])
In theory, all Hive operations are supported, including UDFs.
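
To make that concrete, here is a rough sketch of querying one of the tables listed above from the same shell session. It assumes src is the standard Hive sample table with columns (key INT, value STRING), and the temp table name small_src is just an illustration; adjust the names to your own environment:

// Query a Hive table; the plan is built lazily and nothing runs until collect()
val kvs = hql("SELECT key, value FROM src WHERE key < 10")
kvs.collect().foreach(println)

// The resulting SchemaRDD can be registered and queried again
kvs.registerAsTable("small_src")
hql("SELECT count(*) FROM small_src").collect()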

PS: a problem I ran into:

Caused by: org.datanucleus.exceptions.NucleusException: Attempt to invoke the "BoneCP" plugin to create a ConnectionPool gave an error : The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH. Please check your CLASSPATH specification, and the name of the driver.

The fix: include the mysql-connector jar on the driver classpath when launching spark-shell, as in the command above.

3. Summary

Spark SQL is compatible with most of Hive's syntax and UDFs, but when processing query plans it uses the Catalyst framework to optimize them into execution plans suited to Spark's programming model, which makes it much faster than Hive.

Since Spark 1.1 has not been released yet and bugs remain for now, I'll continue testing once the stable version is out.

The end :)

This is an original article; please credit the source when reposting:

Reposted from: https://www.cnblogs.com/lxjshuju/p/7016122.html
