阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

完成交易引擎

66次阅读
没有评论

共计 4199 个字符,预计需要花费 11 分钟才能阅读完成。

我们现在实现了资产模块、订单模块、撮合引擎和清算模块,现在,就可以把它们组合起来,实现一个完整的交易引擎:

public class TradingEngineService {@Autowired
    AssetService assetService;

    @Autowired
    OrderService orderService;

    @Autowired
    MatchEngine matchEngine;

    @Autowired
    ClearingService clearingService;
}

交易引擎由事件驱动,因此,通过订阅 Kafka 的 Topic 实现批量读消息,然后依次处理每个事件:

void processMessages(List<AbstractEvent> messages) {for (AbstractEvent message : messages) {processEvent(message);
    }
}

void processEvent(AbstractEvent event) {if (event instanceof OrderRequestEvent) {createOrder((OrderRequestEvent) event);
    } else if (event instanceof OrderCancelEvent) {cancelOrder((OrderCancelEvent) event);
    } else if (event instanceof TransferEvent) {transfer((TransferEvent) event);
    }
}

我们目前一共有 3 种类型的事件,处理都非常简单。以 createOrder() 为例,核心代码其实就几行:

void createOrder(OrderRequestEvent event) {// 生成 Order ID:
    long orderId = event.sequenceId * 10000 + (year * 100 + month);
    // 创建 Order:
    OrderEntity order = orderService.createOrder(event.sequenceId, event.createdAt, orderId, event.userId, event.direction, event.price, event.quantity);
    if (order == null) {logger.warn("create order failed.");
        return;
    }
    // 撮合:
    MatchResult result = matchEngine.processOrder(event.sequenceId, order);
    // 清算:
    clearingService.clearMatchResult(result);
}

核心的业务逻辑并不复杂,只是交易引擎在处理完订单后,仅仅改变自身状态是不够的,它还得向外输出具体的成交信息、订单状态等。因此,需要根据业务需求,在清算后继续收集撮合结果、已完成订单、准备发送的通知等,通过消息系统或 Redis 向外输出交易信息。如果把这些功能放到同一个线程内同步完成是非常耗时的,更好的方法是把它们先存储起来,再异步处理。例如,对于已完成的订单,可以异步落库:

Queue<List<OrderEntity>> orderQueue = new ConcurrentLinkedQueue<>();

void createOrder(OrderRequestEvent event) {
    ...
    // 清算完成后, 收集已完成 Order:
    if (!result.matchDetails.isEmpty()) {List<OrderEntity> closedOrders = new ArrayList<>();
        if (result.takerOrder.status.isFinalStatus) {closedOrders.add(result.takerOrder);
        }
        for (MatchDetailRecord detail : result.matchDetails) {OrderEntity maker = detail.makerOrder();
            if (maker.status.isFinalStatus) {closedOrders.add(maker);
            }
        }
        this.orderQueue.add(closedOrders);
    }
}

// 启动一个线程将 orderQueue 的 Order 异步写入数据库:
void saveOrders() {// TODO:
}

类似的,输出 OrderBook、通知用户成交等信息都是异步处理。

接下来,我们再继续完善 processEvent(),处理单个事件时,在处理具体的业务逻辑之前,我们首先根据sequenceId 判断是否是重复消息,是重复消息就丢弃:

void processEvent(AbstractEvent event) {if (event.sequenceId <= this.lastSequenceId) {logger.warn("skip duplicate event: {}", event);
        return;
    }
    // TODO:
}

紧接着,我们判断是否丢失了消息,如果丢失了消息,就根据上次处理的消息的 sequenceId,从数据库里捞出后续消息,直到赶上当前消息的sequenceId 为止:

// 判断是否丢失了消息:
if (event.previousId > this.lastSequenceId) {// 从数据库读取丢失的消息:
    List<AbstractEvent> events = storeService.loadEventsFromDb(this.lastSequenceId);
    if (events.isEmpty()) {// 读取失败:
        System.exit(1);
        return;
    }
    // 处理丢失的消息:
    for (AbstractEvent e : events) {this.processEvent(e);
    }
    return;
}
// 判断当前消息是否指向上一条消息:
if (event.previousId != lastSequenceId) {System.exit(1);
    return;
}
// 正常处理:
...
// 更新 lastSequenceId:
this.lastSequenceId = event.sequenceId;

