自定义receiver指南
这一节开始实现一个Receiver。一个自定义的receiver必须继承
这个抽象类,实现它的两个方法onStart()
(开始接收数据)以及onStop()
(停止接收数据)。
需要注意的是onStart()
和onStop()
不能够无限期的阻塞。通常情况下,onStart()
启动线程负责数据的接收,onStop()
确保这个接收过程停止。接收线程也能够调用receiver
的
方法去检查是否已经停止接收数据。
一旦接收了数据,这些数据就能够通过调用store(data)
方法存到Spark中,store(data)
是[Receiver]类中的方法。有几个重载的store()
方法允许你存储接收到的数据(record-at-a-time or as whole collection of objects/serialized bytes)
在接收线程中出现的任何异常都应该被捕获或者妥善处理从而避免receiver
在没有提示的情况下失败。restart(<exception>)
方法将会重新启动receiver
,它通过异步的方式首先调用onStop()
方法,
然后在一段延迟之后调用onStart()
方法。stop(<exception>)
将会调用方法终止receiver
。reportError(<error>)
方法在不停止或者重启receiver
的情况下打印错误消息到
驱动程序(driver)。
在Spark流应用程序中,用streamingContext.receiverStream(<instance of custom receiver>)
方法,可以使用自动用receiver
。代码如下所示:
完整的代码见例子
基于Receiver的稳定性以及容错语义,Receiver分为两种类型
- Reliable Receiver:可靠的源允许发送的数据被确认。一个可靠的receiver正确的应答一个可靠的源,数据已经收到并且被正确地复制到了Spark中(指正确完成复制)。实现这个receiver并
仔细考虑源确认的语义。
为了实现可靠receiver,你必须使用store(multiple-records)
去保存数据。保存的类型是阻塞访问,即所有给定的记录全部保存到Spark中后才返回。如果receiver的配置存储级别利用复制
(默认情况是复制),则会在复制结束之后返回。因此,它确保数据被可靠地存储,receiver恰当的应答给源。这保证在复制的过程中,没有数据造成的receiver失败。因为缓冲数据不会应答,从而
可以从源中重新获取数据。
- 系统注重分块,将数据分为适当大小的块。
- 如果指定了速率的限制,系统注重控制接收速率。
下面是两类receiver的特征
Receiver Type | Characteristics
—- | —-
Unreliable Receivers | 实现简单;系统更关心块的生成和速率的控制;没有容错的保证,在receiver失败时会丢失数据
Reliable Receivers | 高容错保证,零数据丢失;块的生成和速率的控制需要手动实现;实现的复杂性依赖源的确认机制
自定义的Akka actor也能够拥有接收数据。ActorHelpertrait可以
应用于任何Akka actor中。它运行接收的数据通过调用store()
方法存储于Spark中。可以配置这个actor的监控(supervisor)策略处理错误。
利用这个actor,一个新的输入数据流就能够被创建。
完整的代码间例子