• 在这个过程中并不会求值。求值发生在action 操作中
      • 在这个过程中并不会改变输入的RDDRDD 是不可变的),而是创建并返回一个新的RDD
    1. spark 会使用谱系图来记录各个RDD 之间的依赖关系

      • 在对RDD 行动操作中,需要这个依赖关系来按需计算每个中间RDD
      • 当持久化的RDD 丢失部分数据时,也需要这个依赖关系来恢复丢失的数据
    1. 基本转换操作:

      • .map(f, preservesPartitioning=False) :将函数 f 作用于当前RDD的每个元素,返回值构成新的RDD

        • preservesPartitioning:如果为True,则新的RDD 保留旧RDD 的分区
      • .flatMap(f, preservesPartitioning=False) :将函数 f 作用于当前RDD的每个元素,将返回的迭代器的内容构成了新的RDD

        • flatMap 可以视作:将返回的迭代器扁平化
      • .mapPartitions(f, preservesPartitioning=False):将函数 f 作用于当前RDD的每个分区,将返回的迭代器的内容构成了新的RDD

      • .mapPartitionsWithIndex(f, preservesPartitioning=False):将函数 f 作用于当前RDD的每个分区以及分区id,将返回的迭代器的内容构成了新的RDD

        • 这里f 函数的参数是f分区id 以及一个集合(表示一个分区的数据)

        示例:

        1. def f(splitIndex, iterator):
        2. xxx
        3. rdd.mapPartitionsWithIndex(f)
      • .filter(f):将函数f(称作过滤器) 作用于当前RDD的每个元素,通过f 的那些元素构成新的RDD

      • .distinct(numPartitions=None):返回一个由当前RDD 元素去重之后的结果组成新的RDD

        • numPartitions:指定了新的RDD 的分区数
      • sample(withReplacement, fraction, seed=None) :对当前RDD 进行采样,采样结果组成新的RDD

        • withReplacement:如果为True,则可以重复采样;否则是无放回采样

        • fractions:新的RDD 的期望大小(占旧RDD的比例)。spark 并不保证结果刚好满足这个比例(只是一个期望值)

          • 如果withReplacement=True:则表示每个元素期望被选择的次数
          • 如果withReplacement=False:则表示每个元素期望被选择的概率
        • seed:随机数生成器的种子
      • .sortBy(keyfunc, ascending=True, numPartitions=None):对当前RDD 进行排序,排序结果组成新的RDD

        • keyfunc:自定义的比较函数
        • ascending:如果为True,则升序排列
      • .groupBy(f, numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>):返回一个分组的RDD

        示例:

    2. 针对两个RDD的转换操作:

      尽管RDD 不是集合,但是它也支持数学上的集合操作。注意:这些操作都要求被操作的RDD 是相同数据类型的。

      • .union(other):合并两个RDD 中所有元素,生成一个新的RDD

        • other:另一个

        该操作并不会检查两个输入RDD 的重复元素,只是简单的将二者合并(并不会去重)。

      • 该操作会保证结果是去重的,因此它的性能很差。因为它需要通过网络混洗数据来发现重复的元素。

      • .subtract(other, numPartitions=None):存在于第一个RDD 而不存在于第二个RDD 中的所有元素组成的新的RDD

        该操作也会保证结果是去重的,因此它的性能很差。因为它需要通过网络混洗数据来发现重复的元素。

      • .cartesian(other):两个RDD 的笛卡尔积,生成一个新的RDD

        RDD 中的元素是元组 (a,b),其中 a 来自于第一个RDDb 来自于第二个RDD

        • 注意:求大规模的RDD 的笛卡尔积开销巨大
        • 该操作不会保证结果是去重的,它并不需要网络混洗数据。
    3. .keyBy(f):创建一个RDD,它的元素是元组(f(x),x)

      示例:

      1. # 结果为:[(4, 2), (9, 3), (16, 4)]
    4. .pipe(command, env=None, checkCode=False):返回一个RDD,它由外部进程的输出结果组成。

      • 参数:

        • command:外部进程命令
        • env:环境变量
        • checkCode:如果为True,则校验进程的返回值
    5. .randomSplit(weights, seed=None):返回一组新的RDD,它是旧RDD 的随机拆分

      • 参数:

        • weights:一个double的列表。它给出了每个结果DataFrame 的相对大小。如果列表的数值之和不等于 1.0,则它将被归一化为 1.0
        • seed:随机数种子
    6. .zip(other):返回一个Pair RDD,其中键来自于self,值来自于other

      • 它假设两个RDD 拥有同样数量的分区,且每个分区拥有同样数量的元素
    7. .zipWithIndex():返回一个Pair RDD,其中键来自于self,值就是键的索引。

    8. .zipWithUniqueId():返回一个Pair RDD,其中键来自于self,值是一个独一无二的id

      它不会触发一个spark job,这是它与zipWithIndex 的重要区别。

    3.2 Pair RDD转换操作

    1. Pair RDD 可以使用所有标准RDD 上的可用的转换操作

      • 由于Pair RDD 的元素是二元元组,因此传入的函数应当操作二元元组,而不是独立的元素。
    2. 基本转换操作:

      • .keys():返回一个新的RDD,包含了旧RDD 每个元素的键

      • .values():返回一个新的RDD,包含了旧RDD 每个元素的值

      • .mapValues(f):返回一个新的RDD,元素为 [K,f(V)](保留原来的键不变,通过f 改变值)。

      • .flatMapValues(f):返回一个新的RDD,元素为 [K,f(V)](保留原来的键不变,通过f 改变值)。它与.mapValues(f) 区别见下面的示例:

      • .sampleByKey(withReplacement, fractions, seed=None):基于键的采样(即:分层采样)

        • 参数:

          • withReplacement:如果为True,则是有放回的采样;否则是无放回的采样
          • fractions:一个字典,指定了键上的每个取值的采样比例(不同取值之间的采样比例无关,不需要加起来为1)
          • seed:随机数种子
      • .subtractByKey(other, numPartitions=None):基于键的差集。返回一个新的RDD,其中每个 都位于self 中,且不在other
    3. 基于键的聚合操作:

      在常规RDD上,fold()、aggregate()、reduce() 等都是行动操作。在Pair RDD 上,有类似的一组操作,用于针对相同键的元素进行聚合。这些操作返回RDD,因此是转化操作而不是行动操作。

      返回的新RDD 的键为原来的键,值为针对键的元素聚合的结果。

      • .reduceByKey(f,numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>):合并具有相同键的元素。f 作用于同一个键的那些元素的值。

        • 它为每个键进行并行的规约操作,每个规约操作将键相同的值合并起来
        • 因为数据集中可能有大量的键,因此该操作返回的是一个新的RDD:由键,以及对应的规约结果组成
      • .foldByKey(zeroValue,f,numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>):通过f聚合具有相同键的元素。其中zeroValue 为零值。参见.fold()

      • .aggregateByKey(zeroValue,seqFunc,combFunc,numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>):通过f聚合具有相同键的元素。其中zeroValue 为零值。参见.aggregate()

      • .combineByKey(createCombiner,mergeValue,mergeCombiners, numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>):它是最为常用的基于键的聚合函数,大多数基于键的聚合函数都是用它实现的。

        aggregate() 一样,combineByKey() 可以让用户返回与输入数据类型不同的返回值。

        你需要提供三个函数:

        • createCombiner(v)v 表示键对应的值。返回一个C 类型的值(表示累加器)
        • mergeValue(c,v)c 表示当前累加器,v 表示键对应的值。返回一个C 类型的值(表示更新后的累加器)
        • mergeCombiners(c1,c2)c1 表示某个分区某个键的累加器,c2 表示同一个键另一个分区的累加器。返回一个C 类型的值(表示合并后的累加器)

        其工作流程是:遍历分区中的所有元素。考察该元素的键:

        • 如果键从未在该分区中出现过,表明这是分区中的一个新的键。则使用createCombiner() 函数来创建该键对应的累加器的初始值。

          注意:这一过程发生在每个分区中,第一次出现各个键的时候发生。而不仅仅是整个RDD 中第一次出现一个键时发生。

        • 如果键已经在该分区中出现过,则使用mergeValue() 函数将该键的累加器对应的当前值与这个新的值合并

        • 由于每个分区是独立处理的,因此同一个键可以有多个累加器。如果有两个或者更多的分区都有同一个键的累加器,则使用mergeCombiners() 函数将各个分区的结果合并。

    4. 数据分组:

      • .groupByKey(numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>):根据键来进行分组。

        • 返回一个新的RDD,类型为[K,Iterable[V]],其中K 为原来RDD 的键的类型,V 为原来RDD 的值的类型。
        • 如果你分组的目的是为了聚合,那么直接使用reduceByKey、aggregateByKey 性能更好。
      • .cogroup(other,numPartitions=None):它基于selfother 两个TDD 中的所有的键来进行分组,它提供了为多个RDD 进行数据分组的方法。

        • 返回一个新的RDD,类型为[K,(Iterable[V],Iterable[W])] 。其中K 为两个输入RDD 的键的类型,V 为原来self的值的类型,Wother 的值的类型。
        • 如果某个键只存在于一个输入RDD 中,另一个输入RDD 中不存在,则对应的迭代器为空。
        • 它是groupWith 的别名,但是groupWith 支持更多的TDD 来分组。
    5. 数据连接:

      数据连接操作的输出RDD 会包含来自两个输入RDD 的每一组相对应的记录。输出RDD 的类型为[K,(V,W)] ,其中K 为两个输入RDD 的键的类型,V 为原来self的值的类型,Wother 的值的类型。

      • .join(other,numPartitions=None):返回一个新的RDD,它是两个输入RDD的内连接。
      • .leftOuterJoin(other,numPartitions=None):返回一个新的RDD,它是两个输入RDD的左外连接。
      • .rightOuterJoin(other,numPartitions=None):返回一个新的RDD,它是两个输入RDD的右外连接。