ZeroMQ

    Akka提供一个 ZeroMQ 模块对 ZeroMQ 连接进行抽象从而允许Akka actor之间在ZeroMQ连接之上进行消息交互。这些消息可以是专有格式或者使用Protobuf来定义。socket actor缺省就具有容错性,当你调用newSocket方法创建新的Socket时它会恰当地对socket进行重新初始化。

    ZeroMQ 在多线程方面有较大的强制性,所以配置选项 一定要设置成 PinnedDispatcher, 这是因为实际的ZeroMQ socket只能在创建它的线程中访问。

    Akka的ZeroMQ模块是按照JZMQ中的API编写的,JZMQ使用JNI与本地ZeroMQ库通信,但Akka ZeroMQ模块没有用JZMQ,用的是ZeroMQ的Scala绑定,与本地ZeroMQ库通信是通过JNA完成的. 换句话说,这个模块所需要的本地库只有本地 ZeroMQ 库. 使用scala库的好处是你不需要编译和管理本地的信赖,当然这会有一些性能上的损失。Scala绑定与JNI绑定是兼容的,如果你的确需要获得最好的性能,直接用后者替换前者即可。

    注意

    zeromq-scala-bindings当前所用的版本只兼容 zeromq 2 ;zeromq 3 不受支持。

    ZeroMQ 支持多种连接模式, 每一种用来满足不同的需求。 目前这个模块支持发布-订阅型连接和基于Router-Dealer的连接. 为了发起连接或接收连接,必须创建一个socket. Socket都是用 akka.zeromq.ZeroMQExtension创建的, 例如:

    或者通过导入 akka.zeromq._ 包来获得隐式的newSocket方法.

    1. import akka.zeromq._
    2. val pubSocket2 = system.newSocket(SocketType.Pub, Bind("tcp://127.0.0.1:1234"))

    上例将在本机的1234端口创建一个ZeroMQ发布者socket。.

    类似的你可以创建一个带有监听器的订阅socket, 从发布者订阅所有的消息:

    1. import akka.zeromq._
    2. val listener = system.actorOf(Props(new Actor {
    3. def receive: Receive = {
    4. case Connecting //...
    5. case m: ZMQMessage //...
    6. case _ //...
    7. }
    8. }))
    9. val subSocket = system.newSocket(SocketType.Sub, Listener(listener), Connect("tcp://127.0.0.1:1234"), SubscribeAll)
    发布-订阅型连接

    在一个发布-订阅(pub-sub)型连接中, 一个发布者可以有多个订阅者. 每个订阅者订阅一个或多个主题 , 发布者向主题发布消息. 订阅者也可以订阅所有的主题. 在 Akka 环境中, 如果要向并不直接与某actor打交道的actor发布消息,需要使用pub-sub 连接.

    在使用 zeromq pub/sub 时你必须知道它需要多播 - 检查你的云环境 - 才能正确工作,同时对事件主题的过滤发生在客户端, 因此所有的事件都会广播到所有的订阅者.

    actor订阅主题的过程如下:

    1. val subTopicSocket = system.newSocket(SocketType.Sub, Listener(listener), Connect("tcp://127.0.0.1:1234"), Subscribe("foo.bar"))

    订阅时使用前缀匹配,所以它订阅了所有以 foo.bar 开头的主题. 注意,如果没有提供主题名称或使用了 SubscribeAll , actor 将订阅所有的主题.

    要取消订阅:

    要向主题发布消息你必须使用两个 Frames,第一个Frame是主题.

    1. pubSocket ! ZMQMessage(Seq(Frame("foo.bar"), Frame(payload)))
    Pub-Sub实战

    下例演示了一个有两个订阅者的发布者.

    发布者监视当前的堆使用量和系统负载并周期性在”health.heap” 主题上发布 Heap 事件, 在”health.load”主题上发布 Load 事件.

    1. import akka.zeromq._
    2. import akka.actor.Actor
    3. import akka.actor.Props
    4. import akka.actor.ActorLogging
    5. import akka.serialization.SerializationExtension
    6. import java.lang.management.ManagementFactory
    7. case object Tick
    8. case class Heap(timestamp: Long, used: Long, max: Long)
    9. case class Load(timestamp: Long, loadAverage: Double)
    10. class HealthProbe extends Actor {
    11. val pubSocket = context.system.newSocket(SocketType.Pub, Bind("tcp://127.0.0.1:1235"))
    12. val memory = ManagementFactory.getMemoryMXBean
    13. val ser = SerializationExtension(context.system)
    14. override def preStart() {
    15. context.system.scheduler.schedule(1 second, 1 second, self, Tick)
    16. }
    17. override def postRestart(reason: Throwable) {
    18. // 还要调用preStart,仅调度一次
    19. }
    20. def receive: Receive = {
    21. case Tick
    22. val currentHeap = memory.getHeapMemoryUsage
    23. val timestamp = System.currentTimeMillis
    24. // 用 akka SerializationExtension 转换成字节
    25. val heapPayload = ser.serialize(Heap(timestamp, currentHeap.getUsed, currentHeap.getMax)).fold(throw _, identity)
    26. // 第一Frame是话题,第二Frame是消息
    27. pubSocket ! ZMQMessage(Seq(Frame("health.heap"), Frame(heapPayload)))
    28. // 用 akka SerializationExtension 转换成字节
    29. val loadPayload = ser.serialize(Load(timestamp, os.getSystemLoadAverage)).fold(throw _, identity)
    30. // 第一Frame是主题,第二Frame是消息
    31. pubSocket ! ZMQMessage(Seq(Frame("health.load"), Frame(loadPayload)))
    32. }
    33. }
    34. system.actorOf(Props[HealthProbe], name = "health")

    我们添加一个订阅者来记录日志. 它订阅所有以 “health”开头的主题, i.e. 包括 Heap 和 Load 事件.

    1. context.system.newSocket(SocketType.Sub, Listener(self), Connect("tcp://127.0.0.1:1235"), Subscribe("health"))
    2. val ser = SerializationExtension(context.system)
    3. val timestampFormat = new SimpleDateFormat("HH:mm:ss.SSS")
    4. def receive = {
    5. // 第一Frame是主题,第二Frame是消息
    6. case m: ZMQMessage if m.firstFrameAsString == "health.heap"
    7. ser.deserialize(m.payload(1), classOf[Heap]) match {
    8. case Right(Heap(timestamp, used, max))
    9. log.info("Used heap {} bytes, at {}", used, timestampFormat.format(new Date(timestamp)))
    10. case Left(e) throw e
    11. }
    12. case m: ZMQMessage if m.firstFrameAsString == "health.load"
    13. ser.deserialize(m.payload(1), classOf[Load]) match {
    14. case Right(Load(timestamp, loadAverage))
    15. log.info("Load average {}, at {}", loadAverage, timestampFormat.format(new Date(timestamp)))
    16. case Left(e) throw e
    17. }
    18. }
    19. }
    20. system.actorOf(Props[Logger], name = "logger")
    Router-Dealer Connection

    虽然 Pub/Sub 是很好的连接方式但zeromq的真正优势在于它象一个用于可靠消息通信的 “乐高玩具” . 而由于有如此多的集成方式,它的多语言支持是极好的. 当你使用ZeroMQ来集成多个系统时你可能需要创建自己的ZeroMQ机制。这时就轮到router 和 dealer socket 类型发挥作用了. 使用这些socket类型你可以创建自己的使用TCP/IP的可靠pub sub broker,并实现发布方的事件过滤.

    要创建配置了高水位的Router socket:

    1. val highWatermarkSocket = system.newSocket(
    2. SocketType.Router,
    3. Listener(listener),
    4. Bind("tcp://127.0.0.1:1234"),
    5. HighWatermark(50000))

    akka-zeromq 模块支持大部分zeromq socket的配置选项.

    推-拉型连接

    Akka ZeroMQ 模块支持 推-拉 型连接.

    创建 Push 连接:

    1. def newPushSocket(socketParameters: Array[SocketOption]): ActorRef

    创建 Pull 型连接:

    很快将提供有更多的文档和示例.

    请求-响应型连接

    Akka ZeroMQ 模块支持 请求-响应 型连接.

    创建 响应 连接:

      很快将提供更多的文档和示例.