Spark分区管理
当我们使用Spark加载数据源并进行一些列转换时,Spark会将数据拆分为多个分区Partition,并在分区上并行执行计算。所以理解Spark是如何对数据进行分区的以及何时需要手动调整Spark的分区,可以帮助我们提升Spark程序的运行效率。
1. 什么是分区
关于什么是分区,其实没有什么神秘的。我们可以通过创建一个DataFrame来说明如何对数据进行分区:
1 | scala> val x = (1 to 10).toList |
创建好DataFrame之后,我们再来看一下该DataFame的分区,可以看出分区数为4:
1 | scala> numsDF.rdd.partitions.size |
当我们将DataFrame写入磁盘文件时,再来观察一下文件的个数,
1 | scala> numsDF.write.csv("file:///opt/modules/data/numsDF") |
可以发现,上述的写入操作会生成4个文件
2. coalesce操作
源码
1 | def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan { |
解释
在减少分区时,返回一个新的分区数为指定numPartitions
的DataSet,在增大分区时,则分区数保持不变。值得注意的是,该操作生成的是窄依赖,所以不会发生shuffle。然而,如果是极端的操作,比如numPartitions = 1,这样会导致只在一个节点进行计算。为了避免这种情况发生,可以使用repartition方法,该方法会发生shuffle操作,这就意味着当前的上游分区可以并行执行
示例
减少分区操作
coalesce方法可以用来减少DataFrame的分区数。以下操作是将数据合并到两个分区:
1 | scala> val numsDF2 = numsDF.coalesce(2) |
我们可以验证上述操作是否创建了只有两个分区的新DataFrame:可以看出,分区数变为了2
1 | scala> numsDF2.rdd.partitions.size |
将numsDF2写入文件存储,观察文件数量
1 | numsDF2.write.csv("file:///opt/modules/data/numsDF2") |
可以发现,上述的写入操作会生成2个文件
上述每个分区的数据如下:
1 | part-00000: 1, 2, 3, 4, 5 |
对比减少分区之前的数据存储,可以看出:在减少分区时,并没有对所有数据进行了移动,仅仅是在原来分区的基础之上进行了合并而已,这样的操作可以减少数据的移动,所以效率较高。
增加分区操作
从上面的源码可以看出,如果使用coalesce方法进行增加分区,将不会生效。我们可以尝试通过coalesce来增加分区的数量,观察一下具体结果:
1 | scala> val numsDF3 = numsDF.coalesce(6) |
可以看出,即使我们尝试使用coalesce(6)来创建6个分区,numsDF3的分区数依然是4,并没有发生变化,coalesce算法通过将数据从某些分区移动到现有分区来更改分区数,该方法显然不能增加分区数。
3. repartition操作
源码
1 | /** |
从源码中可以看出,该方法可以用于减少或者增加分区的数量,并且会发生Shuffle操作。
示例
减少分区操作
已知numsDF有4个分区,现在将其分区置为2,观察结果
1 | scala> val numsDF4 = numsDF.repartition(2) |
可以看出,分区确实减少了,我们在来看一下每个分区的数据:
1 | numsDF4.write.csv("file:///opt/modules/data/numsDF4") |
上面的操作会产生两个文件,每个分区文件的数据为:
1 | part-00000:2,3,4,7,10,9 |
从上面的数据分布可以看出,数据被Shuffle了。这也印证了源码中说的,repartition操作会将所有数据进行Shuffle,并且将数据均匀地分布在不同的分区上,并不是像coalesce方法一样,会尽量减少数据的移动。
增加分区操作
repartition操作方法不仅可以用于减少分区操作,也可以用于增加分区数量。
1 | scala> val numsDF5 = numsDF.repartition(6) |
coalesce 与repartition之间的区别
repartition算法对数据进行了Shuffle操作,并创建了大小相等的数据分区。coalesce操作合并现有分区以避免Shuffle。除此之外,coalesce操作仅能用于减少分区,不能用于增加分区操作。
按照列字段进行repartition
- 源码
1 | def repartition(partitionExprs: Column*): Dataset[T] = { |
- 解释
返回一个按照指定分区列的新的DataSet,具体的分区数量有参数spark.sql.shuffle.partitions
默认指定,该默认值为200,该操作与HiveSQL的DISTRIBUTE BY操作类似。
repartition除了可以指定具体的分区数之外,还可以指定具体的分区字段。我们可以使用下面的示例来探究如何使用特定的列对DataFrame进行重新分区。
首先创建DataFrame:
1 | val people = List( |
让我们按gender列对DataFrame进行分区:
1 | scala> val genderDF = peopleDF.repartition($"gender") |
按列进行分区时,Spark默认会创建200个分区。此示例将有两个带有数据的分区,其他分区将没有数据。
1 | scala> genderDF.rdd.partitions.size |
4. 重点
合理设置分区数量
假设我们要对一个大数据集进行操作,该数据集的分区数也比较大,那么当我们进行一些操作之后,比如filter过滤操作、sample采样操作,这些操作可能会使结果数据集的数据量大量减少。但是Spark却不会对其分区进行调整,由此会造成大量的分区没有数据,并且向HDFS读取和写入大量的空文件,效率会很低,这种情况就需要我们重新调整分数数量,以此来提升效率。
通常情况下,结果集的数据量减少时,其对应的分区数也应当相应地减少。那么该如何确定具体的分区数呢?
- 分区过少:将无法充分利用群集中的所有可用的CPU core
- 分区过多:产生非常多的小任务,从而会产生过多的开销
在这两者之间,第一个对性能的影响相对比较大。对于小于1000个分区数的情况而言,调度太多的小任务所产生的影响相对较小。但是,如果有成千上万个分区,那么Spark会变得非常慢。
spark中的shuffle分区数是静态的。它不会随着不同的数据大小而变化。上文提到:默认情况下,控制shuffle分区数的参数spark.sql.shuffle.partitions值为200,这将导致以下问题
- 对于较小的数据,200是一个过大的选择,由于调度开销,通常会导致处理速度变慢。
- 对于大数据,200很小,无法有效使用群集中的所有资源
一般情况下,我们可以通过将集群中的CPU数量乘以2、3或4来确定分区的数量。如果要将数据写出到文件系统中,则可以选择一个分区大小,以创建合理大小的文件。
该使用哪种方法进行重分区呢?
对于大型数据集,进行Shuffle操作是很消耗性能的,但是当我们的数据集比较小的时候,可以使用repartition方法进行重分区,这样可以尽量保证每个分区的数据分布比较均匀(使用coalesce可能会造成数据倾斜),对于下游使用者来说效率更高。
如何将数据写入到单个文件
通过使用repartition(1)和coalesce(1))可用于将DataFrame写入到单个文件中。通常情况下,不会只将数据写入到单个文件中,因为这样效率很低,写入速度很慢,在数据量比较大的情况,很可能会出现写入错误的情况。所以,只有当DataFrame很小时,我们才会考虑将其写入到单个文件中。
何时考虑重分区
一般对于在对比较大的数据集进行过滤操作之后,产生的较小数据集,通常需要对其考虑进行重分区,从而提升任务执行的效率。
5. 总结
本文主要介绍了Spark是如何管理分区的,分别解释了Spark提供的两种分区方法,并给出了相应的使用示例和分析。最后对分区情况及其影响进行了讨论,并给出了一些实践的建议。希望本文对你有所帮助。