分布式 RPC

    DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。DRPC server 负责接收 RPC 请求,并将该请求发送到 Storm 中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。因此,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。例如,以下是一个使用参数 “http://twitter.com” 调用 “reach” 函数计算结果的例子:

    下图是 DRPC 的原理示意图。

    客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个 从 DRPC 服务器中接收一个函数调用流。DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。

    1. public static class ExclaimBolt extends BaseBasicBolt {
    2. public void execute(Tuple tuple, BasicOutputCollector collector) {
    3. String input = tuple.getString(1);
    4. collector.emit(new Values(tuple.getValue(0), input + "!"));
    5. }
    6. public void declareOutputFields(OutputFieldsDeclarer declarer) {
    7. }
    8. }
    9. public static void main(String[] args) throws Exception {
    10. TopologyBuilder builder = new TopologyBuilder();
    11. // builder.setBolt(new ExclaimBolt(), 3);
    12. // submit(builder.createTopology());
    13. }

    DRPC 可以在本地模式下运行。以下是使用本地模式构造拓扑的例子:

    在这种模式下,首先你会创建一个 LocalDPRC 对象,该对象会在进程中模拟一个 DRPC 服务器,其作用类似于 LocalCluster 在进程中模拟 Storm 集群的功能。在定义好拓扑的各个组件之后,就可以使用 LocalCluster 来提交拓扑。在本地模式下 LocalDPRC 对象不会绑定到任何一个实际的端口,所以需要通过向 DRPCSpout 传入参数的方式来关联到拓扑中。

    在启动拓扑后,你可以使用 execute 方法来完成 DRPC 调用。

    远程模式 DRPC

    在一个实际的集群中使用 DRPC 有以下三个步骤:

    1. 配置并启动 DRPC 服务器;
    2. 在集群的各个服务器上配置 DRPC 服务器的地址;
    3. 将 DRPC 拓扑提交到集群运行。
    1. bin/storm drpc

    接下来,你需要在集群的各个服务器上配置 DRPC 服务器的地址。这是为了让 DRPCSpout 了解从哪里获取函数调用的方法。可以通过编辑 storm.yaml 或者添加拓扑配置的方式实现配置。配置 的方式类似于下面这样:

    最后,你可以像其他拓扑一样使用 StormSubmitter 来启动拓扑。

    以下是使用远程模式构造拓扑的一个例子:

    1. DRPCSpout spout = new DRPCSpout("exclamation");
    2. builder.setSpout("drpc", spout, 3);
    3. builder.setBolt("exclaim", new ExclamationBolt(), 3)
    4. .shuffleGrouping("drpc");
    5. builder.setBolt("return", new ReturnResults(), 7)
    6. .shuffleGrouping("exclaim");
    7. Config conf = new Config();
    8. conf.setNumWorkers(2);

    请参考Trident 教程一文中计算指定 URL 的 Reach 数的例子。