Spark SQL 结构化函数一般都在 functions
模块,要使用这些函数,需要先导入该模块:
1 | import org.apache.spark.sql.functions._ |
普通函数
Spark SQL 函数众多,最好的做法就是当需要某个具体功能时在以下列表中检索,或者直接百度谷歌:
- 字符串函数: Spark SQL String Functions
- 日期时间函数: Spark SQL Date and Time Functions
- 数组函数: Spark SQL Array functions complete list
- 字典函数: Spark SQL Array functions complete list
- 排序函数: Spark SQL Sort functions
- 聚合函数: Spark SQL Aggregate Functions
聚合函数
在聚合中,您将指定一个分组和一个聚合函数,该函数必须为每个分组产生一个结果。Spark 的聚合功能是复杂巧妙且成熟的,具有各种不同的用例和可能性。通常,通过分组使用聚合函数去汇总数值型数据,也可以将任何类型的值聚合到 array、list 或 map 中。
Spark 支持以下分组类型,每个分组都会返回一个 RelationalGroupedDataset
,可以在上面指定聚合函数:
- 最简单的分组是通过在
select
语句中执行聚合来汇总一个完整的 DataFrame; group by
允许指定一个或多个 key 以及一个或多个聚合函数来转换列值;window
可以指定一个或多个 key 以及一个或多个聚合函数来转换列值,但是输入到函数的行以某种方式与当前行有关;grouping set
可用于在多个不同级别进行聚合,grouping set
可以作为 SQL 原语或通过 DataFrame 中的rollup
和cube
使用;group by A, B grouping sets(A, B)
等价于group by A union group by B
;rollup
可以指定一个或多个 key 以及一个或多个聚合函数来转换列值,这些列将按照层次进行聚合;group by A,B,C with rollup
首先会对A,B,C
进行 group by,然后对A,B
进行 group by,最后对A
进行 group by,再对全表进行 group by,最后将结构进行 union,缺少字段补 null;cube
可以指定一个或多个 key 以及一个或多个聚合函数来转换列值,这些列将在所有列的组合中进行聚合;group by A,B,C with cube
,会对A, B, C
的所有可能组合进行 group by,最后再将结果 union;
除了可以在 DataFrame 上或通过 .stat
出现的特殊情况之外,所有聚合都可用作函数,你可以在 org.apache.spark.sql.functions
包中找到大多数聚合函数。
统计聚合
- DataFrame 级聚合:
1 | // count("*") 会显示 count(1),但是直接写 count(1) 却会报错 |
- 分组聚合:分组通常是针对分类数据完成的,我们先将数据按照某些列中的值进行分组,然后对被归入同一组的其他列执行聚合计算;事实上,DataFrame 级聚合只是分组聚合的一种特例;
1 | // 分组语法 |
多维分析
grouping sets
:group by keys grouping sets(combine1(keys), ..., combinen(keys))
,其中,keys
包含了所有可能用于分组的字段,combine(keys)
是 keys 的一个子集,聚合函数会分别基于每组combine(keys)
进行聚合,最后再把所有聚合结果按字段进行 union,不同类型的分组缺失字段补 null;可以通过 null 值在各列上的分布来判断各结果行所属的聚合类型,进一步地,我们可以用grouping_id()
聚合函数值来标识每一结果行的聚合类型,grouping_id()
首先用二进制表示各个 key 是否为 null,如(a, null, null)
对应二进制011
,然后再将该二进制数转化为对应的十进制数(在这个例子中,十进制数为 3)得到grouping_id()
的值;grouping sets
仅在 SQL 中可用,是 group by 子句的扩展,要在 DataFrame 中执行相同的操作,请使用 rollup 和 cube 算子;
1 | val sql = """ |
rollup
:group by A,B,C with rollup
首先会对A,B,C
进行 group by,然后对A,B
进行 group by,最后对A
进行 group by,再对全表进行 group by,最后将结构进行 union,缺少字段补 null;
1 | val sql = """ |
cube
:group by A,B,C with cube
,会对A, B, C
的所有可能组合进行 group by,最后再将结果 union;
1 | val sql = """ |
聚合为复杂类型
可以通过 collect_list
和 collect_set
收集某列中的值,前者保留原始顺序,后者不保证顺序但会去重。
1 | val res = df.select(collect_list("Country"), collect_set("Country")) |
窗口函数
Spark 窗口函数对一组行(如frame、partition)进行操作,并为每个输入行返回一个值。窗口函数是一种特殊的聚合函数,但是输入到函数的行以某种方式与当前行有关,函数会为每一行返回一个值。Spark SQL支持三种窗口函数:
- 排序函数:row_number() rank() dense_rank() percent_rank() ntile()
- 分析函数: cume_dist() lag() lead()
- 聚合函数: sum() first() last() max() min() mean() stddev()
语法:
1 | // 定义窗口 |
示例数据:
1 | import spark.implicits._ |
排序窗口函数
用于排序的窗口定义:
1 | // 按照指定字段分组,在分组内按照另一字段排序,得到排序窗口,如果需要降序,可以使用col("salary").desc |
- row_number: 返回每行排序字段在窗口内的行号;
1 | df.withColumn("row_number",row_number.over(windowSpec)) |
- rank: 返回每行排序字段在窗口内的排名,rank=n+1,n 代表窗口内比当前行小的行数;
1 | df.withColumn("rank",rank().over(windowSpec)) |
- dense_rank: 返回每行排序字段在窗口内的稠密排名,rank=n+1,n 代表窗口内比当前行小的不同取值数;
1 | df.withColumn("dense_rank",dense_rank().over(windowSpec)) |
- percent_rank: 返回每行排序字段在窗口内的百分位排名;
1 | //percent_rank |
- ntile: 返回窗口分区中结果行的相对排名,在下面的示例中,我们使用2作为ntile的参数,因此它返回介于2个值(1和2)之间的排名;
1 | df.withColumn("ntile",ntile(2).over(windowSpec)) |
分析窗口函数
- cume_dist: 窗口函数用于获取窗口分区内值的累积分布,和 SQL 中的 DENSE_RANK 作用相同
1 | df.withColumn("cume_dist",cume_dist().over(windowSpec)).show() |
- lag: 和 SQL 中的 LAG 函数相同,返回值为当前行之前的 offset 行,如果当前行之前的行少于 offset,则返回“ null”。
1 | df.withColumn("lag",lag("salary",2).over(windowSpec)).show() |
- lead: 和 SQL 中的 LEAD 函数相同,返回值为当前行之后的 offset 行,如果当前行之后的行少于 offset,则返回“ null”。
1 | df.withColumn("lead",lead("salary",2).over(windowSpec)).show() |
聚合窗口函数
在本部分中,我将解释如何使用 Spark SQL Aggregate 窗口函数和 WindowSpec 计算每个分组的总和,最小值,最大值,使用聚合函数时,order by 子句特别重要,影响着最后聚合的具体范围。
1 | val windowSpec = Window.partitionBy("department").orderBy("salary") |
自定义函数
自定义函数是 Spark SQL 最有用的特性之一,它扩展了 Spark 的内置函数,允许用户实现更加复杂的计算逻辑。但是,自定义函数是 Spark 的黑匣子,无法利用 Spark SQL 的优化器,自定义函数将失去 Spark 在 Dataframe / Dataset 上所做的所有优化,通常性能和安全性较差。如果可能,应尽量选用 Spark SQL 内置函数,因为这些函数提供了优化。
根据自定义函数是作用于单行还是多行,可以将其划分为两类:
- UDF:User Defined Function,即用户自定义函数,接收一行输入并返回一个输出;
- UDAF:User Defined Aggregate Function,即用户自定义的聚合函数,接收多行输入并返回一个输出;
UDF
使用 UDF 的一般步骤:
- 定义普通函数:与定义一般函数的方式完全相同,但是需要额外注意
- UDF 中参数和返回值类型并不是我们可以随意定义的,因为涉及到数据的序列化和反序列化,详情参考“传递复杂数据类型”一节;
null
值的处理,如果设计不当,UDF 很容易出错,最好的做法是在函数内部检查null
,而不是在外部检查null
;
- 注册 UDF:在 DataFrame API 和 SQL 表达式中使用的 UDF 注册方式有所差异
- 如果要在 DataFrame API 中使用:
val 函数名 = org.apache.spark.sql.functions.udf(函数值)
; - 如果要在 SQL 表达式中使用:
sparkSession.udf.register(函数名, 函数值)
;
- 如果要在 DataFrame API 中使用:
- 应用 UDF:与应用 Spark 内置函数的方法完全相同,只不过原始函数中的变长参数会被注册为
ArrayType
类型,实际传参时也要传入ArrayType
类型的实参;
传递简单数据类型
1 | // 示例数据 |
- 创建一个普通函数:
1 | // convertCase 是一个函数值,将句子中每个单词首字母改为大写 |
- 在 DataFrame 中使用 UDF:
1 | import org.apache.spark.sql.functions.udf |
- 在 SQL 中使用 UDF:
1 | // 1. 注册 UDF |
传递复杂数据类型
在 “Spark SQL 数据类型”一文曾介绍过 Spark 类型和 Scala 类型之间的对应关系,当 UDF 在 Spark 和 Scala 之间传递参数和返回值时也遵循同样的对应关系,下面列出了 Spark 中复杂类型与 Scala 本地类型之间的对应关系:
Spark 类型 | udf 参数类型 | udf 返回值类型 |
---|---|---|
StructType | Row | Tuple/case class |
ArrayType | Seq | Seq/Array/List |
MapType | Map | Map |
本部分将使用如下示例数据来演示以上各种场景:
1 | val data = Seq( |
StructType
如果传给 udf 的是 StructType
类型,udf 参数类型应该定义为 Row
类型;如果需要 udf 返回 StructType
类型,udf 返回值类型应该定义为 Tuple
或 case class
;
- udf 返回值类型可以是
Tuple
:Tuple
返回值会被转化为struct
,Tuple
的各个元素分别对应struct
的各个子域_1
、_2
……
1 | // 数据类型转化过程:Struct => Row => Tuple => Struct |
- udf 的返回值可以是样例类:样例类型返回值会以一种更加自然的方式转化为
struct
,样例类的不同属性构成了struct
的各个子域;
1 | case class P(x:String, y:Int) |
ArrayType
- 返回值类型也可以是 Seq、Array 或 List,不会影响到 udf 签名
1 | def myF(gender:String, a:Seq[Int]):Seq[String] = a.map(x => gender * x.toInt) |
- 参数不能是 Array 或 List,否则会报无法进行类型转换的错误
1 | scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.List` |
- 变长参数会被注册为 ArrayType 类型:使用变长参数和使用 Seq 参数效果是一样的
1 | def myF(gender:String, a:String *):Seq[String] = { |
MapType
1 | def myF(gender:String, m:Map[String, String]):Map[String, String] = { |
UDAF
UDAF(User Defined Aggregate Function,即用户自定义的聚合函数)相比 UDF 要复杂很多,UDF 接收一行输入并产生一个输出,UDAF 则是接收一组(一般是多行)输入并产生一个输出,Spark 维护了一个 AggregationBuffer
来存储每组输入数据的中间结果。使用 UDAF 的一般步骤:
- 自定义类继承
UserDefinedAggregateFunction
,对每个阶段方法做实现; - 在 spark 中注册 UDAF,为其绑定一个名字;
- 然后就可以在sql语句中使用上面绑定的名字调用;
定义 UDAF
我们通过一个计算平均值的 UDAF 实际例子来了解定义 UDAF 的过程:
1 | import org.apache.spark.sql.Row |
注册-使用 UDAF
1 | import org.apache.spark.sql.SparkSession |
参考
- 《Spark 权威指南 Chapter 7.Aggregations》