完成交易引擎


交易引擎由事件驱动,因此,通过订阅Kafka的Topic实现批量读消息,然后依次处理每个事件:

  1. for (AbstractEvent message : messages) {
  2. processEvent(message);
  3. }
  4. }
  5. void processEvent(AbstractEvent event) {
  6. if (event instanceof OrderRequestEvent) {
  7. createOrder((OrderRequestEvent) event);
  8. } else if (event instanceof OrderCancelEvent) {
  9. cancelOrder((OrderCancelEvent) event);
  10. } else if (event instanceof TransferEvent) {
  11. transfer((TransferEvent) event);
  12. }
  13. }

我们目前一共有3种类型的事件,处理都非常简单。以createOrder()为例,核心代码其实就几行:

  1. void createOrder(OrderRequestEvent event) {
  2. // 生成Order ID:
  3. long orderId = event.sequenceId * 10000 + (year * 100 + month);
  4. // 创建Order:
  5. OrderEntity order = orderService.createOrder(event.sequenceId, event.createdAt, orderId, event.userId, event.direction, event.price, event.quantity);
  6. if (order == null) {
  7. logger.warn("create order failed.");
  8. return;
  9. }
  10. // 撮合:
  11. MatchResult result = matchEngine.processOrder(event.sequenceId, order);
  12. // 清算:
  13. clearingService.clearMatchResult(result);
  14. }

核心的业务逻辑并不复杂,只是交易引擎在处理完订单后,仅仅改变自身状态是不够的,它还得向外输出具体的成交信息、订单状态等。因此,需要根据业务需求,在清算后继续收集撮合结果、已完成订单、准备发送的通知等,通过消息系统或Redis向外输出交易信息。如果把这些功能放到同一个线程内同步完成是非常耗时的,更好的方法是把它们先存储起来,再异步处理。例如,对于已完成的订单,可以异步落库:

  1. Queue<List<OrderEntity>> orderQueue = new ConcurrentLinkedQueue<>();
  2. void createOrder(OrderRequestEvent event) {
  3. ...
  4. if (!result.matchDetails.isEmpty()) {
  5. List<OrderEntity> closedOrders = new ArrayList<>();
  6. if (result.takerOrder.status.isFinalStatus) {
  7. }
  8. for (MatchDetailRecord detail : result.matchDetails) {
  9. OrderEntity maker = detail.makerOrder();
  10. if (maker.status.isFinalStatus) {
  11. closedOrders.add(maker);
  12. }
  13. }
  14. this.orderQueue.add(closedOrders);
  15. }
  16. }
  17. // 启动一个线程将orderQueue的Order异步写入数据库:
  18. void saveOrders() {
  19. // TODO:
  20. }

类似的,输出OrderBook、通知用户成交等信息都是异步处理。

接下来,我们再继续完善processEvent(),处理单个事件时,在处理具体的业务逻辑之前,我们首先根据sequenceId判断是否是重复消息,是重复消息就丢弃:

