5.2.1 转换操作

      • :在整个DataFrame 开展聚合操作(是df.groupBy.agg() 的快捷方式)

        示例:

      • .filter(condition):对行进行过滤。

        • 它是where() 的别名

        • 参数:

          • condition:一个types.BooleanTypeColumn,或者一个字符串形式的SQL 的表达式
        • 示例:

          1. df.filter(df.age > 3).collect()
          2. df.filter("age > 3").collect()
          3. df.where("age = 2").collect()
    1. 分组:

      • .cube(*cols):根据当前DataFrame 的指定列,创建一个多维的cube,从而方便我们之后的聚合过程。

        • 参数:

          • cols:指定的列名或者Column的列表
        • 返回值:一个GroupedData 对象
    • .groupBy(*cols):通过指定的列来将DataFrame 分组,从而方便我们之后的聚合过程。

      • 参数:

        • cols:指定的列名或者Column的列表
      • 返回值:一个GroupedData 对象

      • 它是groupby的别名

    • .rollup(*cols):创建一个多维的rollup,从而方便我们之后的聚合过程。

      • 参数:

        • cols:指定的列名或者Column的列表
      • 返回值:一个GroupedData 对象
    1. 排序:

      • .orderBy(*cols, **kwargs):返回一个新的DataFrame,它根据旧的DataFrame 指定列排序

        • 参数:

          • cols:一个列名或者Column 的列表,指定了排序列

          • ascending:一个布尔值,或者一个布尔值列表。指定了升序还是降序排序

            • 如果是列表,则必须和cols 长度相同
      • .sort(*cols, **kwargs):返回一个新的DataFrame,它根据旧的DataFrame 指定列排序

        • 参数:

          • cols:一个列名或者Column 的列表,指定了排序列

          • ascending:一个布尔值,或者一个布尔值列表。指定了升序还是降序排序

            • 如果是列表,则必须和cols 长度相同
        • 示例:

          ​x

          1. from pyspark.sql.functions import *
          2. df.sort(df.age.desc())
          3. df.sort("age", ascending=False)
          4. df.sort(asc("age"))
          5. df.orderBy(df.age.desc())
          6. df.orderBy("age", ascending=False)
          7. df.orderBy(asc("age"))
      • .sortWithinPartitions(*cols, **kwargs):返回一个新的DataFrame,它根据旧的DataFrame 指定列在每个分区进行排序

        • 参数:

          • cols:一个列名或者Column 的列表,指定了排序列

          • ascending:一个布尔值,或者一个布尔值列表。指定了升序还是降序排序

            • 如果是列表,则必须和cols 长度相同
    2. 调整分区:

      • .coalesce(numPartitions):返回一个新的DataFrame,拥有指定的numPartitions 分区。

        • 只能缩小分区数量,而无法扩张分区数量。如果numPartitions 比当前的分区数量大,则新的DataFrame 的分区数与旧DataFrame 相同

        • 它的效果是:不会混洗数据

        • 参数:

          • numPartitions:目标分区数量
      • .repartition(numPartitions, *cols):返回一个新的DataFrame,拥有指定的numPartitions 分区。

        • 结果DataFrame 是通过hash 来分区
        • 它可以增加分区数量,也可以缩小分区数量
    3. 集合操作:

      • .crossJoin(other):返回一个新的DataFrame,它是输入的两个DataFrame 的笛卡儿积

        • 参数:

          • other:另一个DataFrame 对象
      • .join(other,on=None,how=None):返回两个DataFramejoin

        • 参数:

          • other:另一个DataFrame 对象

          • 注意:要求两个DataFrame 都存在这些列

          • how:指定 的方式,默认为'inner'。可以为: innercrossouterfullfull_outerleftleft_outerrightright_outerleft_semileft_anti

      • .subtract(other):返回一个新的DataFrame,它的行由位于self 中、但是不在other 中的Row 组成。

        • 参数:

          • other:另一个DataFrame 对象
      • .union(other): 返回两个DataFrame的行的并集(它并不会去重)

        • 它是unionAll 的别名

        • 参数:

          • other:另一个DataFrame 对象
    4. 统计:

      • .crosstab(col1, col2):统计两列的成对频率。要求每一列的distinct 值数量少于 个。最多返回 5.2 方法 - 图1 对频率。

        • 它是DataFrameStatFunctions.crosstab() 的别名

        • 结果的第一列的列名为,col1_col2,值就是第一列的元素值。后面的列的列名就是第二列元素值,值就是对应的频率。

        • 参数:

          • col1,col2:列名字符串(或者Column
        • 示例:

      • .describe(*cols):计算指定的数值列、字符串列的统计值。

        • 统计结果包括:count、mean、stddev、min、max

        • 该函数仅仅用于探索数据规律

        • 参数:

          • cols:列名或者多个列名字符串(或者Column)。如果未传入任何列名,则计算所有的数值列、字符串列
      • .freqItems(cols,support=None):寻找指定列中频繁出现的值(可能有误报)

        • 它是DataFrameStatFunctions.freqItems() 的别名

        • 参数:

          • cols:字符串的列表或者元组,指定了待考察的列
          • support:指定所谓的频繁的标准(默认是 1%)。该数值必须大于
    5. 移除数据:

      • .distinct():返回一个新的DataFrame,它保留了旧DataFrame 中的distinct 行。

        即:根据行来去重

      • .drop(*cols):返回一个新的DataFrame,它剔除了旧DataFrame 中的指定列。

        • 参数:

          • cols:列名字符串(或者Column)。如果它在旧DataFrame 中不存在,也不做任何操作(也不报错)
      • .dropDuplicates(subset=None):返回一个新的DataFrame,它剔除了旧DataFrame 中的重复行。

        它与.distinct() 区别在于:它仅仅考虑指定的列来判断是否重复行。

        • 参数:

          • subset:列名集合(或者Column的集合)。如果为None,则考虑所有的列。
        • .drop_duplicates.dropDuplicates 的别名
      • .dropna(how='any', thresh=None, subset=None):返回一个新的DataFrame,它剔除了旧DataFrame 中的null行。

        • 它是DataFrameNaFunctions.drop() 的别名

        • 参数:

          • how:指定如何判断null 行的标准。'all':所有字段都是na,则是空行;'any':任何字段存在na,则是空行。
          • thresh:一个整数。当一行中,非null 的字段数量小于thresh 时,认为是空行。如果该参数设置,则不考虑how
          • subset:列名集合,给出了要考察的列。如果为None,则考察所有列。
      • .limit(num):返回一个新的DataFrame,它只有旧DataFrame 中的num行。
    6. 采样、拆分:

      • .randomSplit(weights, seed=None):返回一组新的DataFrame,它是旧DataFrame 的随机拆分

        • 参数:

          • weights:一个double的列表。它给出了每个结果DataFrame 的相对大小。如果列表的数值之和不等于 1.0,则它将被归一化为 1.0
          • seed:随机数种子
        • 示例:

          1. splits = df.randomSplit([1.0, 2.0], 24)
          2. splits[0].count()
      • .sample(withReplacement, fraction, seed=None):返回一个新的DataFrame,它是旧DataFrame 的采样

        • 参数:

          • withReplacement:如果为True,则可以重复采样;否则是无放回采样

          • fractions:新的DataFrame 的期望大小(占旧DataFrame的比例)。spark 并不保证结果刚好满足这个比例(只是一个期望值)

            • 如果withReplacement=True:则表示每个元素期望被选择的次数
            • 如果withReplacement=False:则表示每个元素期望被选择的概率
          • seed:随机数生成器的种子
      • .sampleBy(col, fractions, seed=None):返回一个新的DataFrame,它是旧 的采样

        它执行的是无放回的分层采样。分层由col 列指定。

        • 参数:

          • col:列名或者Column,它给出了分层的依据
          • fractions:一个字典,给出了每个分层抽样的比例。如果某层未指定,则其比例视作 0
        • 示例:

          1. sampled = df.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
          2. # df['key'] 这一列作为分层依据,0 抽取 10%, 1 抽取 20%
    7. 替换:

      • .replace(to_replace, value=None, subset=None):返回一组新的DataFrame,它是旧DataFrame 的数值替代结果

        • 当替换时,value 将被类型转换到目标列

        • 参数:

          • to_replace:可以为布尔、整数、浮点数、字符串、列表、字典,给出了被替代的值。

            • 如果是字典,则给出了每一列要被替代的值
          • value:一个整数、浮点数、字符串、列表。给出了替代值。

          • subset:列名的列表。指定要执行替代的列。

      • .fillna(value, subset=None):返回一个新的DataFrame,它替换了旧DataFrame 中的null值。

        • 它是DataFrameNaFunctions.fill()的别名

        • 参数:

          • value:一个整数、浮点数、字符串、或者字典,用于替换null 值。如果是个字典,则忽略subset,字典的键就是列名,指定了该列的null值被替换的值。
          • subset:列名集合,给出了要被替换的列
    8. 选取数据:

      • .select(*cols):执行一个表达式,将其结果返回为一个DataFrame

        • 参数:

          • cols:一个列名的列表,或者Column 表达式。如果列名为*,则扩张到所有的列名
        • 示例:

      • .selectExpr(*expr):执行一个SQL 表达式,将其结果返回为一个DataFrame

        • 参数:

          • expr:一组SQL 的字符串描述
        • 示例:

          1. df.selectExpr("age * 2", "abs(age)")
      • .toDF(*cols):选取指定的列组成一个新的DataFrame

        • 参数:

          • cols:列名字符串的列表
      • .toJSON(use_unicode=True):返回一个新的DataFrame,它将旧的DataFrame 转换为RDD(元素为字符串),其中每一行转换为json 字符串。
    9. 列操作:

      • .withColumn(colName, col):返回一个新的DataFrame,它将旧的DataFrame 增加一列(或者替换现有的列)

        • 参数:

          • colName:一个列名,表示新增的列(如果是已有的列名,则是替换的列)
          • col:一个Column 表达式,表示新的列
        • 示例:

          1. df.withColumn('age2', df.age + 2)
      • .withColumnRenamed(existing, new):返回一个新的DataFrame,它将旧的DataFrame 的列重命名

        • 参数:

          • existing:一个字符串,表示现有的列的列名
          • col:一个字符串,表示新的列名

    5.2.2 行动操作

    1. 查看数据:

      • .collect():以Row 的列表的形式返回所有的数据

      • .first():返回第一行(一个Row对象)

      • .head(n=None):返回前面的n

        • 参数:

          • n:返回行的数量。默认为1
        • 返回值:

          • 如果返回1行,则是一个Row 对象
          • 如果返回多行,则是一个Row 的列表
      • .show(n=20, truncate=True):在终端中打印前 n 行。

        • 它并不返回结果,而是print 结果

        • 参数:

          • n:打印的行数
          • truncate:如果为True,则超过20个字符的字符串被截断。如果为一个数字,则长度超过它的字符串将被截断。
      • .take(num):以Row 的列表的形式返回开始的num 行数据。

        • 参数:

          • num:返回行的数量
      • .toLocalIterator():返回一个迭代器,对它迭代的结果就是DataFrame的每一行数据(Row 对象)
    2. 统计:

      • .corr(col1, col2, method=None):计算两列的相关系数,返回一个浮点数。当前仅支持皮尔逊相关系数

        • DataFrame.corr()DataFrameStatFunctions.corr()的别名

        • 参数:

          • col,col2:为列的名字字符串(或者Column)。
          • method:当前只支持'pearson'
      • .cov(col1,col2):计算两列的协方差。

        • DataFrame.cov()DataFrameStatFunctions.cov()的别名

        • 参数:

          • col,col2:为列的名字字符串(或者Column
      • .count():返回当前DataFrame 有多少行
    3. 遍历:

      • .foreach(f):对DataFrame 中的每一行应用f

        • 它是df.rdd.foreach() 的快捷方式
      • .foreachPartition(f):对DataFrame 的每个分区应用f

        • 它是df.rdd.foreachPartition() 的快捷方式

      • .toPandas():将DataFrame 作为 返回

        • 只有当数据较小,可以在驱动器程序中放得下时,才可以用该方法