Spark核心概念RDD
RDD(Resilient Distributed Datasets)弹性分布式数据集 ,是Spark中最基本的抽象,在 RDD 源码中这样来描述 RDD:
A list of partitions
A function for computing each split
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
个人理解,RDD可并行化数据集的抽象,对它的操作能自动分发到集群上处理。
RDD支持两种操作:转换(transformation)从现有的数据集创建一个新的数据集;而动作(actions)在数据集上运行计算后,返回一个值给驱动程序。
经典的WordCount例子,同时包含了actions和transformation,如下图:
其中hello.txt如下图:
下面是transformation和actions的常用算子介绍。
一、Transformation
spark 常用的 Transformation 算子如下表:
Transformation 算子 | Meaning(含义) |
---|---|
map(func) | 对原 RDD 中每个元素运用 func 函数,并生成新的 RDD |
filter(func) | 对原 RDD 中每个元素使用func 函数进行过滤,并生成新的 RDD |
flatMap(func) | 与 map 类似,但是每一个输入的 item 被映射成 0 个或多个输出的 items( func 返回类型需要为 Seq )。 |
mapPartitions(func) | 与 map 类似,但函数单独在 RDD 的每个分区上运行, func函数的类型为 Iterator |
mapPartitionsWithIndex(func) | 与 mapPartitions 类似,但 func 类型为 (Int, Iterator |
sample(withReplacement, fraction, seed) | 数据采样,有三个可选参数:设置是否放回(withReplacement)、采样的百分比(fraction)、随机数生成器的种子(seed); |
union(otherDataset) | 合并两个 RDD |
intersection(otherDataset) | 求两个 RDD 的交集 |
distinct([numTasks])) | 去重 |
groupByKey([numTasks]) | 按照 key 值进行分区,即在一个 (K, V) 对的 dataset 上调用时,返回一个 (K, IterablereduceByKey 或 aggregateByKey 性能会更好 Note: 默认情况下,并行度取决于父 RDD 的分区数。可以传入 numTasks 参数进行修改。 |
reduceByKey(func, [numTasks]) | 按照 key 值进行分组,并对分组后的数据执行归约操作。 |
aggregateByKey(zeroValue,numPartitions)(seqOp, combOp, [numTasks]) | 当调用(K,V)对的数据集时,返回(K,U)对的数据集,其中使用给定的组合函数和 zeroValue 聚合每个键的值。与 groupByKey 类似,reduce 任务的数量可通过第二个参数进行配置。 |
sortByKey([ascending], [numTasks]) | 按照 key 进行排序,其中的 key 需要实现 Ordered 特质,即可比较 |
join(otherDataset, [numTasks]) | 在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,等价于内连接操作。如果想要执行外连接,可以使用 leftOuterJoin , rightOuterJoin 和 fullOuterJoin 等算子。 |
cogroup(otherDataset, [numTasks]) | 在一个 (K, V) 对的 dataset 上调用时,返回一个 (K, (Iterable |
cartesian(otherDataset) | 在一个 T 和 U 类型的 dataset 上调用时,返回一个 (T, U) 类型的 dataset(即笛卡尔积)。 |
coalesce(numPartitions) | 将 RDD 中的分区数减少为 numPartitions。 |
repartition(numPartitions) | 随机重新调整 RDD 中的数据以创建更多或更少的分区,并在它们之间进行平衡。 |
repartitionAndSortWithinPartitions(partitioner) | 根据给定的 partitioner(分区器)对 RDD 进行重新分区,并对分区中的数据按照 key 值进行排序。这比调用 repartition 然后再 sorting(排序)效率更高,因为它可以将排序过程推送到 shuffle 操作所在的机器。 |
下面分别给出这些算子的基本使用示例:
1.1 map
1 | val list = List(1,2,3) |
1.2 filter
1 | val list = List(3, 6, 9, 10, 12, 21) |
1.3 flatMap
flatMap(func)
与 map
类似,但每一个输入的 item 会被映射成 0 个或多个输出的 items( func 返回类型需要为 Seq
)。
二者的区别如图:
map
flatMap
1 | val list = List(List(1, 2), List(3), List(), List(4, 5)) |
flatMap 这个算子在日志分析中使用概率非常高,这里进行一下演示:拆分输入的每行数据为单个单词,并赋值为 1,代表出现一次,之后按照单词分组并统计其出现总次数,代码如下:
1 | val lines = List("spark flume spark", |
1.4 mapPartitions
与 map 类似,但函数单独在 RDD 的每个分区上运行, func函数的类型为 Iterator<T> => Iterator<U>
(其中 T 是 RDD 的类型),即输入和输出都必须是可迭代类型。
map是对rdd中的每一个元素进行操作;
mapPartitions则是对rdd中的每个分区的迭代器进行操作,优点是快,缺点是可能内存溢出(map会自动回收内存)
1 | val list = List(1, 2, 3, 4, 5, 6) |
1.5 mapPartitionsWithIndex
与 mapPartitions 类似,但 func 类型为 (Int, Iterator<T>) => Iterator<U>
,其中第一个参数为分区索引。
1 | val list = List(1, 2, 3, 4, 5, 6) |
1.6 sample
数据采样。有三个可选参数:设置是否放回 (withReplacement)、采样的百分比 (fraction)、随机数生成器的种子 (seed) 。
其中fraction
参数在withReplacement
不同时的含义不同:
- 当
withReplacement=false
时:表示每个元素被抽到的概率,分数一定是[0,1] ; - 当
withReplacement=true
时:表示选择每个元素的期望次数,分数必须大于等于0。
1 | val list = List(1, 2, 3, 4, 5, 6) |
1.7 union
合并两个 RDD:
1 | val list1 = List(1, 2, 3) |
1.8 intersection
求两个 RDD 的交集:
1 | val list1 = List(1, 2, 3, 4, 5) |
1.9 distinct
去重:
1 | val list = List(1, 2, 2, 4, 4) |
1.10 groupByKey
按照键进行分组:
1 | val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2)) |
注:groupByKey
返回类型为CompactBuffer
(ArrayBuffer
的替代选择,占用内存更少),直接打印结果如下:
1 | sc.parallelize(list).groupByKey.foreach(println) |
1.11 reduceByKey
按照键进行归约操作:
1 | val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2)) |
经典WordCount算子。
1.12 sortBy & sortByKey
按照键(100、)进行排序:
1 | val list01 = List((100, "hadoop"), (90, "spark"), (120, "storm")) |
按照指定元素进行排序:
1 | val list02 = List(("hadoop",100), ("spark",90), ("storm",120)) |
1.13 join
在一个 (K, V) 和 (K, W) 类型的 Dataset 上调用时,返回一个 (K, (V, W)) 的 Dataset,等价于内连接操作。如果想要执行外连接,可以使用 leftOuterJoin
, rightOuterJoin
和 fullOuterJoin
等算子。
1 | val list01 = List((1, "student01"), (2, "student02"), (3, "student03")) |
1.14 cogroup
在一个 (K, V) 对的 Dataset 上调用时,返回多个类型为 (K, (Iterable
1 | val list01 = List((1, "a"),(1, "a"), (2, "b"), (3, "e")) |
1.15 cartesian
计算笛卡尔积:
1 | val list1 = List("A", "B", "C") |
1.16 aggregateByKey
当调用(K,V)对的数据集时,返回(K,U)对的数据集,其中使用给定的组合函数和 zeroValue 聚合每个键的值。与 groupByKey
类似,reduce 任务的数量可通过第二个参数 numPartitions
进行配置。示例如下:
1 | // 为了清晰,以下所有参数均使用具名传参 |
这里使用了 numSlices = 2
指定 aggregateByKey 父操作 parallelize 的分区数量为 2,其执行流程如下:
基于同样的执行流程,如果 numSlices = 1
,则意味着只有输入一个分区,则其最后一步 combOp 相当于是无效的,执行结果为:
1 | (hadoop,3) |
同样的,如果每个单词对一个分区,即 numSlices = 6
,此时相当于求和操作,执行结果为:
1 | (hadoop,5) |
aggregateByKey(zeroValue = 0,numPartitions = 3)
的第二个参数 numPartitions
决定的是输出 RDD 的分区数量,想要验证这个问题,可以对上面代码进行改写,使用 getNumPartitions
方法获取分区数量:
1 | sc.parallelize(list,numSlices = 6).aggregateByKey(zeroValue = 0,numPartitions = 3)( |
二、Action
Spark 常用的 Action 算子如下:
Action(动作) | Meaning(含义) |
---|---|
reduce(func) | 使用函数func执行归约操作 |
collect() | 以一个 array 数组的形式返回 dataset 的所有元素,适用于小结果集。 |
count() | 返回 dataset 中元素的个数。 |
first() | 返回 dataset 中的第一个元素,等价于 take(1)。 |
take(n) | 将数据集中的前 n 个元素作为一个 array 数组返回。 |
takeSample(withReplacement, num, [seed]) | 对一个 dataset 进行随机抽样 |
takeOrdered(n, [ordering]) | 按自然顺序(natural order)或自定义比较器(custom comparator)排序后返回前 n 个元素。只适用于小结果集,因为所有数据都会被加载到驱动程序的内存中进行排序。 |
saveAsTextFile(path) | 将 dataset 中的元素以文本文件的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。Spark 将对每个元素调用 toString 方法,将元素转换为文本文件中的一行记录。 |
saveAsSequenceFile(path) | 将 dataset 中的元素以 Hadoop SequenceFile 的形式写入到本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。该操作要求 RDD 中的元素需要实现 Hadoop 的 Writable 接口。对于 Scala 语言而言,它可以将 Spark 中的基本数据类型自动隐式转换为对应 Writable 类型。(目前仅支持 Java and Scala) |
saveAsObjectFile(path) | 使用 Java 序列化后存储,可以使用 SparkContext.objectFile() 进行加载。(目前仅支持 Java and Scala) |
countByKey() | 计算每个键出现的次数。 |
foreach(func) | 遍历 RDD 中每个元素,并对其执行fun函数 |
2.1 reduce
使用函数func执行归约操作:
1 | val list = List(1, 2, 3, 4, 5) |
2.2 takeOrdered
按自然顺序(natural order)或自定义比较器(custom comparator)排序后返回前 n 个元素。需要注意的是 takeOrdered
使用隐式参数进行隐式转换,以下为其源码。所以在使用自定义排序时,需要继承 Ordering[T]
实现自定义比较器,然后将其作为隐式参数引入。
1 | def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope { |
自定义规则排序:
1 | // 继承 Ordering[T],实现自定义比较器,按照 value 值的长度进行排序 |
2.3 countByKey
计算每个键出现的次数:
1 | val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1)) |
2.4 saveAsTextFile
将 dataset 中的元素以文本文件的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。Spark 将对每个元素调用 toString 方法,将元素转换为文本文件中的一行记录。
1 | val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1)) |