紧接着,我们判断是否丢失了消息,如果丢失了消息,就根据上次处理的消息的sequenceId,从数据库里捞出后续消息,直到赶上当前消息的sequenceId为止:

  1. // 判断是否丢失了消息:
  2. if (event.previousId > this.lastSequenceId) {
  3. // 从数据库读取丢失的消息:
  4. List<AbstractEvent> events = storeService.loadEventsFromDb(this.lastSequenceId);
  5. if (events.isEmpty()) {
  6. // 读取失败:
  7. System.exit(1);
  8. return;
  9. }
  10. // 处理丢失的消息:
  11. for (AbstractEvent e : events) {
  12. this.processEvent(e);
  13. }
  14. return;
  15. }
  16. // 判断当前消息是否指向上一条消息:
  17. if (event.previousId != lastSequenceId) {
  18. System.exit(1);
  19. // 正常处理:
  20. ...
  21. // 更新lastSequenceId:
  22. this.lastSequenceId = event.sequenceId;

这样一来,我们对消息系统的依赖就不是要求它100%可靠,遇到重复消息、丢失消息,交易引擎都可以从这些错误中自己恢复。

由于资产、订单、撮合、清算都在内存中完成,如何保证交易引擎每处理一个事件,它的内部状态都是正确的呢?我们可以为交易引擎增加一个自验证功能,在debug模式下,每处理一个事件,就自动验证内部状态的完整性,包括:

  • 验证资产系统总额为0,且除负债账户外其余账户资产不为负;
  • 验证订单系统未成交订单所冻结的资产与资产系统中的冻结一致;
  • 验证订单系统的订单与撮合引擎的订单簿一对一存在。
  1. void processEvent(AbstractEvent event) {
  2. ...
  3. if (debugMode) {
  4. this.validate();
  5. }
  6. }

这样我们就能快速在开发阶段尽可能早地发现问题。

交易引擎的测试也相对比较简单。对于同一组输入,每次运行都会得到相同的结果,所以我们可以构造几组确定的输入来验证交易引擎:

  1. class TradingEngineServiceTest {
  2. @Test
  3. public void testTradingEngine() {
  4. // TODO:
  5. }
  6. }

下面是问题解答。

交易引擎如果运行时崩溃,可以重启,重启后先把现有的所有交易事件重头开始执行一遍,即可得到最新的状态。

注意到重头开始执行交易事件,会导致重复发出市场成交、用户订单通知等事件,因此,可根据时间做判断,不再重复发通知。下游系统在处理通知事件时,也要根据通知携带的sequenceId做去重判断。

有的童鞋会问,如果现有的交易事件已经有几千万甚至几十亿,从头开始执行如果需要花费几个小时甚至几天,怎么办?

可以定期把交易引擎的状态序列化至文件系统,例如,每10分钟一次。当交易引擎崩溃时,读取最新的状态文件,即可恢复至约10分钟前的状态,后续追赶只需要执行很少的事件消息。

交易引擎的状态包括:

  • 资产系统的状态:即所有用户的资产列表;
  • 订单系统的状态:即所有活动订单列表;
  • 撮合引擎的状态:即买卖盘和最新市场价;
  • 最后一次处理的sequenceId。

序列化时,分别针对每个子系统进行序列化。对资产系统来说,每个用户的资产可序列化为用户ID: [USD可用, USD冻结, BTC可用, BTC冻结]的JSON格式,整个资产系统序列化后结构如下:

订单系统可序列化为一系列活动订单列表:

  1. [
  2. { "id": 10012207, "sequenceId": 1001, "price": 20901, ...},
  3. { "id": 10022207, "sequenceId": 1002, "price": 20902, ...},
  4. ]

撮合引擎可序列化为买卖盘列表(仅包含订单ID):

  1. {
  2. "BUY": [10012207, 10022207, ...],
  3. "SELL": [...],
  4. "marketPrice": 20901
  5. }

最后合并为一个交易引擎的状态文件:

  1. {
  2. "sequenceId": 189000,
  3. "assets": { ... },
  4. "orders": [ ... ],
  5. "match": { ... }
  6. }

交易引擎启动时,读取状态文件,然后依次恢复资产系统、订单系统和撮合引擎的状态,就得到了指定sequenceId的状态。

写入状态时,如果是异步写入,需要先复制状态、再写入,防止多线程读同一实例导致状态不一致。读写JSON时,要使用JSON库的流式API(例如Jackson的Streaming API),以免内存溢出。对BigDecimal进行序列化时,要注意不要误读为double类型以免丢失精度。

可以从GitHub或下载源码。

GitHub ▸ ▸ warpexchange

▸ build)

)

▤ schema.sql)

)

▤ pom.xml)

)

▸ src/main)

)

▸ bean)

)

▤ OrderBookItemBean.java)

)

▤ AccessibleProperty.java)

)

▤ CriteriaQuery.java)

)

▤ From.java)

)

▤ Mapper.java)

)

▤ Select.java)

)

▸ enums)

)

▤ Direction.java)

)

▤ OrderStatus.java)

)

▸ message)

)

▤ AbstractEvent.java)

)

▤ OrderRequestEvent.java)

)

▤ AbstractMessage.java)

)

▤ NotificationMessage.java)

)

▸ messaging)

)

▤ MessageProducer.java)

)

▤ Messaging.java)

)

▤ MessagingFactory.java)

)

▸ quotation)

)

▸ support)

)

▸ trade)

)

▤ MatchDetailEntity.java)

)

▸ redis)

)

▤ RedisConfiguration.java)

)

▤ SyncCommandCallback.java)

)

▤ LoggerSupport.java)

)

▤ ByteUtil.java)

)

▤ IpUtil.java)

)

▤ ApiError.java)

)

▤ ApiException.java)

)

▸ redis)

)

▤ logback-spring.xml)

)

▸ config)

)

▸ java/com/itranswarp/exchange/config)

)

▸ resources)

)

▤ pom.xml)

)

▤ application-default.yml)

)

▤ application.yml)

)

▤ quotation.yml)

)

▤ trading-engine.yml)

)

▤ ui-default.yml)

)

▸ parent)

)

▸ push)

)

▸ java/com/itranswarp/exchange/push)

)

▸ resources)

)

▤ pom.xml)

)

▸ java/com/itranswarp/exchange)

)

▸ resources)

)

▤ pom.xml)

)

▸ src/main)

)

▤ TradingApiApplication.java)

)

▤ application.yml)

)

▸ trading-engine)

)

▸ main)

)

▸ assets)

)

▤ AssetService.java)

)

▸ clearing)

)

▸ match)

)

▤ MatchEngine.java)

)

▤ OrderBook.java)

)

▸ order)

)

▸ store)

)

▸ web/api)

)

▤ TradingEngineApplication.java)

)

▸ resources)

)

▸ test/java/com/itranswarp/exchange)

)

▤ AssetServiceTest.java)

)

▤ MatchEngineTest.java)

)

▤ pom.xml)

)

▸ src/main)

)

▤ TradingSequencerApplication.java)

)

▤ application.yml)

)

▸ ui)

)

▸ java/com/itranswarp/exchange)

)

▸ resources)

)

▤ pom.xml)

)

▤ LICENSE)

)

交易引擎是以事件驱动的状态机模型,同样的输入将得到同样的输出。为提高交易系统的健壮性,可以自动检测重复消息和消息丢失并自动恢复。