消息传递框架与完整的ATM示例

    1回到第4章,我举了一个使用消息传递框架在线程间发送信息的例子。这里就会使用这个实现来完成ATM功能。下面完整代码就是功能的实现,包括消息传递框架。

    清单C.1实现了一个消息队列。其可以将消息以指针(指向基类)的方式存储在列表中;指定消息类型会由基类派生模板进行处理。推送包装类的构造实例,以及存储指向这个实例的指针;弹出实例的时候,将会返回指向其的指针。因为message_base类没有任何成员函数,在访问存储消息之前,弹出线程就需要将指针转为指针。

    清单C.1 简单的消息队列

    发送通过sender类(见清单C.2)实例处理过的消息。只能对已推送到队列中的消息进行包装。对sender实例的拷贝,只是拷贝了指向队列的指针,而非队列本身。

    清单C.2 sender类

    1. namespace messaging
    2. {
    3. class sender
    4. {
    5. queue*q; // sender是一个队列指针的包装类
    6. public:
    7. sender(): // sender无队列(默认构造函数)
    8. q(nullptr)
    9. {}
    10. explicit sender(queue*q_): // 从指向队列的指针进行构造
    11. q(q_)
    12. {}
    13. template<typename Message>
    14. void send(Message const& msg)
    15. {
    16. if(q)
    17. {
    18. q->push(msg); // 将发送信息推送给队列
    19. }
    20. }
    21. };
    22. }

    接收信息部分有些麻烦。不仅要等待队列中的消息,还要检查消息类型是否与所等待的消息类型匹配,并调用处理函数进行处理。那么就从receiver类的实现开始吧。

    1. namespace messaging
    2. {
    3. class receiver
    4. {
    5. queue q; // 接受者拥有对应队列
    6. public:
    7. operator sender() // 允许将类中队列隐式转化为一个sender队列
    8. {
    9. return sender(&q);
    10. }
    11. dispatcher wait() // 等待对队列进行调度
    12. {
    13. return dispatcher(&q);
    14. }
    15. };
    16. }

    sender只是引用一个消息队列,而receiver是拥有一个队列。可以使用隐式转换的方式获取sender引用的类。难点在于wait()中的调度。这里创建了一个dispatcher对象引用receiver中的队列。dispatcher类实现会在下一个清单中看到;如你所见,任务是在析构函数中完成的。在这个例子中,所要做的工作是对消息进行等待,以及对其进行调度。

    清单C.4 dispatcher类

    从wait()返回的dispatcher实例将马上被销毁,因为是临时变量,也向前文提到的,析构函数在这里做真正的工作。析构函数调用wait_and_dispatch()函数,这个函数中有一个循环①,等待消息的传入(这样才能进行弹出操作),然后将消息传递给dispatch()函数。dispatch()函数本身②很简单;会检查小时是否是一个close_queue消息,当是close_queue消息时,抛出一个异常;如果不是,函数将会返回false来表明消息没有被处理。因为会抛出close_queue异常,所以析构函数会标示为noexcept(false);在没有任何标识的情况下,一般情况下析构函数都noexcept(true)④型,这表示没有任何异常抛出,并且close_queue异常将会使程序终止。

    虽然,不会经常的去调用wait()函数,不过,在大多数时间里,你都希望对一条消息进行处理。这时就需要handle()成员函数③的加入。这个函数是一个模板,并且消息类型不可推断,所以你需要指定需要处理的消息类型,并且传入函数(或可调用对象)进行处理,并将队列传入当前dispatcher对象的handle()函数。这将在清单C.5中展示。这就是为什么,在测试析构函数中的chained值前,要等待消息耳朵原因;不仅是避免“移动”类型的对象对消息进行等待,而且允许将等待状态转移到新的TemplateDispatcher实例中。

    清单C.5 TemplateDispatcher类模板

    1. namespace messaging
    2. {
    3. template<typename PreviousDispatcher,typename Msg,typename Func>
    4. class TemplateDispatcher
    5. {
    6. queue* q;
    7. PreviousDispatcher* prev;
    8. Func f;
    9. bool chained;
    10. TemplateDispatcher(TemplateDispatcher const&)=delete;
    11. TemplateDispatcher& operator=(TemplateDispatcher const&)=delete;
    12. template<typename Dispatcher,typename OtherMsg,typename OtherFunc>
    13. friend class TemplateDispatcher; // 所有特化的TemplateDispatcher类型实例都是友元类
    14. void wait_and_dispatch()
    15. {
    16. for(;;)
    17. {
    18. auto msg=q->wait_and_pop();
    19. if(dispatch(msg)) // 1 如果消息处理过后,会跳出循环
    20. break;
    21. }
    22. }
    23. bool dispatch(std::shared_ptr<message_base> const& msg)
    24. {
    25. if(wrapped_message<Msg>* wrapper=
    26. dynamic_cast<wrapped_message<Msg>*>(msg.get())) // 2 检查消息类型,并且调用函数
    27. {
    28. f(wrapper->contents);
    29. return true;
    30. }
    31. else
    32. {
    33. return prev->dispatch(msg); // 3 链接到之前的调度器上
    34. }
    35. }
    36. public:
    37. TemplateDispatcher(TemplateDispatcher&& other):
    38. q(other.q),prev(other.prev),f(std::move(other.f)),
    39. chained(other.chained)
    40. {
    41. other.chained=true;
    42. }
    43. TemplateDispatcher(queue* q_,PreviousDispatcher* prev_,Func&& f_):
    44. q(q_),prev(prev_),f(std::forward<Func>(f_)),chained(false)
    45. {
    46. prev_->chained=true;
    47. }
    48. template<typename OtherMsg,typename OtherFunc>
    49. TemplateDispatcher<TemplateDispatcher,OtherMsg,OtherFunc>
    50. handle(OtherFunc&& of) // 4 可以链接其他处理器
    51. {
    52. return TemplateDispatcher<
    53. TemplateDispatcher,OtherMsg,OtherFunc>(
    54. q,this,std::forward<OtherFunc>(of));
    55. }
    56. ~TemplateDispatcher() noexcept(false) // 5 这个析构函数也是noexcept(false)的
    57. {
    58. if(!chained)
    59. {
    60. wait_and_dispatch();
    61. }
    62. }
    63. };
    64. }

    TemplateDispatcher<>类模板仿照了dispatcher类,二者几乎相同。特别是在析构函数上,都是调用wait_and_dispatch()等待处理消息。

    这种简单的架构允许你想队列推送任何类型的消息,并且调度器有选择的与接收端的消息进行匹配。同样,也允许为了推送消息,将消息队列的引用进行传递的同时,保持接收端的私有性。

    为了完成第4章的例子,消息的组成将在清单C.6中给出,各种状态机将在清单C.7,C.8和C.9中给出。最后,驱动代码将在C.10给出。

    清单C.6 ATM消息

    1. struct withdraw
    2. {
    3. std::string account;
    4. unsigned amount;
    5. mutable messaging::sender atm_queue;
    6. withdraw(std::string const& account_,
    7. unsigned amount_,
    8. messaging::sender atm_queue_):
    9. account(account_),amount(amount_),
    10. atm_queue(atm_queue_)
    11. {}
    12. };
    13. struct withdraw_ok
    14. {};
    15. struct withdraw_denied
    16. {};
    17. struct cancel_withdrawal
    18. {
    19. std::string account;
    20. unsigned amount;
    21. cancel_withdrawal(std::string const& account_,
    22. unsigned amount_):
    23. account(account_),amount(amount_)
    24. {}
    25. };
    26. struct withdrawal_processed
    27. std::string account;
    28. unsigned amount;
    29. unsigned amount_):
    30. account(account_),amount(amount_)
    31. {}
    32. };
    33. struct card_inserted
    34. {
    35. std::string account;
    36. explicit card_inserted(std::string const& account_):
    37. account(account_)
    38. {}
    39. };
    40. struct digit_pressed
    41. {
    42. char digit;
    43. explicit digit_pressed(char digit_):
    44. digit(digit_)
    45. {}
    46. };
    47. struct clear_last_pressed
    48. {};
    49. struct eject_card
    50. {};
    51. struct withdraw_pressed
    52. {
    53. unsigned amount;
    54. explicit withdraw_pressed(unsigned amount_):
    55. amount(amount_)
    56. {}
    57. };
    58. struct cancel_pressed
    59. {};
    60. struct issue_money
    61. {
    62. unsigned amount;
    63. issue_money(unsigned amount_):
    64. amount(amount_)
    65. {}
    66. };
    67. struct verify_pin
    68. {
    69. std::string account;
    70. std::string pin;
    71. mutable messaging::sender atm_queue;
    72. verify_pin(std::string const& account_,std::string const& pin_,
    73. messaging::sender atm_queue_):
    74. account(account_),pin(pin_),atm_queue(atm_queue_)
    75. {}
    76. };
    77. struct pin_verified
    78. {};
    79. struct pin_incorrect
    80. {};
    81. struct display_enter_pin
    82. {};
    83. struct display_enter_card
    84. {};
    85. struct display_insufficient_funds
    86. {};
    87. struct display_withdrawal_cancelled
    88. {};
    89. struct display_pin_incorrect_message
    90. {};
    91. struct display_withdrawal_options
    92. {};
    93. struct get_balance
    94. {
    95. std::string account;
    96. mutable messaging::sender atm_queue;
    97. get_balance(std::string const& account_,messaging::sender atm_queue_):
    98. account(account_),atm_queue(atm_queue_)
    99. {}
    100. };
    101. struct balance
    102. {
    103. unsigned amount;
    104. explicit balance(unsigned amount_):
    105. amount(amount_)
    106. {}
    107. };
    108. struct display_balance
    109. {
    110. unsigned amount;
    111. explicit display_balance(unsigned amount_):
    112. amount(amount_)
    113. {}
    114. };
    115. struct balance_pressed
    116. {};

    清单C.7 ATM状态机

    清单C.8 银行状态机

    1. class bank_machine
    2. {
    3. messaging::receiver incoming;
    4. unsigned balance;
    5. public:
    6. bank_machine():
    7. balance(199)
    8. {}
    9. void done()
    10. {
    11. get_sender().send(messaging::close_queue());
    12. }
    13. void run()
    14. {
    15. try
    16. {
    17. for(;;)
    18. {
    19. incoming.wait()
    20. .handle<verify_pin>(
    21. [&](verify_pin const& msg)
    22. {
    23. if(msg.pin=="1937")
    24. {
    25. msg.atm_queue.send(pin_verified());
    26. }
    27. else
    28. {
    29. msg.atm_queue.send(pin_incorrect());
    30. }
    31. [&](withdraw const& msg)
    32. {
    33. if(balance>=msg.amount)
    34. {
    35. msg.atm_queue.send(withdraw_ok());
    36. balance-=msg.amount;
    37. }
    38. else
    39. {
    40. msg.atm_queue.send(withdraw_denied());
    41. }
    42. })
    43. .handle<get_balance>(
    44. [&](get_balance const& msg)
    45. {
    46. msg.atm_queue.send(::balance(balance));
    47. })
    48. .handle<withdrawal_processed>(
    49. [&](withdrawal_processed const& msg)
    50. {
    51. })
    52. .handle<cancel_withdrawal>(
    53. [&](cancel_withdrawal const& msg)
    54. {
    55. });
    56. }
    57. }
    58. catch(messaging::close_queue const&)
    59. {
    60. }
    61. }
    62. messaging::sender get_sender()
    63. {
    64. return incoming;
    65. }
    66. };

    清单C.9 用户状态机

    1. class interface_machine
    2. {
    3. messaging::receiver incoming;
    4. public:
    5. void done()
    6. {
    7. get_sender().send(messaging::close_queue());
    8. }
    9. void run()
    10. {
    11. try
    12. {
    13. for(;;)
    14. {
    15. incoming.wait()
    16. .handle<issue_money>(
    17. [&](issue_money const& msg)
    18. {
    19. {
    20. std::lock_guard<std::mutex> lk(iom);
    21. std::cout<<"Issuing "
    22. <<msg.amount<<std::endl;
    23. }
    24. })
    25. .handle<display_insufficient_funds>(
    26. [&](display_insufficient_funds const& msg)
    27. {
    28. {
    29. std::lock_guard<std::mutex> lk(iom);
    30. std::cout<<"Insufficient funds"<<std::endl;
    31. }
    32. })
    33. .handle<display_enter_pin>(
    34. [&](display_enter_pin const& msg)
    35. {
    36. {
    37. std::lock_guard<std::mutex> lk(iom);
    38. std::cout<<"Please enter your PIN (0-9)"<<std::endl;
    39. }
    40. })
    41. .handle<display_enter_card>(
    42. [&](display_enter_card const& msg)
    43. {
    44. {
    45. std::lock_guard<std::mutex> lk(iom);
    46. std::cout<<"Please enter your card (I)"
    47. <<std::endl;
    48. }
    49. })
    50. .handle<display_balance>(
    51. [&](display_balance const& msg)
    52. {
    53. {
    54. std::lock_guard<std::mutex> lk(iom);
    55. std::cout
    56. <<"The balance of your account is "
    57. <<msg.amount<<std::endl;
    58. }
    59. })
    60. .handle<display_withdrawal_options>(
    61. [&](display_withdrawal_options const& msg)
    62. {
    63. {
    64. std::lock_guard<std::mutex> lk(iom);
    65. std::cout<<"Withdraw 50? (w)"<<std::endl;
    66. std::cout<<"Display Balance? (b)"
    67. <<std::endl;
    68. std::cout<<"Cancel? (c)"<<std::endl;
    69. }
    70. })
    71. .handle<display_withdrawal_cancelled>(
    72. [&](display_withdrawal_cancelled const& msg)
    73. {
    74. {
    75. std::lock_guard<std::mutex> lk(iom);
    76. std::cout<<"Withdrawal cancelled"
    77. <<std::endl;
    78. }
    79. })
    80. .handle<display_pin_incorrect_message>(
    81. [&](display_pin_incorrect_message const& msg)
    82. {
    83. {
    84. std::lock_guard<std::mutex> lk(iom);
    85. std::cout<<"PIN incorrect"<<std::endl;
    86. }
    87. })
    88. .handle<eject_card>(
    89. [&](eject_card const& msg)
    90. {
    91. {
    92. std::lock_guard<std::mutex> lk(iom);
    93. std::cout<<"Ejecting card"<<std::endl;
    94. }
    95. });
    96. }
    97. }
    98. catch(messaging::close_queue&)
    99. {
    100. }
    101. }
    102. messaging::sender get_sender()
    103. {
    104. return incoming;
    105. }