这样一来,我们对消息系统的依赖就不是要求它 100% 可靠,遇到重复消息、丢失消息,交易引擎都可以从这些错误中自己恢复。

由于资产、订单、撮合、清算都在内存中完成,如何保证交易引擎每处理一个事件,它的内部状态都是正确的呢?我们可以为交易引擎增加一个自验证功能,在 debug 模式下,每处理一个事件,就自动验证内部状态的完整性,包括:

  • 验证资产系统总额为 0,且除负债账户外其余账户资产不为负;
  • 验证订单系统未成交订单所冻结的资产与资产系统中的冻结一致;
  • 验证订单系统的订单与撮合引擎的订单簿一对一存在。
void processEvent(AbstractEvent event) {
    ...
    if (debugMode) {this.validate();}
}

这样我们就能快速在开发阶段尽可能早地发现问题。

交易引擎的测试也相对比较简单。对于同一组输入,每次运行都会得到相同的结果,所以我们可以构造几组确定的输入来验证交易引擎:

class TradingEngineServiceTest {@Test
    public void testTradingEngine() {// TODO:
    }
}

下面是问题解答。

交易引擎崩溃后如何恢复?

交易引擎如果运行时崩溃,可以重启,重启后先把现有的所有交易事件重头开始执行一遍,即可得到最新的状态。

注意到重头开始执行交易事件,会导致重复发出市场成交、用户订单通知等事件,因此,可根据时间做判断,不再重复发通知。下游系统在处理通知事件时,也要根据通知携带的 sequenceId 做去重判断。

有的童鞋会问,如果现有的交易事件已经有几千万甚至几十亿,从头开始执行如果需要花费几个小时甚至几天,怎么办?

可以定期把交易引擎的状态序列化至文件系统,例如,每 10 分钟一次。当交易引擎崩溃时,读取最新的状态文件,即可恢复至约 10 分钟前的状态,后续追赶只需要执行很少的事件消息。

如何序列化交易引擎的状态?

交易引擎的状态包括:

  • 资产系统的状态:即所有用户的资产列表;
  • 订单系统的状态:即所有活动订单列表;
  • 撮合引擎的状态:即买卖盘和最新市场价;
  • 最后一次处理的 sequenceId。

序列化时,分别针对每个子系统进行序列化。对资产系统来说,每个用户的资产可序列化为 用户 ID: [USD 可用, USD 冻结, BTC 可用, BTC 冻结]的 JSON 格式,整个资产系统序列化后结构如下:

{
    "1": [-123000, 0, -12.3, 0],
    "100": [60000, 20000, 9, 0],
    "200": [43000, 0, 3, 0.3]
}

订单系统可序列化为一系列活动订单列表:

[
    { "id": 10012207, "sequenceId": 1001, "price": 20901, ...},
    { "id": 10022207, "sequenceId": 1002, "price": 20902, ...},
]

撮合引擎可序列化为买卖盘列表(仅包含订单 ID):

{
    "BUY": [10012207, 10022207, ...],
    "SELL": [...],
    "marketPrice": 20901
}

最后合并为一个交易引擎的状态文件:

{
    "sequenceId": 189000,
    "assets": { ... },
    "orders": [ ... ],
    "match": { ... }
}

交易引擎启动时,读取状态文件,然后依次恢复资产系统、订单系统和撮合引擎的状态,就得到了指定 sequenceId 的状态。

写入状态时,如果是异步写入,需要先复制状态、再写入,防止多线程读同一实例导致状态不一致。读写 JSON 时,要使用 JSON 库的流式 API(例如 Jackson 的 Streaming API),以免内存溢出。对 BigDecimal 进行序列化时,要注意不要误读为 double 类型以免丢失精度。

参考源码

可以从 GitHub 或 Gitee 下载源码。

GitHub

小结

交易引擎是以事件驱动的状态机模型,同样的输入将得到同样的输出。为提高交易系统的健壮性,可以自动检测重复消息和消息丢失并自动恢复。

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2024-08-05发表,共计4199字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中