1. 另外,也可以不使用API ,直接将文件加载到DataFrame 并进行查询:

      1. df = spark_session.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

    3.4.1 通用加载

    1. 设置数据格式:.format(source)

      • 返回self
      1. df = spark_session.read.format('json').load('python/test_support/sql/people.json')
    2. 设置数据schema.schema(schema)

      • 返回self
      • 某些数据源可以从输入数据中推断schema。一旦手动指定了schema,则不再需要推断。
    3. 加载:.load(path=None, format=None, schema=None, **options)

      • 参数:

        • path:一个字符串,或者字符串的列表。指出了文件的路径
        • format:指出了文件类型。默认为parquet(除非另有配置spark.sql.sources.default
        • schema:输入数据的schema,一个StructType 类型实例。
        • options:其他的参数
      • 返回值:一个DataFrame 实例

        1. 'python/test_support/sql/people1.json'])

    3.4.2 专用加载

    1. .csv():加载csv 文件,返回一个DataFrame 实例

    2. .jdbc():加载数据库中的表

      • 参数:

        • url:一个JDBC URL,格式为:jdbc:subprotocol:subname
        • table:表名
        • column:列名。该列为整数列,用于分区。如果该参数被设置,那么numPartitions、lowerBound、upperBound 将用于分区从而生成where 表达式来拆分该列。
        • lowerBoundcolumn的最小值,用于决定分区的步长
        • upperBoundcolumn的最大值(不包含),用于决定分区的步长
        • numPartitions:分区的数量
        • predicates:一系列的表达式,用于where中。每一个表达式定义了DataFrame 的一个分区
        • properties:一个字典,用于定义JDBC 连接参数。通常至少为:{ 'user' : 'SYSTEM', 'password' : 'mypassword'}
      • 返回:一个DataFrame 实例
    3. .json():加载json 文件,返回一个DataFrame 实例

      1. .json(path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None)

      示例:

      1. spark_session.read.json('python/test_support/sql/people.json')
      2. rdd = sc.textFile('python/test_support/sql/people.json')
      3. spark_session.read.json(rdd)
    4. .orc():加载ORC文件,返回一个DataFrame 实例

      1. .orc(path)
      1. spark_session.read.orc('python/test_support/sql/orc_partitioned')
    5. .parquet():加载Parquet文件,返回一个DataFrame 实例

      .parquet(*paths)

      示例:

    6. .table(): 从table 中创建一个DataFrame

      1. .table(tableName)

      示例:

      1. df = spark_session.read.parquet('python/test_support/sql/parquet_partitioned')
      2. df.createOrReplaceTempView('tmpTable')
      3. spark_session.read.table('tmpTable')
    7. .text():从文本中创建一个DataFrame

      1. .text(paths)

      示例:

      1. spark_session.read.text('python/test_support/sql/text-test.txt').collect()