Spark SQL | 目前Spark社区最活跃的组件之一

Spark SQL是一个用来处理结构化数据的Spark组件,前身是shark,但是shark过多的依赖于hive如采用hive的语法解析器、查询优化器等,制约了Spark各个组件之间的相互集成,因此Spark SQL应运而生。

Spark SQL在汲取了shark诸多优势如内存列存储、兼容hive等基础上,做了重新的构造,因此也摆脱了对hive的依赖,但同时兼容hive。除了采取内存列存储优化性能,还引入了字节码生成技术、CBO和RBO对查询等进行动态评估获取最优逻辑计划、物理计划执行等。基于这些优化,使得Spark SQL相对于原有的SQL on Hadoop技术在性能方面得到有效提升。

同时,Spark SQL支持多种数据源,如JDBC、HDFS、HBase。它的内部组件,如SQL的语法解析器、分析器等支持重定义进行扩展,能更好的满足不同的业务场景。与Spark Core无缝集成,提供了DataSet/DataFrame的可编程抽象数据模型,并且可被视为一个分布式的SQL查询引擎。

DataSet/DataFrame

DataSet/DataFrame都是Spark SQL提供的分布式数据集,相对于RDD而言,除了记录数据以外,还记录表的schema信息。

DataSet是自Spark1.6开始提供的一个分布式数据集,具有RDD的特性比如强类型、可以使用强大的lambda表达式,并且使用Spark SQL的优化执行引擎。DataSet API支持Scala和Java语言,不支持Python。但是鉴于Python的动态特性,它仍然能够受益于DataSet API(如,你可以通过一个列名从Row里获取这个字段 row.columnName),类似的还有R语言。

DataFrame是DataSet以命名列方式组织的分布式数据集,类似于RDBMS中的表,或者R和Python中的 data frame。DataFrame API支持Scala、Java、Python、R。在Scala API中,DataFrame变成类型为Row的Dataset:

type DataFrame = Dataset[Row]。

DataFrame在编译期不进行数据中字段的类型检查,在运行期进行检查。但DataSet则与之相反,因为它是强类型的。此外,二者都是使用catalyst进行sql的解析和优化。为了方便,以下统一使用DataSet统称。

 

DataSet创建

DataSet通常通过加载外部数据或通过RDD转化创建。

1. 加载外部数据

以加载json和mysql为例:

val ds = sparkSession.read.json("/路径/people.json")

val ds = sparkSession.read.format("jdbc")
.options(Map("url" -> "jdbc:mysql://ip:port/db","driver" -> "com.mysql.jdbc.Driver","dbtable" -> "tableName","user" -> "root","root" -> "123")).load()

 

2. RDD转换为DataSet通过RDD转化创建DataSet,关键在于为RDD指定schema,通常有两种方式(伪代码):

1.定义一个case class,利用反射机制来推断

1) 从HDFS中加载文件为普通RDD
val lineRDD = sparkContext.textFile("hdfs://ip:port/person.txt").map(_.split(" "))

2) 定义case class(相当于表的schema)
case class Person(id:Int,name:String,age:Int)

3) 将RDD和case class关联
val personRDD = lineRDD.map(x => Person(x(0).toInt,x(1),x(2).toInt))

4) 将RDD转换成DataFrame
val ds= personRDD.toDF

2.手动定义一个schema StructType,直接指定在RDD上

val schemaString ="name age"

val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,true)))

val rowRdd = peopleRdd.map(p=>Row(p(0),p(1)))

val ds = sparkSession.createDataFrame(rowRdd,schema)

操作DataSet的两种风格语法

DSL语法 

1. 查询DataSet部分列中的内容

personDS.select(col("name"))

personDS.select(col("name"),col("age"))

2. 查询所有的name和age和salary,并将salary加1000

personDS.select(col("name"),col("age"),col("salary") + 1000)

personDS.select(personDS("name"),personDS("age"),personDS("salary") + 1000)

3. 过滤age大于18的personDS.filter(col("age") > 18)

4. 按年龄进行分组并统计相同年龄的人数

personDS.groupBy("age").count()

注意:直接使用col方法需要import org.apache.spark.sql.functions._

SQL语法

如果想使用SQL风格的语法,需要将DataSet注册成表personDS.registerTempTable("person")

//查询年龄最大的前两名

val result = sparkSession.sql("select * from person order by age desc limit 2")/

/保存结果为json文件。注意:如果不指定存储格式,则默认存储为parquet
result.write.format("json").save("hdfs://ip:port/res2")

 

Spark SQL的几种使用方式

1. sparksql-shell交互式查询

就是利用Spark提供的shell命令行执行SQL

2. 编程

首先要获取Spark SQL编程"入口":SparkSession(当然在早期版本中大家可能更熟悉的是SQLContext,如果是操作hive则为HiveContext)。这里以读取parquet为例:

val spark = SparkSession.builder()
.appName("example").master("local[*]").getOrCreate();

val df = sparkSession.read.format("parquet").load("/路径/parquet文件")

然后就可以针对df进行业务处理了。 

3. Thriftserverbeeline客户端连接操作

启动spark-sql的thrift服务,sbin/start-thriftserver.sh,启动脚本中配置好Spark集群服务资源、地址等信息。然后通过beeline连接thrift服务进行数据处理。

