• 参数:

        • cols:列名字符串列表,或者Column 列表。要求这些列具有同样的数据类型
      • 示例:

    1. array_contains(col, value):创建一个新列,指示value是否在array 中(由col 给定)

      其中col 必须是array 类型。而value 是一个值,或者一个Column 或者列名。

      • 判断逻辑:

        • 如果arraynull,则返回null
        • 如果value 位于 array 中,则返回True
        • 如果value 不在 array 中,则返回False
      • 示例:

        1. df = spark_session.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
        2. df.select(array_contains(df.data, "a"))
    2. create_map(*cols):创建一个map 列。

      • 参数:

        • cols:列名字符串列表,或者Column 列表。这些列组成了键值对。如(key1,value1,key2,value2,...)
      • 示例:

        1. df.select(create_map('name', 'age').alias("map")).collect()
        2. #[Row(map={u'Alice': 2}), Row(map={u'Bob': 5})]
    3. broadcast(df):标记df 这个Dataframe 足够小,从而应用于broadcast join

      • 参数:

        • df:一个 Dataframe 对象
    4. coalesce(*cols):返回第一个非null 的列组成的Column。如果都为null,则返回null

      • 参数:

        • cols:列名字符串列表,或者Column 列表。
    5. crc32(col):计算二进制列的CRC32 校验值。要求col 是二进制列。

    6. explode(col):将一个array 或者 map 列拆成多行。要求col 是一个array 或者map 列。

      示例:

    7. posexplode(col): 对指定array 或者map 中的每个元素,依据每个位置返回新的一行。

      要求col 是一个 或者map 列。

      1. eDF = spark_session.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
      2. eDF.select(posexplode(eDF.intlist)).collect()
      3. #结果为:[Row(pos=0, col=1), Row(pos=1, col=2), Row(pos=2, col=3)]
    8. expr(str):计算表达式。

      • 参数:

        • str:一个表达式。如length(name)
    9. from_json(col,schema,options={}):解析一个包含JSON 字符串的列。如果遇到无法解析的字符串,则返回null

      • 参数:

        • col:一个字符串列,字符串是json 格式
        • schema:一个StructType(表示解析一个元素),或者StructTypeArrayType(表示解析一组元素)
        • options:用于控制解析过程。
      • 示例:

        1. from pyspark.sql.types import *
        2. schema = StructType([StructField("a", IntegerType())])
        3. df = spark_session.createDataFrame([(1, '{"a": 1}')], ("key", "value"))
        4. df.select(from_json(df.value, schema).alias("json")).collect()
        5. #结果为:[Row(json=Row(a=1))]
    10. get_json_object(col,path):从json 字符串中提取指定的字段。如果json 字符串无效,则返回null.

      • 参数:

        • col:包含json 格式的字符串的列。
        • pathjson 的字段的路径。
      • 示例:

    11. greatest(*cols):返回指定的一堆列中的最大值。要求至少包含2列。

      它会跳过null 值。如果都是null 值,则返回null

    12. least(*cols):返回指定的一堆列中的最小值。要求至少包含2列。

      它会跳过null 值。如果都是null 值,则返回null

    13. json_tuple(col,*fields):从json 列中抽取字段组成新列(抽取n 个字段,则生成n 列)

      • 参数:

        • col:一个json 字符串列
        • fields:一组字符串,给出了json 中待抽取的字段
    14. lit(col):创建一个字面量值的列

    15. monotonically_increasing_id():创建一个单调递增的id 列(64位整数)。

      它可以确保结果是单调递增的,并且是unique的,但是不保证是连续的。

      它隐含两个假设:

      • 假设每个分区的记录数量少于8 billion
    16. 要求col1col2 都是浮点列(DoubleType 或者 FloatType

    17. size(col):计算array/map 列的长度(元素个数)。

    18. sort_array(col,asc=True): 对array 列中的array 进行排序(排序的方式是自然的顺序)

      • 参数:

        • col:一个字符串或者Column, 指定一个array
        • asc: 如果为True,则是升序;否则是降序
    19. spark_partition_id():返回一个partition ID

      该方法产生的结果依赖于数据划分和任务调度,因此是未确定结果的。

    20. struct(*cols):创建一个新的struct 列。

      • 参数:

        • cols:一个字符串列表(指定了列名),或者一个Column 列表
      • 示例:

        1. df.select(struct('age', 'name').alias("struct")).collect()
        2. # [Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]
    21. to_json(col,options={}):将包含 StructType 或者ArrytypeStructType 转换为json 字符串。如果遇到不支持的类型,则抛出异常。

      • 参数:

        • col:一个字符串或者Column,表示待转换的列
        • options:转换选项。它支持和json datasource 同样的选项
    22. udf(f=None,returnType=StringType):根据用户定义函数(UDF) 来创建一列。

      • 参数:

        • f:一个python 函数,它接受一个参数
        • returnType:一个pyspqrk.sql.types.DataType 类型,表示udf 的返回类型
      • 示例:

        1. from pyspark.sql.types import IntegerType
        2. slen = udf(lambda s: len(s), IntegerType())
        3. df.select(slen("name").alias("slen_name"))
    23. when(condition,value): 对一系列条件求值,返回其中匹配的哪个结果。

      如果Column.otherwise() 未被调用,则当未匹配时,返回None;如果Column.otherwise() 被调用,则当未匹配时,返回otherwise() 的结果。

      • 参数:

        • condition:一个布尔列
      • 示例: