Hive 作为爱奇艺数仓的基础,Hive SQL 是爱奇艺大数据平台目前主要的数处理工具,各个业务积累大量的 Hive ETL 任务。Spark 相对于 MapReduce 有着更为灵活的的计算模型,这使得 Spark 相对于 Hive (on MapReduce) 有更好的性能。
经过测试对比,我们发现迁移 Hive SQL 到 Spark 将会带来很大的性能提升和资源节省。
Apache Kyuubi (Incubating) 项目提供一个分布式多租户的 Spark Thrift Server,相对于 Spark 原生的 Spark Thrift Server 有更好的架构优势和更多优秀的特性,具体对比可参考:Kyuubi v.s. Spark Thrift JDBC/ODBC Server (STS)。
Hive SQL 迁移 Spark
1.1 双跑对比
大数据平台中已有大量稳定运行的 Hive SQL 任务,为了在迁移的过程中提高用户迁移意愿,降低与用户的沟通成本,我们需要保证迁移后 Spark SQL 的稳定性以及数据准确性,并尽量减少用户操作。
我们在大数据平台中新增了 Hive SQL 迁移服务,提供一键批量双跑测试和一键迁移功能。在迁移前, 我们会对业务的 Hive SQL 任务进行批量双跑测试。改写业务 SQL 将输出替换成临时库表,并分别通过 Hive SQL 和 Spark SQL 执行。
执行完成后,对 Hive SQL 和 Spark SQL 的执行结果进行校验,确保迁移后数据一致。对每行数据进行 concat 后计算 crc32 并求和得到 checksum 值,通过对比两个结果表的 checksum 值和 count 数,来确定数据是否一致。
select sum(cast(CRC32(concat(*)) as decimal(19,0))) as checksum, count(*) as count from mock_db.mock_table
通过双跑测试,我们能够提前发现 Hive SQL 与 Spark SQL 的一些兼容性问题,以及配置不合理导致任 务失败的问题。及时优化平台配置,指导用户优化 SQL 并为任务提供合适的配置。
1.2 兼容性适配
Hive SQL 迁移到 Spark 的过程中,我们遇到了一些语法兼容性的问题,例如:
- Spark 删除不存在分区报错,而 Hive 中允许删除不存在分区
- Spark 执行
set mapreduce.job.reduces=-1
报错 - 数字类型与字符串比较,结果与 Hive 不一致
- …..
为了加快迁移的进度,减少用户参与,我们需要兼容部分 Hive 语法。
在 Kyuubi 中提供 kyuubi-extension-spark
模块,维护 Spark SQL的 Extensions,用于优化Spark SQL的执行计划,社区已经提供了很多优化,详细文档:Auxiliary SQL extension for Spark SQL 。我们基于此模块,为不兼容的语法修改其执行计划,使得与 Hive 保持一致的语义。
例如,在 Hive 中由于默认的 hive.exec.drop.ignorenonexistent=true
配置,在删除不存在的表或分区时不会报错,但是迁移到 Spark 运行后,出现大量删除不存在分区的错误。我们在Kyuubi中实现了 DropIgnoreNonexistent 优化,改写 AlterTableDropPartitionCommand/DropTableCommand
等命令的执行计划,将 ifExists 属性设置为 true ,从而在删除不存在分区时不报错。
Kyuubi提供了只解释SQL执行计划的运行模式,通过 kyuubi.operation.plan.only.mode
配置,可以以 PARSE/ANALYZE/OPTIMIZE/PHYSICAL/EXECUTION
的一些模式解释 SQL 的执行计划而不执行它,这样我们可以快速扫描出有语法兼容问题的一些 SQL。
1.3 小文件优化
Hive SQL 迁移 Spark 运行不可避免地会出现小文件的问题。我们使用 Kyuubi 的 Spark Extensions 结合 Spark AQE 可以有效地控制小文件问题。
Kyuubi Spark Extensions 中提供了 RepartitionBeforeWrite
的优化,在写入前插入 repartition 操作,再结合 AQE 合并小分区,从而实现控制小文件。
对于写入动态分区的情况,Kyuubi 中 Repartition 操作指定了动态分区字段,这样可以减小动态分区导致的小文件。如果动态分区数据分布不均匀,可能导致部分分区数据倾斜,所以对于动态分区写入 Kyuubi 在 Repartition 时,同时加入了一个随机数字段来避免数据倾斜。不过这样由于扩展了分区字段 的基数,也可能会导致小文件问题。
在 spark-defaults.conf
中添加相关的配置:
spark.sql.extensions org.apache.kyuubi.sql.xyuubiSparksQ1Extension # Kyuubi # spark.sql.optimizer.insertmepartitionneforewrite.enabled true # AQE spark.sql.adaptive.enabled true spark.sql.adaptive.advisoryPartitionSizexnaytes 1024m spark. sql .adaptive. coal esceParti ti on s .mi nmarti ti onmum 1
对于 Spark3.2,Kyuubi 通过在写入前插入 Rebalance 算子,更好地解决小文件的问题。
Kyuubi 服务增强
2.1 标签化配置
对于不同用户和业务的 SQL,由于数据量和业务处理逻辑存在差异,可能需要对不同的任务进行单独的 配置优化。我们在数据开发平台上提供了用户配置参数的能力,允许用户对 SQL 任务添加配置。
对于同一用户或者业务会存在一些具有相同的特效的一些 SQL,我们提供标签化配置,定义一些标签绑定特有的配置,并在任务提交时带上相关的标签。
例如,我们定义的部分标签如下:
- Adhoc:用于即席查询平台任务,数据量小,要求快速响应。配置 USER 共享级别的引擎,较大 Driver 内存,以及 Spark 任务抢占策略等。
- Batch:用于数据开发平台任务,定时调度运行,稳定要求较高。配置 CONNECTION 级别独立引 擎,使得任务完全资源隔离,并添加小文件、AQE 等优化配置。
- User/Business/Custom:允许定义用户、业务或其他自定义标签,绑定特有的一些配置,如:队列、资源、兼容性适配相关配置等,降低用户使用门槛。
Kyuubi 中已经支持了对用户添加默认配置,例如在配置文件中添加 ___bob___.spark.executor.memory=8g
配置,为 bob 用户的任务默认指定 8G 的 executor 内存,具体使用参考:User Default 。
同时,在Kyuubi 的 kyuubi-server-plugin
中,提供了 SessionConfAdvisor
接口,用于为 Session 注入配置。可以将标签配置保存在数据库中,并实现 SessionConfAdvisor
接口,根据 Session 配置 的标签从数据库中获取对应的配置。
2.2 SQL 审计
Kyuubi 服务中定义了一些事件,可用于 SQL 审计操作。
Kyuubi Server 中定义了如下事件:
- KyuubiServerInfoEvent:Kyuubi Server 启动、停止事件信息
- KyuubiSessionEvent:Session 开启、关闭事件,以及连接相关信息
- KyuubiOperationEvent:Kyuubi Operation 执行事件,包括了 SQL 执行相关信息
目前 Kyuubi 仅实现了 JSON 写入本地文件的方式采集事件,相关配置:
将事件写入 JSON 文件后,可以借助日志收集插件采集到 Kakfa 中并落地到 ElasticSearch 中,方便后续 对 SQL 执行事件进行审计分析。
我们在 Kyuubi Server 中通过实现自定义的 EventHandler
,处理 Kyuubi Server 的事件,直接将事件写入到 ElasticSearch 中。分析失败 SQL 执行事件的错误信息,完善 Spark SQL 运维知识库,并添加正则匹配分析规则,匹配执行事件的错误信息给出对应的解决方案,方便用户自行排障。
文中文档可参考:
-
Kyuubi v.s. Spark Thrift JDBC/ODBC Server (STS)——https://kyuubi.apache.org/docs/latest/overview/kyuubi_vs_thriftserver.html
-
Auxiliary SQL extension for Spark SQL—— https://kyuubi.apache.org/docs/latest/sql/rules.html#
-
DropIgnoreNonexistent——https://github.com/apache/incubator-kyuubi/blob/master/extensions/spark/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/DropIgnoreNonexistent.scala
-
User Defaults—— https://kyuubi.apache.org/docs/latest/deployment/settings.html#user-defaults