-
x
上述代码的行为是不确定的,并且无法按照预期正常工作。
在执行作业时, 会分解
RDD
操作到每个executor
的task
中。在执行之前,spark
计算任务的闭包- 所谓闭包:指的是
executor
要在RDD
上进行计算时,必须对执行节点可见的那些变量和方法 - 闭包被序列化,并被发送到每个
executor
- 所谓闭包:指的是
在上述代码中,闭包的变量的副本被发送给每个
executor
,当counter
被foreach
函数引用时,它已经不再是驱动器节点的counter
了- 虽然驱动器程序中,仍然有一个
counter
在内存中;但是对于executors
,它是不可见的。 executor
看到的只是序列化的闭包的一个副本。所有对counter
的操作都是在 的本地进行。- 要想正确实现预期目标,则需要使用累加器
- 虽然驱动器程序中,仍然有一个
一个累加器(
Accumulator
)变量只支持累加操作工作节点对它执行的任何累加,都将自动的传播到驱动器程序中。
Accumulator
的方法:.add(term)
:向累加器中增加值term
Accumulator
的属性:.value
:获取累加器的值。只可以在驱动器程序中使用
-
- 在驱动器程序中调用
SparkContext.accumulator(init_value)
来创建出带有初始值的累加器 - 在执行器的代码中使用累加器的
+=
方法或者.add(term)
方法来增加累加器的值 - 在驱动器程序中使用累加器的
.value
属性来访问累加器的值
示例:
file=sc.textFile('xxx.txt')
def xxx(line):
if yyy:
acc+=1
return zzz
rdd=file.map(xxx)
- 在驱动器程序中调用
1.2 累加器与容错性
spark
中同一个任务可能被运行多次:- 如果工作节点失败了,则
spark
会在另一个节点上重新运行该任务 - 如果工作节点处理速度比别的节点慢很多,则
spark
也会抢占式的在另一个节点上启动一个投机性的任务副本 - 甚至有时候
spark
需要重新运行任务来获取缓存中被移出内存的数据
- 如果工作节点失败了,则
当
spark
同一个任务被运行多次时,任务中的累加器的处理规则:在行动操作中使用的累加器,
spark
确保每个任务对各累加器修改应用一次- 因此:如果想要一个无论在失败还是重新计算时,都绝对可靠的累加器,我们必须将它放在
foreach()
这样的行动操作中
- 因此:如果想要一个无论在失败还是重新计算时,都绝对可靠的累加器,我们必须将它放在
在转化操作中使用的累加器,无法保证只修改应用一次。
- 在转化操作中,累加器通常只用于调试目的