SQL(Structured Query Language) 是一种领域特定语言,用于表达对数据的关系型操作。SQL 无处不在,即使技术专家预言了它的消亡,它还是许多企业所依赖的及其灵活的数据工具。Spark 实现了 ANSI SQL:2003 的一个子集,该标准是大多数 SQL 数据库中可用的标准。Spark SQL 旨在用作联机分析处理(OLAP)数据库,而不是联机事务处理(OLTP)数据库,这意味着它不打算执行极低延迟的查询,即使将来肯定会支持原地修改,但是目前还不支持。
Spark SQL & Hive
Spark SQL 的前身是 Shark。为了给熟悉 RDBMS 但又不理解 MapReduce 的技术人员提供快速上手的工具,hive 应运而生,它是当时唯一运行在 Hadoop 上的 SQL-on-hadoop 工具。但是MapReduce 计算过程中大量的中间磁盘落地过程消耗了大量的 I/O,降低的运行效率,为了提高 SQL-on-Hadoop 的效率,Shark 应运而生,但又因为 Shark 对于 Hive 的太多依赖(如采用 Hive 的语法解析器、查询优化器等等),2014 年 Spark 团队停止对 Shark 的开发,将所有资源放 Spark SQL 项目上。其中 Spark SQL 作为 Spark 生态的一员继续发展,而不再受限于 Hive,只是兼容 Hive;而 Hive on Spark 是一个 Hive 的发展计划,该计划将 Spark 作为 Hive 的底层引擎之一,也就是说,Hive 将不再受限于一个引擎,可以采用 Map-Reduce、Tez、Spark 等引擎。
执行 SQL
Spark 提供了几个接口来执行 SQL 查询:
- Spark SQL CLI:你可以使用 Spark SQL CLI 从命令行在本地模式下进行基本的 Spark SQL 查询, Spark SQL CLI 无法与 Thrift JDBC 服务器通信,要启动 Spark SQL CLI,请在 Spark 目录下运行以下命令
1 | ./bin/spark-sql |
- Spark 编程接口:你可以通过任意 Spark 语言 API 以临时方式执行 SQL,你可以通过 SparkSession 对象上的 sql 方法执行此操作,这将返回一个 DataFrame
1 | spark.sql(sql_statement) |
Catalog
Catalog 是 Spark SQL 中最高级别的抽象,用于对数据库、表、视图、缓存、列、函数(UDF/UDAF)的元数据进行操作,其 API 可以在 org.apache.spark.sql.catalog
中查看。
示例数据:
1 | val data = Seq( |
获取 catalog 对象:
1 | val c = spark.catalog |
操作数据库
- API:
1 | // 返回当前使用的数据库,相当于select database() |
- 示例:
1 | c.listDatabases().show(false) |
操作表/视图
- API:
1 | // 表/视图的属性 |
- 示例:
1 | c.listTables("default").show() |
函数相关
- API:
1 | // 函数的属性 |
- 示例:
1 | c.listFunctions.show(10, false) |
操作表/视图的列
- API:
1 | // 列的属性 |
- 示例:
1 | c.listColumns("df").show() |
Tables
要用 Spark SQL 做任何有用的事情,首先要定义表,表在逻辑上等效于 DataFrame,因为他们是运行命令所依据的数据结构,我们可以对表进行关联、过滤、汇总等操作,表和 DataFame 之间的核心区别在于:在编程语言范围内定义 DataFrame,在数据库中定义表。
创建表
Spark 相当独特的功能是可以在 SQL 中重用整个数据源 API:
1 | // 从数据源读取数据,创建表,定义了一个非托管表 |
插入表
1 | val sql = """ |
描述表
1 | spark.sql("describe df_copy").show() |
刷新表
REFRESH TALE 刷新与该表的所有缓存条目(实质上是文件),如果该表先前已被缓存,则下次扫描时将被延迟缓存:
1 | spark.sql("refresh table df_copy") |
删除表
删除表会删除托管表中的数据,因此执行此操作时需要非常小心。
1 | spark.sql("drop table if exists df_copy") |
缓存表
和 DataFrame 一样,你可以缓存表或者取消缓存表:
1 | spark.sql("uncache table flights") |
Views
视图是保存的查询计划,可以方便地组织或重用查询逻辑。
创建视图
Spark 有几种不同的视图概念,视图可以是全局视图、数据库视图或会话视图:
1 | // 常规/数据库视图:在所属数据库可见,不能基于视图再创建常规视图 |
访问视图
定义好视图,就可以像访问表一样在 SQL 中访问视图了:
1 | spark.sql("select * from replace_temp_view_f").show() |
删除视图
1 | spark.sql("drop view if exists replace_temp_view_f") |
Databases
数据库是用于组织表的工具,如果你没有定义数据库,Spark 将使用默认的数据库,在 Spark 中运行的所有 SQL 语句(包括 DataFrame 命令)都是在数据库的上下文中执行的,如果你更改数据库,则任何用户定义的表都将保留在先前的数据库中,并且要以其他方式查询。
1 | // 创建数据库 |
查询语句
Spark 中的查询支持以下 ANSI SQL 要求(此处列出了 SELECT 表达式的布局):
1 | SELECT [ALL|DISTINCT] named_expression[, named_expression, ...] |
SQL 配置
查看当前环境 SQL 参数的配置:
1 | spark.sql("SET -v").show(false) |
配置项
1 | #Job ID /Name |
配置方法
可以在应用程序初始化时或在应用程序执行过程中进行设置:
1 | spark.conf.set("spark.sql.crossJoin.enabled", "true") |
参考
- 《Spark 权威指南:Chapter 10》
- 什么是Catalog
- https://spark.apache.org/docs/2.3.0/api/sql/