hive-jdbc驱动包来访问spark-sql的thrift服务

在项目pom文件中引入相关驱动包,跟访问mysql等jdbc数据源类似。示例:

Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://ip:port","root","123");
try {
  val stat = conn.createStatement()
  val res = stat.executeQuery("select * from people limit 1")
  while (res.next()) {
    println(res.getString("name"))
  }
} catch {
  case e: Exception => e.printStackTrace()
} finally{
  if(conn!=null) conn.close()
}

 

Spark SQL 获取Hive数据

Spark SQL读取hive数据的关键在于将hive的元数据作为服务暴露给Spark。除了通过上面thriftserver jdbc连接hive的方式,也可以通过下面这种方式:
首先,配置 $HIVE_HOME/conf/hive-site.xml,增加如下内容:

<property>

<name>hive.metastore.uris</name>

<value>thrift://ip:port</value>

</property>

 

然后,启动hive metastore

最后,将hive-site.xml复制或者软链到$SPARK_HOME/conf/。如果hive的元数据存储在mysql中,那么需要将mysql的连接驱动jar包如mysql-connector-java-5.1.12.jar放到$SPARK_HOME/lib/下,启动spark-sql即可操作hive中的库和表。而此时使用hive元数据获取SparkSession的方式为:

val spark = SparkSession.builder()

.config(sparkConf).enableHiveSupport().getOrCreate()

 

UDF、UDAF、Aggregator

UDF

UDF是最基础的用户自定义函数,以自定义一个求字符串长度的udf为例:

val udf_str_length = udf{(str:String) => str.length}
spark.udf.register("str_length",udf_str_length)
val ds =sparkSession.read.json("路径/people.json")
ds.createOrReplaceTempView("people")
sparkSession.sql("select str_length(address) from people")

 

UDAF

定义UDAF,需要继承抽象类UserDefinedAggregateFunction,它是弱类型的,下面的aggregator是强类型的。以求平均数为例:

import org.apache.spark.sql.{Row,SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._

object MyAverage extends UserDefinedAggregateFunction {
  // Data types of input arguments of this aggregate function
  def inputSchema: StructType = StructType(StructField("inputColumn",LongType) :: Nil)
  // Data types of values in the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum",LongType) :: StructField("count",LongType) :: Nil)
  }
  // The data type of the returned value
  def dataType: DataType = DoubleType
  // Whether this function always returns the same output on the identical input
  def deterministic: Boolean = true
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g.,get(),getBoolean()),provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  def merge(buffer1: MutableAggregationBuffer,buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // Calculates the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// Register the function to access it
spark.udf.register("myAverage",MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()

 

Aggregator

import org.apache.spark.sql.{Encoder,Encoders,SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String,salary: Long)
case class Average(var sum: Long,var count: Long)

object MyAverage extends Aggregator[Employee,Average,Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L,0L)
  // Combine two values to produce a new value. For performance,the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average,employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average,b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()

 

 

Spark SQL与Hive的对比

 

关注微信公众号:大数据学习与分享,获取更对技术干货

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


1.SparkStreaming是什么?SparkStreaming是SparkCore的扩展API用来支持高吞吐、高容错的处理流式数据数据源可以是:Kafka、TCPsockets、Flume、Twitter等流式数据源处理数据:可以用SparkCore的算子map、reduce、join、window
本篇内容介绍了“Spark通讯录相似度计算怎么实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这...
本篇文章给大家分享的是有关如何进行Spark数据分析,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说...
本篇内容主要讲解“Spark Shuffle和Hadoop Shuffle有哪些区别”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“S...
这篇文章主要介绍“TSDB的数据怎么利用Hadoop/spark集群做数据分析”,在日常操作中,相信很多人在TSDB的数据怎么利用Hadoop/spark集群做数据分析问题上存在疑惑...
本篇内容介绍了“Hadoop与Spark性能原理是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这
小编给大家分享一下Hadoop和Spark有什么不同,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们
这篇文章主要讲解了“Hadoop和Spark的Shuffle过程有什么不同”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习...
本篇文章给大家分享的是有关基于CDP7.1.1的Spark3.0技术预览版本分析是怎样的,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获...
这篇文章主要介绍“Spark中foreachRDD、foreachPartition和foreach的区别是什么”,在日常操作中,相信很多人在Spark中foreachRDD、foreachPartition和foreach的...
本篇内容主要讲解“spark的动态分区裁剪怎么实现”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“spark的动态分...
本篇内容介绍了“spark的动态分区裁剪下物理计划怎么实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下
这篇文章给大家介绍基于Spark和TensorFlow 的机器学习实践是怎么样的,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。EMR E-Learning平台...
这篇文章将为大家详细讲解有关如何进行EMR Spark-SQL性能极致优化的分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识...
如何进行SparkSQL与Hive metastore Parquet转换的分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决...
如何浅析Hive和Spark SQL读文件时的输入任务划分,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个...
这篇文章将为大家详细讲解有关Hive on Spark参数如何调优,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。前言Hive on Spa...
这篇文章将为大家详细讲解有关fs.defaultFS变更使spark-sql查询hive失败是怎么回事,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以...
这篇文章将为大家详细讲解有关怎么解析SparkCore和SparkSQL,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解...
怎么快速搭建Spark开发环境,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。一,搭建本...