DStream中的转换(transformation)
最后两个transformation算子需要重点介绍一下:
updateStateByKey操作允许不断用新信息更新它的同时保持任意状态。你需要通过两步来使用它
- 定义状态更新函数-怎样利用更新前的状态和从输入流里面获取的新值更新状态
让我们举个例子说明。在例子中,你想保持一个文本数据流中每个单词的运行次数,运行次数用一个state表示,它的类型是整数
更新函数将会被每个单词调用,newValues
拥有一系列的1(从 (词, 1)对而来),runningCount拥有之前的次数。要看完整的代码,见例子
操作(以及它的变化形式如transformWith
)允许在DStream运行任何RDD-to-RDD函数。它能够被用来应用任何没在DStream API中提供的RDD操作(It can be used to apply any RDD operation that is not exposed in the DStream API)。
例如,连接数据流中的每个批(batch)和另外一个数据集的功能并没有在DStream API中提供,然而你可以简单的利用transform
方法做到。如果你想通过连接带有预先计算的垃圾邮件信息的输入数据流
来清理实时数据,然后过了它们,你可以按如下方法来做:
事实上,你也可以在方法中用和图计算算法
如上图显示,窗口在源DStream上滑动,合并和操作落入窗内的源RDDs,产生窗口化的DStream的RDDs。在这个具体的例子中,程序在三个时间单元的数据上进行窗口操作,并且每两个时间单元滑动一次。
这说明,任何一个窗口操作都需要指定两个参数:
- 滑动的时间间隔:窗口操作执行的时间间隔
这两个参数必须是源DStream的批时间间隔的倍数。
一些常用的窗口操作如下所示,这些操作都需要用到上文提到的两个参数:窗口长度和滑动的时间间隔
Transformation | Meaning |
---|---|
window(windowLength, slideInterval) | 基于源DStream产生的窗口化的批数据计算一个新的DStream |
countByWindow(windowLength, slideInterval) | 返回流中元素的一个滑动窗口数 |
reduceByWindow(func, windowLength, slideInterval) | 返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数必须是相关联的以使计算能够正确的并行计算。 |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | 应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream。每一个key的值均由给定的reduce函数聚集起来。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。你可以用numTasks 参数设置不同的任务数 |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enter the sliding window, and “inverse reducing” the old data that leave the window. An example would be that of “adding” and “subtracting” counts of keys as the window slides. However, it is applicable to only “invertible reduce functions”, that is, those reduce functions which have a corresponding “inverse reduce” function (taken as parameter invFunc. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | 应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream。每个key的值都是它们在滑动窗口中出现的频率。 |