RDDs

    Programmatically Specifying the Schema

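    A SchemaRDD can be created programmatically in three steps:

    • Create an RDD of Rows from the original RDD.
    • Create the schema, represented by a StructType, that matches the structure of the Rows in the RDD created in the first step.
    • Apply the schema to the RDD of Rows via the applySchema method provided by SQLContext.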
    // sc is an existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // Create an RDD
    val people = sc.textFile("examples/src/main/resources/people.txt")
    // The schema is encoded in a string
    val schemaString = "name age"
    // Import Row and the Spark SQL data types (StructType, StructField, StringType).
    import org.apache.spark.sql._
    // Generate the schema based on the string of schema
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    // Convert records of the RDD (people) to Rows.
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
    // Apply the schema to the RDD of Rows.
    val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
    // Register the SchemaRDD as a table.
    peopleSchemaRDD.registerTempTable("people")
    // SQL statements can be run by using the sql methods provided by sqlContext.
    val results = sqlContext.sql("SELECT name FROM people")
    // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by ordinal.
    results.map(t => "Name: " + t(0)).collect().foreach(println)
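The three steps can also be illustrated without a Spark cluster. The minimal sketch below assumes plain Scala only and uses a hypothetical `Field` case class and hypothetical helpers `schemaFrom`, `toRow` and `attachSchema`; these are stand-ins for illustration, not Spark's API. It shows why the schema must match the row structure: each text line is parsed into a row of values, and a row is paired with field names only when its length equals the number of fields. The sample lines are illustrative and use the `name, age` layout that the `split(",")` parsing expects.

```scala
// Spark-free sketch of the three steps. Field, schemaFrom, toRow and
// attachSchema are hypothetical stand-ins, not part of Spark's API.
object SchemaSketch {
  // Hypothetical stand-in for StructField(name, StringType, nullable)
  final case class Field(name: String, nullable: Boolean = true)

  // Step 2: one nullable string field per name in the schema string
  def schemaFrom(schemaString: String): Seq[Field] =
    schemaString.split(" ").toSeq.map(name => Field(name))

  // Step 1: turn a "name, age" text line into a row of string values
  def toRow(line: String): Seq[String] = {
    val p = line.split(",")
    Seq(p(0), p(1).trim)
  }

  // Step 3 (simplified): pair each value with its field name,
  // rejecting any row whose length does not match the schema
  def attachSchema(rows: Seq[Seq[String]], schema: Seq[Field]): Seq[Map[String, String]] =
    rows.map { row =>
      require(row.length == schema.length, s"row $row does not match the schema")
      schema.map(_.name).zip(row).toMap
    }

  def main(args: Array[String]): Unit = {
    val lines = Seq("Michael, 29", "Andy, 30", "Justin, 19")
    val table = attachSchema(lines.map(toRow), schemaFrom("name age"))
    // Rough analogue of SELECT name FROM people
    table.map(_("name")).foreach(println)
  }
}
```

Calling `SchemaSketch.main(Array())` prints the three names, one per line. The arity check is an illustrative choice in this sketch; it is not a description of how Spark's own applySchema validates its input.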