从定义上讲,DataFrame 由一系列行和列组成,行的类型为 Row,列可以表示成每个单独行的计算表达式。Schema 定义了每个列的名称和类型,Partition 定义了整个集群中 DataFrame 的物理分布。
Schema
Schema 定义了 DataFrame 的列名和类型,我们可以让数据源定义 Schema(schema-on-read),也可以自己明确地进行定义。对于临时分析,schema-on-read 通常效果很好,但是这也可能导致精度问题,例如在读取文件时将 Long 型错误地设置为整型,在生产环境中手动定义 Schema 通常是更好的选择,尤其是在使用 CSV 和 JSON 等无类型数据源时。
Schema 是一种 structType,由很多 StructFields 组成,每个 StructField 具有名称、类型和布尔值标识(用于指示该列是否可以为 null),最后用户可以选择指定与该列关联的元数据,元数据是一种存储有关此列的信息的方式(Spark 在其机器学习库中使用此信息)。如果数据中的类型与 Schema 不匹配,Spark 将引发错误。
1 | import org.apache.spark.sql.types._ |
Columns and Expressions
列只是表达式(Columns are just Expressions
):列以及列上的转换与经过解析的表达式拥有相同的逻辑计划。这是极为重要的一点,这意味着你可以将表达式编写为 DataFrame 代码或 SQL 表达式,并获得完全相同的性能特性。
Columns
对 Spark 而言,Columns 是一种逻辑构造,仅表示通过表达式在每条记录上所计算出来的值。这意味着要有一个列的实际值,我们就需要有一个行,要有一个行,我们就需要有一个 DataFrame,你不能在 DataFrame 上下文之外操作单个列,你必须在 DataFrame 中使用 Spark 转换来修改列的内容。
在 DataFrame 中引用列的方式有很多,以下几种语法形式是等价的:
1 | df.columns |
Expressions
Expressions 是对 DataFrame 记录中一个或多个值的一组转换,可以将其视为一个函数,该函数将一个或多个列名作为输入,进行解析,然后可能应用更多表达式为数据集中每个记录创建单个值(可以是诸如 Map 或 Array 之类的复杂类型)。在最简单的情况下,通过 expr()
函数创建的表达式仅仅是 DataFrame 列引用,expr("col_name")
等价于 col("col_name")
。
列提供了表达式功能的子集,如果使用 col()
并想在该列上执行转换,则必须在该列引用上执行那些转换,使用表达式时, expr 函数实际上可以解析字符串中的转换和列引用,例如:expr("col_name - 5")
等价于 col("col_name") - 5
,甚至等价于 expr("col_name") - 5
。
1 | import org.apache.spark.sql.functions.expr |
Records and Rows
DataFrame 中的每一行都是一条记录,Spark 将此记录表示为 Row 类型的对象,Spark 使用列表达式操纵 Row 对象,以产生可用的值。Row 对象在内部表示为字节数组,但是字节数组接口从未显示给用户,因为我们仅使用列表达式来操作它们。
可以通过手动实例化具有每个列中的值的 Row 对象来创建行,但是务必注意只有 DataFrame 有 Schema,Row 本身没有模式。
1 | import org.apache.spark.sql.Row |
访问行中的数据很容易,只需要指定位置或列名:
1 | df.collect().foreach(row=>{ |
DataFrame 转换
DataFrame 转换不会改变原有的 DataFrame,而是生成一个新的 DataFrame。很多 DataFrame 转换/函数被包含在 org.apache.spark.sql.functions
模块,使用前推荐先导入相关模块:
1 | import org.apache.spark.sql.functions._ |
本文主要用到的示例数据:
1 | val data = Seq( |
1 | +--------------------+-----+------+------+ |
列操作
select —— 筛选列
功能:
select()
用于筛选/操作列;语法:有两种语法形式,但是两种形式不能混用;
1 | // 传入列名字符串 |
- 示例:
1 | // 可以是列名字符串、*代表所有列、a.b 代表 struct 中的子域、不可用 as |
selectExpr —— 通过 SQL 语句筛选列
- 功能:selectExpr 和 select 作用相同,只是 selectExpr 更加简洁、灵活、强大;
- 语法:可以通过构造任意有效的非聚合 SQL 语句来生成列(如果使用了聚合函数,则只能应用于整个 DataFrame);这释放了 Spark 的真正力量,我们可以将 selectExpr 视为构建复杂表达式以生成新的 DataFrame 的简单方法;如果列名中包含了保留字或关键字,例如空格或破折号,可以通过反引号(`)字符引用列名;
1 | selectExpr(exprs : scala.Predef.String*) : org.apache.spark.sql.DataFrame |
- 示例:
1 | df.selectExpr("name.firstname", "dob as f_dob", "*", "dob + salary as new_col").show() |
selectExpr 的灵活用法使其可以替代大部分的列操作算子,但是考虑到代码的简洁性,对于一些具体的操作,往往会有更简单直接的算子。事实上,DataFrame 操作使用最多的算子是 withColumn
,withColumn
算子将单列处理逻辑封装到独立的子句中,更具可读性,也方便了代码维护。
withColumn —— 添加或更新列
- 功能:
withColumn()
可以用来添加新列、改变已有列的值、改变列的类型; - 语法:withColumn 有两个参数,列名和将为 DataFrame 各行创建值的表达式;
1 | withColumn(colName: String, col: Column): DataFrame |
- 示例:
1 | // 添加新的列 |
withColumnRenamed —— 重命名列
- 功能:withColumnRenamed 用于重命名列;
- 语法:
1 | withColumnRenamed(existingName: String, newName: String): DataFrame |
- 示例:有多种方式可以用于重命名单个列、多个列、所有列、嵌套列
1 | // 重命名单个列,withColumnRenamed(x, y) 将 y 列重名为 x |
drop —— 删除列
- 功能:drop() 用于删除 DataFrame 中单个或多个列,如果指定列不存在则忽略,在两个表进行 join 时通常可以利用这一点来保证两个表除了关联键之外不存在同名字段。
- 语法:
1 | // drop 有三种不同的形式: |
- 示例:
1 | val df = spark.range(3) |
行操作
where | filter —— 筛选行
- 功能:where 和 filter 是完全等价的,用于按照指定条件筛选 DataFrame 中满足条件的行;
- 语法:传入一个布尔表达式,过滤掉 false 所对应的行;
1 | // 有四种形式 |
- 示例:
1 | df.show() |
distinct —— 行去重
- 功能:
distinct()
方法可以移除 DataFrame 中重复的行,dropDuplicates()
方法用于移除 DataFrame 中在某几个字段上重复的行(默认保留重复行中的第一行)。 - 语法:
1 | distinct(): Dataset[T] = dropDuplicates() |
- 示例:
1 | df.show() |
groupBy —— 行分组
- 功能:和 SQL 中的 group by 语句类似,
groupBy()
函数用于将DataFrame/Dataset
按照指定字段分组,返回一个RelationalGroupedDataset
对象 语法:
RelationalGroupedDataset
对象包含以下几种聚合方法:- count()/max()/min()/mean()/avg()/sum(): 返回每个分组的行数/最大/最小/平均值/和;
- agg(): 可以同时计算多个聚合;
- pivot(): 用于行转列;
示例:
1 | import spark.implicits._ |
sort —— 行排序
- 功能:在 Spark 中,可以使用 sort() 或 orderBy() 方法来根据某几个字段的值对 DataFrame/Dataset 进行排序。
- 语法:
1 | // sort |
- 示例:
1 | df.sort("department","state").show(false) |
map —— 映射
- 功能:map() 和 mapPartitions() 转换将函数应用于 DataFrame/Dataset 的每个元素/记录/行,并返回新的 DataFrame/Dataset,需要注意的是这两个转换都返回 Dataset[U] 而不是 DataFrame(在Spark 2.0中,DataFrame = Dataset [Row])。
- 语法: Spark 提供了 2 个映射转换签名,一个以 scala.function1 作为参数,另一个以 Spark MapFunction 作为签名,注意到这两个函数都返回 Dataset [U],但不返回DataFrame,即Dataset [Row]。如果希望将 DataFrame 作为输出,则需要使用 toDF() 函数将 Dataset 转换为 DataFrame。
1 | 1) map[U](func : scala.Function1[T, U])(implicit evidence$6 : org.apache.spark.sql.Encoder[U]) |
- 示例:
1 | // 示例数据 |
foreach —— 遍历
功能:foreach() 方法用于在 RDD/DataFrame/Dataset 的每个元素上应用函数,主要用于操作累积器共享变量,也可以用于将 RDD/DataFrame 结果写入数据库,生产消息到 kafka topic 等。foreachPartition() 方法用于在 RDD/DataFrame/Dataset 的每个分区上应用函数,主要用于在每个分区进行复杂的初始化操作(比如连接数据库),也可以用于操作累加器变量。foreach() 和 foreachPartition() 方法都是不会返回值的 action。
语法:
1 | foreachPartition(f : scala.Function1[scala.Iterator[T], scala.Unit]) : scala.Unit |
- 示例:
1 | // foreach 操作累加器 |
sample —— 随机抽样
- 功能:从 DataFrame 中抽取一些随机记录;
- 语法:
1 | // withReplacement: 是否是有放回抽样; fraction: 抽样比例; seed: 抽样算法初始值 |
- 示例:
1 | df.sample(0.2).show() |
split —— 随机分割
- 功能:将原始 DataFrame 随机拆分,这通常与机器学习算法一起使用以创建训练、验证和测试集;
- 语法:返回 Array(DataFrame);
1 | randomSplit(weights: Array[Double]) |
- 示例:
1 | val dfs = df.randomSplit(Array(0.8, 0.2)) |
limit —— 限制
- 功能:限制从 DataFrame 中提取的内容,当你需要一个空的 DataFrame 但又想保留 Schema 信息时可以通过
df.limit(0)
来实现; - 语法:
1 | df.limit(n) |
- 示例:
1 | df.orderBy("dob").limit(3).show() |
first | last —— 首行或末行
功能:获取某列第一行/最后一行的值
语法:
1 | first(e: Column, ignoreNulls: Boolean) |
- 示例:
1 | df.select(first("name"), first("dob"), last("gender"), last("salary")).show() |
表操作
union —— 合并
- 功能:在 Spark 中 union() 和 unionAll() 作用相同,用于合并两个 schema 相同(不会校验schema,只会校验字段数是否相同)的 DataFrame,但是都不会对结果进行去重,如果需要去重,可以通过去重算子对结果去重。
- 语法:
1 | df.union(df2) |
- 示例:
1 | // 没有什么好展示的 |
join —— 连接
- 功能:Spark SQL 支持传统 SQL 中可用的所有基本联接操作(这里不再赘述),尽管 Spark 核心联接在设计时不小心会产生巨大的性能问题,因为它涉及到跨网络的数据 shuffe,另一方面,Spark SQL 连接在默认情况下具有更多的优化(多亏了 DataFrames & Dataset),但是在使用时仍然会有一些性能问题需要考虑;
- 语法: 三要素为连接表、连接谓词、连接类型;
1 | 1) join(right: Dataset[_]): DataFrame |
- join 类型: 对于上面语句 4 和语句 5,你可以使用 JoinType 或 Join String 中的一种,如果要使用 JoinType,应该先导入
import org.apache.spark.sql.catalyst.plans._
,以下示例将采用上面语句 6 的形式
JoinType | Join String | Equivalent SQL Join |
---|---|---|
Inner.sql | inner | INNER JOIN |
FullOuter.sql | outer, full, fullouter, full_outer | FULL OUTER JOIN |
LeftOuter.sql | left, leftouter, left_outer | LEFT JOIN |
RightOuter.sql | right, rightouter, right_outer | RIGHT JOIN |
Cross.sql | cross | - |
LeftAnti.sql | anti, leftanti, left_anti | - |
LeftSemi.sql | semi, leftsemi, left_semi | - |
- 示例数据:
1 | val emp = Seq((1,"Smith",-1,"2018","10","M",3000), |
Inner Join
Inner Join 内连接,只返回匹配成功的行。
1 | empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"inner").show(false) |
Full Join
Outer/Full,/Fullouter Join 全外连接,匹配成功的 + 左表有右表没有 + 右表有左表没有
1 | empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"outer").show(false) |
Left Join
Left/Leftouter Join 左连接,匹配成功的 + 左表有右表没有的
1 | empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"left").show(false) |
Right Join
Right/Rightouter Join 右连接,匹配成功的 + 右表有左表没有的
1 | empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"right").show(false) |
Left Semi Join
Left Semi Join 左半连接,匹配成功的,只保留左表字段。
1 | empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftsemi").show(false) |
Left Anti Join
Left Anti Join 反左半连接,没有匹配成功的,只返回左表字段
1 | empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftanti").show(false) |
Self Join
虽然没有自连接类型,但是可以使用以上任意一种 join 类型与自己关联,但是要通过别名的方式。为DataFrame 起别名 "a"
后,原有字段名 "col"
就变成 "a.col"
,可以通过 "a.*"
把原有的列“释放”出来。
1 | empDF.as("emp1").join(empDF.as("emp2"), col("emp1.superior_emp_id") === col("emp2.emp_id"),"inner") |
Cross Join
Cross Join(笛卡尔连接、交叉连接)会将左侧 DataFrame 中的每一行与右侧 DataFrame 中的每一行进行连接,这将导致结果 DataFrame 中的行数发生绝对爆炸,仅在绝对必要时才应使用笛卡尔积,它们很危险!!!我们分几种场景来讨论和 Cross Join 相关的一些问题:
join
算子中如果指定了连接谓词,那么,即使将参数joinType
设置为 “cross”,实际执行的仍然是inner join
1 | empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "cross").show() |
join
算子中,如果将连接谓词设置为恒等式,可以实现笛卡尔积(joinType
需同时设置为 “cross”)
1 | empDF.join(deptDF, lit(1) === lit(1), "cross").show() |
join
算子中,如果省略了连接谓词,则会报AnalysisException
错误,一种解决办法是设置spark.conf.set("spark.sql.crossJoin.enabled",true)
,以允许笛卡尔积而不会发出警告或 Spark 不会尝试为您执行另一种连接
1 | empDF.join(deptDF).show() |
- 以上方式虽然可以实现 cross Join,但并不推荐使用,从
spark-sql_2.11
2.1.0 之后的版本专门提供了crossJoin
算子来实现笛卡尔积,使用crossJoin
不用修改配置
1 | empDF.crossJoin(deptDF).show() |
同源 DataFrame JOIN 陷阱
当同源 DataFrame(衍生于同一个 DataFrame )之间进行 Join 时,可能会导致一些意想不到的错误。
1 | var x = empDF.groupBy("superior_emp_id").agg(count("*").as("f_cnt")) |
有多种方式可以解决这个问题:
- 使用 SQL 表达式
1 | empDF.createOrReplaceTempView("empDF") |
- 为 DataFrame 起别名
1 | empDF.as("a").join(x.as("b"), col("a.emp_id") === col("b.superior_emp_id")).show() |
withColumn
重命名列
1 | val x = empDF.groupBy("superior_emp_id").agg(count("*").as("f_cnt")) |
toDF
重新定义其中一个 DataFrame 的 Schema:
1 | x = x.toDF(x.columns:_*) |
usingColumn 陷阱
usingColumn
语法得到的结果 DataFrame 中会自动去除被 join DataFrame 的关联键,只保留主调 DataFrame 中的关联键,所以不能通过 select
或 expr
选择被调 DataFrame 中的关联键,但是却可以在 filter
中引用被调 DataFrame 中的关联键:
1 | val x = deptDF.limit(2).select("dept_id").toDF("dept_id") |
处理 join 中的同名字段
如果参与 join 的两个 DataFrame 之间存在相同名称的字段,很容易在后续的转换操作中出现 Reference is ambiguous
的错误,整体上有两种解决思路:
- 如果需要的字段少:那就 select 你所需要的字段就行了;
- 如果需要的字段多:那就 drop 不需要的字段;
在 join 前中后又可以有不同的处理方式:
- join 前:修改/删除其中一方 DataFrame 的同名字段名;
- join 中:如果同名字段是 join 的关联键,使用
usingColumn
语法,join 后只会保留左表关联字段; - join 后:
- 要么通过
select(Expr)
明确指定需要的表字段; - 要么通过
drop
删除不需要的表字段; - 要么通过
withColumn
添加新的字段,此时withColumn
如果用于修改已有同名字段的内容,将会同时修改所有同名字段,修改后的结果仍会保留同名字段;
- 要么通过
示例:
1 | // 示例数据 |
join 最佳实践
DataFrame API 的 JOIN 操作有诸多需要注意的地方,除了正确使用 JOIN 类型和 JOIN 语法外,经常引起困惑的地方在于如何从 JOIN 结果中选择我们需要的字段,对此,我们总结了一些最佳实践:
- 当 DataFrame 不方便通过一个变量来引用时,可以在 JOIN 语句中为 DataFrame 起别名:
- 可以通过
"表别名.字段名"
来引用对应字段; - 如果不存在同名字段,也可以省略掉表别名,直接用
"字段名"
来应用对应字段;
- 可以通过
- 当 JOIN 的两个 DataFrame 中包含同名字段时:
- 可以在 JOIN 前删除/重命名无用的同名字段;
- 如果同名字段作为关联字段,
usingColumn
语法将只会保留左表关联字段; - 可以在 JOIN 后
select(Expr)
需要的字段,drop
不需要的字段,withColumn
添加新的字段;
- 同源 DataFrame 之间 JOIN,在 JOIN 前通过
toDF()
转化其中一个 DataFrame;
看过上面的示例,你可能会觉得 DataFrame 的 JOIN 太不方便了,还不如直接写 SQL 表达式呢!事实上,DataFrame API 更加紧凑,更便于编写结构化代码,能够帮助我们完成大部分的语法检查,如果要在 DataFrame 中穿插 SQL 表达式,就使用 expr() 或 selectExpr() 函数吧!
repartition —— 重分区
- 功能:repartition 会导致数据的完全随机洗牌(shuffle),这意味着通常仅应在将来的分区数大于当前的分区数时或在按一组列进行分区时重新分区;如果经常要按照某个列进行过滤,则值得按该列重新分区;
- 语法:
1 | // 指定所需的分区数 |
- 示例:
1 | df.repartition(3) |
coalesce —— 分区合并
- 功能:coalesce 不会引起 full shuffle,并尝试合并分区(将来的分区数小于当前的分区数);
- 语法:
1 | coalesce(numPartitions: Int) |
- 示例:
1 | df.repartition(5, col("dob")).coalesce(2) |
cache | persist —— 缓存
功能:虽然 Spark 提供的计算速度是传统 Map Reduce 作业的 100 倍,但是如果您没有将作业设计为重用重复计算,那么当您处理数十亿或数万亿数据时,性能会下降。使用 cache() 和 persist() 方法,每个节点将其分区的数据存储在内存/磁盘中,并在该数据集的其他操作中重用它们,真正缓存是在第一次被相关 action 调用后才缓存。Spark 在节点上的持久数据是容错的,这意味着如果数据集的任何分区丢失,它将使用创建它的原始转换自动重新计算。Spark 会自动监视您进行的每个 persist()和cache()调用,并检查每个节点上的使用情况,如果不再使用或通过 least-recently-used (LRU) 算法,删除持久化数据,也可以使用 unpersist()方法手动删除。unpersist()将数据集标记为非持久性,并立即从内存和磁盘中删除它的所有块。
语法:
1 | // StorageLevel |
- 示例:
1 | // cache |
- StorageLevel 有以下几个级别:
级别 | 使用空间 | CPU时间 | 是否内存 | 是否磁盘 | 备注 |
---|---|---|---|---|---|
MEMORY_ONLY | 高 | 低 | 是 | 否 | - |
MEMORY_ONLY_2 | 高 | 低 | 是 | 否 | 数据存2份 |
MEMORY_ONLY_SER_2 | 低 | 高 | 是 | 否 | 数据序列化,数据存2份 |
MEMORY_AND_DISK | 高 | 中等 | 部分 | 部分 | 内存放不下,则溢写到磁盘 |
MEMORY_AND_DISK_2 | 高 | 中等 | 部分 | 部分 | 数据存2份 |
MEMORY_AND_DISK_SER | 低 | 高 | 部分 | 部分 | - |
MEMORY_AND_DISK_SER_2 | 低 | 高 | 部分 | 部分 | 数据存2份 |
DISK_ONLY | 低 | 高 | 否 | 是 | |
DISK_ONLY_2 | 低 | 高 | 否 | 是 | 数据存2份 |
NONE | - | - | - | - | - |
OFF_HEAP | - | - | - | - | - |
collect —— 收集到 driver
功能:collect() 和 collectAsList() 用于将 RDD/DataFrame/Dataset 中所有的数据拉取到 Driver 节点,然后你可以在 driver 节点使用 scala 进行进一步处理,通常用于较小的数据集,如果数据集过大可能会导致内存不足,很容易使 driver 节点崩溃并时区应用程序的状态,这也很昂贵,因为是逐条处理,而不是并行计算。
语法:
1 | collect() : scala.Array[T] |
- 示例:
1 | df.show() |
其他操作
when —— 条件判断
- 功能:
when otherwise
类似于 SQL 中的 case when 语句; - 语法:可以由多个 when 表达式(不满足前一个 when 条件则继续匹配下一个 when 条件),也可以不带 otherwise 表达式(不满足 when 条件则返回 null);
1 | when(condition: Column, value: Any): Column |
- 示例:
1 | df.withColumn("new_gender", when(col("gender") === "M", "Male")).show() |
flatten —— 列拆多列
功能:在 Spark SQL 中,扁平化 DataFrame 的嵌套结构列对于一级嵌套很简单,而对于多级嵌套和存在数百个列的情况下则很复杂。
扁平化嵌套 struct: 如果哦列数有限,可以通过引用列名似乎很容易解决,但是请想象一下,如果您有100多个列并在一个select中引用所有列,那么会很麻烦。可以通过创建一个递归函数 flattenStructSchema()轻松地将数百个嵌套级别列展平。
1 | val structureData = Seq( |
- 扁平化嵌套 Array: 上个示例展示了如何打平嵌套 Row,对于嵌套 Array 则可以通过 flatten() 方法除去嵌套数组第一层嵌套。
1 | val arrayArrayData = Seq( |
explode —— 行拆多行
- 功能:在处理 JSON,Parquet,Avro 和 XML 等结构化文件时,我们通常需要从数组、列表和字典等集合中获取数据。在这种情况下,explode 函数(explode,explorer_outer,posexplode,posexplode_outer)对于将集合列转换为行以便有效地在 Spark 中进行处理很有用。
语法:
示例:
1 | // 示例数据 |
pivot | stack —— 行转列 | 列转行
功能:
- pivot() 是一种聚合方法(类似于 Excel 中的数据透视表),用于将 DataFrame/Dataset 的行转列,该过程可以被分为三个步骤,① 按 x 列分组,x 的不同取值作为行向标签 ② 将 y 列的不同取值作为列向标签 ③ 将行列标签 (x,y) 对应 z 的聚合结果作为值,如果源表没有 (x,y) 对应的数据则补 null;
- stack() 方法可以将 DataFrame/Dataset 的列转行,注意 Spark 没有 unpivot 方法;
语法:
1 | groupBy(x).pivot(y).sum(z) // x 列不同值作为行标签,y 列不同值作为列标签,z 列的聚合作为值 |
- 示例:
1 | // 创建一个 DataFrame |
1 | // stack(n, 列1显示名, 列1, ..., 列n显示名, 列n) |
参考
- 《Spark 权威指南》_online/)
- Spark 2.2.x 中文文档
- Spark By Examples
- org.apache.spark.sql.Dataset:Dataset 对象方法
- org.apache.spark.sql.Dataset.Column:Column 对象方法