使用 Spark Shell
Spark 最主要的抽象是叫Resilient Distributed Dataset(RDD) 的弹性分布式集合。RDDs 可以使用 Hadoop InputFormats(例如 HDFS 文件)创建,也可以从其他的 RDDs 转换。让我们在 Spark 源代码目录从 README 文本文件中创建一个新的 RDD。
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
RDD 的 actions 从 RDD 中返回值, 可以转换成一个新 RDD 并返回它的引用。让我们开始使用几个操作:
res0: Long = 126
scala> textFile.first() // RDD 的第一行数据
res1: String = # Apache Spark
我们可以把 actions 和 transformations 链接在一起:
RDD actions 和 transformations 能被用在更多的复杂计算中。比方说,我们想要找到一行中最多的单词数量:
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15
Hadoop 流行的一个通用的数据流模式是 MapReduce。Spark 能很容易地实现 MapReduce:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8
这里,我们结合 flatMap, 和 reduceByKey 来计算文件里每个单词出现的数量,它的结果是包含一组(String, Int) 键值对的 RDD。我们可以使用 [collect] 操作在我们的 shell 中收集单词的数量:
scala> wordCounts.collect()
缓存 100 行的文本文件来研究 Spark 这看起来很傻。真正让人感兴趣的部分是我们可以在非常大型的数据集中使用同样的函数,甚至在 10 个或者 100 个节点中交叉计算。你同样可以使用 bin/spark-shell
连接到一个 cluster 来替换掉中的方法进行交互操作。