多语言接口协议

    Storm 的多语言支持主要通过 ShellBolt,ShellSpout 和 ShellProcess 类来实现。这些类实现了 IBolt 接口、ISpout 接口,并通过使用 Java 的 ProcessBuilder 类调用 shell 进程实现了执行脚本的接口协议。

    输出域

    输出域是拓扑的 Thrift 定义的一部分。也就是说,如果你在 Java 中使用了多语言接口,那么你就需要创建一个继承自 ShellBolt 并实现 IRichBolt 接口的 bolt,这个 bolt 还需要在 方法中声明输出域(ShellSpout 也有类似的问题)。

    你可以在一文中了解更多相关信息。

    最简单的协议是通过执行脚本或程序的标准输入输出(STDIN/STDOUT)来实现的。在这个过程中传输的数据都是以 JSON 格式编码的,这样可以支持很多种语言。

    打包

    为了在集群上运行壳组件,执行的外壳脚本必须和待提交的 jar 包一起置于 resources/ 目录下。

    但是,在本地开发测试时,resources 目录只需要保持在 classpath 中即可。

    协议

    注意:

    • 输入输出协议的结尾都使用行读机制,所以,必须要修剪掉输入中的新行并将他们添加到输出中。
    • 所有的 JSON 输入输出都由一个包含 “end” 的行结束标志。注意,这个定界符并不是 JSON 的一部分。
    • 下面的几个标题就是从脚本作者的 STDIN 与 STDOUT 的角度出发的。

    两种类型壳组件的初始握手过程都是相同的:

    • STDIN: 设置信息。这是一个包含 Storm 配置、PID 目录、拓扑上下文的 JSON 对象:

    你的脚本应该在这个目录下创建一个以 PID 命名的空文件。比如,PID 是 1234 的时候,在目录中创建一个名为 1234 的空文件。这个文件可以让 supervisor 了解到进程的 PID,这样,supervisor 在需要的时候就可以关闭该进程。

    • STDOUT: 你的 PID,以 JSON 对象的形式展现,比如 {"pid": 1234}。这个壳组件将会把 PID 记录到它自己的日志中。

    接下来怎么做就要取决于组件的具体类型了。

    Spouts

    Shell Spouts 都是同步的。以下内容是在一个 while(true) 循环中实现的:

    • STDIN: 一个 next、ack 或者 fail 命令。

    “next” 与 ISpout 的 nextTuple 等价,可以这样定义 “next”:

    1. {"command": "next"}

    可以这样定义 “ack”:

      可以这样定义 “fail”:

      1. {"command": "fail", "id": "1231231"}
      • STDOUT: 前面的命令对你的 spout 作用产生的结果。这个结果可以是一组 emits 和 logs。

      emit 大概是这样的:

      如果不是直接型 emit,你会立即在 STDIN 上收到一条表示 tuple 发送到的任务的 id 的消息,这个消息是以 JSON 数组形式展现的。

      “log” 会将消息记录到 worker log 中,“log” 大概是这样的:

      1. {
      2. "command": "log",
      3. // 待记录的消息
      4. "msg": "hello world!"
      • STDOUT: “sync” 命令会结束 emits 与 logs 的队列,“sync” 是这样使用的:
      1. {"command": "sync"}

      在 sync 之后, ShellSpout 不会继续读取你的输出,直到它发送出新的 next,ack 或者 fail。

      Shell Bolts 的协议是异步的。你会在有 tuple 可用时立即从 STDIN 中获取到 tuple,同时你需要像下面的示例这样调用 emit,ack,fail,log 等操作写入 STDOUT:

      • STDIN: 就是一个 tuple!这是一个 JSON 编码的结构:
      1. {
      2. // tuple 的 id,为了兼容缺少 64 位数据类型的语言,这里使用了字符串
      3. "id": "-6955786537413359385",
      4. // 创建该 tuple 的 id
      5. "comp": "1",
      6. // tuple 将要发往的流 id
      7. "stream": "1",
      8. // 创建该 tuple 的任务
      9. "task": 9,
      10. "tuple": ["snow white and the seven dwarfs", "field2", 3]
      11. }
      • STDOUT: 一个 ack,fail,emit 或者 log。例如,emit 是这样的:

      如果不是直接型 emit,你会立即在 STDIN 上收到一条表示 tuple 发送到的任务的 id 的消息,这个消息是以 JSON 数组形式展现的。注意,由于 shell bolt 协议的异步特性,如果你在 emit 之后立即接收数据,有可能不会收到对应的任务 id,而是收到上一个 emit 的任务 id,或者是一个待处理的新 tuple。然而,最终接收到的任务 id 序列仍然是和 emit 的顺序完全一致的。

      ack 是这样的:

      1. {
      2. "command": "ack",
      3. "id": "123123"
      4. }

      fail 是这样的:

      1. {
      2. "command": "fail",
      3. // 待 fail 的 tuple
      4. "id": "123123"
      5. }

      “log” 会将消息记录到 worker log 中,“log” 是这样的:

      1. {
      2. "command": "log",
      3. // 待记录的消息
      4. "msg": "hello world!"
      5. }
      • 注意:对于 0.7.1 版本,shell bolt 不再需要进行“同步”。

      处理心跳(0.9.3 及以上版本适用)

      Storm 0.9.3 通过在 ShellSpout/ShellBolt 与他们的多语言子进程之间使用心跳来检测子进程是否处于挂起或僵死状态。所有通过多语言接口与 Storm 交互的库都必须使用以下步骤来……

      Spout

      Shell Spouts 是同步的,所有子进程会在 next() 的结尾发送 sync 命令。因此,你不需要为 spouts 做过多的处理。也就是说,在 next() 过程中不能够让子进程的 sleep 时间超过 worker 的延时时间。

      Bolt

      Shell Bolts 是异步的,所以 ShellBolt 会定期向它的子进程发送心跳 tuple。心跳 tuple 是这样的:

      在子进程收到心跳 tuple 之后,它必须向 ShellBolt 发送一个 命令。