共计 5504 个字符,预计需要花费 14 分钟才能阅读完成。
有了交易引擎和定序系统,我们还需要一个 API 系统,用于接收所有交易员的订单请求。
相比事件驱动的交易引擎,API 系统就比较简单,因为它就是一个标准的 Web 应用。
在编写 API 之前,我们需要对请求进行认证,即识别出是哪个用户发出的请求。用户认证放在 Filter 中是最合适的。认证方式可以是简单粗暴的用户名 + 口令,也可以是 Token,也可以是 API Key+API Secret 等模式。
我们先实现一个最简单的用户名 + 口令的认证方式。需要注意的是,API 和 Web 页面不同,Web 页面可以给用户一个登录页,登录成功后设置 Session 或 Cookie,后续请求检查的是 Session 或 Cookie。API 不能使用 Session,因为 Session 很难做无状态集群,API 也不建议使用 Cookie,因为 API 域名很可能与 Web UI 的域名不一致,拿不到 Cookie。要在 API 中使用用户名 + 口令的认证方式,可以用标准的 HTTP 头 Authorization 的 Basic
模式:
Authorization: Basic 用户名: 口令
因此,我们可以尝试从 Authorization
中获取用户名和口令来认证:
Long parseUserFromAuthorization(String auth) {if (auth.startsWith("Basic")) {// 用 Base64 解码:
String eap = new String(Base64.getDecoder().decode(auth.substring(6)));
// 分离 email:password
int pos = eap.indexOf(':');
String email = eap.substring(0, pos);
String passwd = eap.substring(pos + 1);
// 验证:
UserProfileEntity p = userService.signin(email, passwd);
return p.userId;
}
throw new ApiException(ApiError.AUTH_SIGNIN_FAILED, "Invalid Authorization header.");
}
在 ApiFilter
中完成认证后,使用 UserContext
传递用户 ID:
public class ApiFilter {@Override
public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain)
throws IOException, ServletException {// 尝试认证用户:
String authHeader = req.getHeader("Authorization");
Long userId = authHeader == null ? null : parseUserFromAuthorization(authHeader);
if (userId == null) {// 匿名身份:
chain.doFilter(req, resp);
} else {// 用户身份:
try (UserContext ctx = new UserContext(userId)) {chain.doFilter(req, resp);
}
}
}
}
Basic 模式很简单,需要注意的是 用户名: 口令
使用 :
分隔,然后整个串用 Base64 编码,因此,读取的时候需要先用 Base64 解码。
虽然 Basic 模式并不安全,但是有了一种基本的认证模式,我们就可以把 API- 定序 - 交易串起来了。后续我们再继续添加其他认证模式。
编写 API Controller
对于认证用户的操作,例如,查询资产余额,可通过 UserContext
获取当前用户,然后通过交易引擎查询并返回用户资产余额:
@ResponseBody
@GetMapping(value = "/assets", produces = "application/json")
public String getAssets() throws IOException {Long userId = UserContext.getRequiredUserId();
return tradingEngineApiProxyService.get("/internal/" + userId + "/assets");
}
因为交易引擎返回的结果就是 JSON 字符串,没必要先反序列化再序列化,可以以 String
的方式直接返回给客户端,需要标注 @ResponseBody
表示不要对 String
再进行序列化处理。
对于无需认证的操作,例如,查询公开市场的订单簿,可以直接返回 Redis 缓存结果:
@ResponseBody
@GetMapping(value = "/orderBook", produces = "application/json")
public String getOrderBook() {String data = redisService.get(RedisCache.Key.ORDER_BOOK);
return data == null ? OrderBookBean.EMPTY : data;
}
但是对于创建订单的请求,处理就麻烦一些,因为 API 收到请求后,仅仅通过消息系统给定序系统发了一条消息。消息系统本身并不是类似 HTTP 的请求 - 响应模式,我们拿不到消息处理的结果。这里先借助 Spring 的异步响应模型 DeferredResult
,再借助 Redis 的 pub/sub 模型,当 API 发送消息时,使用全局唯一refId
跟踪消息,当交易引擎处理完订单请求后,向 Redis 发送 pub 事件,API 收到 Redis 推送的事件后,根据 refId
找到DeferredResult
,设置结果后由 Spring 异步返回给客户端:
┌─────────┐ ┌─────────┐
──▶│ API │◀────────────────│ Redis │
└─────────┘ └─────────┘
│ ▲
▼ │
┌─────────┐ │
│ MQ │ pub│
└─────────┘ │
│ │
▼ │
┌─────────┐ ┌─────────┐ ┌─────────┐
│Sequencer│──▶│ MQ │──▶│ Engine │
└─────────┘ └─────────┘ └─────────┘
代码实现如下:
public class TradingApiController {// 消息 refId -> DeferredResult:
Map<String, DeferredResult<ResponseEntity<String>>> deferredResultMap = new ConcurrentHashMap<>();
@Autowired
RedisService redisService;
@PostConstruct
public void init() {// 订阅 Redis:
this.redisService.subscribe(RedisCache.Topic.TRADING_API_RESULT, this::onApiResultMessage);
}
@PostMapping(value = "/orders", produces = "application/json")
@ResponseBody
public DeferredResult<ResponseEntity<String>> createOrder(@RequestBody OrderRequestBean orderRequest) {final Long userId = UserContext.getRequiredUserId();
// 消息的 Reference ID:
final String refId = IdUtil.generateUniqueId();
var event = new OrderRequestEvent();
event.refId = refId;
event.userId = userId;
event.direction = orderRequest.direction;
event.price = orderRequest.price;
event.quantity = orderRequest.quantity;
event.createdAt = System.currentTimeMillis();
// 如果超时则返回:
ResponseEntity<String> timeout = new ResponseEntity<>(getTimeoutJson(), HttpStatus.BAD_REQUEST);
// 正常异步返回:
DeferredResult<ResponseEntity<String>> deferred = new DeferredResult<>(500, timeout); // 0.5 秒超时
deferred.onTimeout(() -> {this.deferredResultMap.remove(event.refId);
});
// 根据 refId 跟踪消息处理结果:
this.deferredResultMap.put(event.refId, deferred);
// 发送消息:
sendMessage(event);
return deferred;
}
// 收到 Redis 的消息结果推送:
public void onApiResultMessage(String msg) {ApiResultMessage message = objectMapper.readValue(msg, ApiResultMessage.class);
if (message.refId != null) {// 根据消息 refId 查找 DeferredResult:
DeferredResult<ResponseEntity<String>> deferred = this.deferredResultMap.remove(message.refId);
if (deferred != null) {// 找到 DeferredResult 后设置响应结果:
ResponseEntity<String> resp = new ResponseEntity<>(JsonUtil.writeJson(message.result), HttpStatus.OK);
deferred.setResult(resp);
}
}
}
}
如何实现 API Key 认证
身份认证的本质是确认用户身份。用户身份其实并不包含密码,而是用户 ID、email、名字等信息,可以看作数据库中的 user_profiles
表:
userId | name | |
---|---|---|
100 | [email protected] | Bob |
101 | [email protected] | alice |
102 | [email protected] | Cook |
使用口令认证时,通过添加一个 password_auths
表,存储哈希后的口令,并关联至某个用户 ID,即可完成口令认证:
userId | random | passwd |
---|---|---|
100 | c47snXI | 7b6da12c… |
101 | djEqC2I | f7b68248… |
并不是每个用户都必须有口令,没有口令的用户仅仅表示该用户不能通过口令来认证身份,但完全可以通过其他方式认证。
使用 API Key 认证同理,通过添加一个 api_auths
表,存储 API Key、API Secret 并关联至某个用户 ID:
userId | apiKey | apiSecret |
---|---|---|
101 | 5b503947f4f5d34a | e57c677d4ab4c5a4 |
102 | 13a867e8da13c7f6 | 92e41573e833ae13 |
102 | 341a8e60baf5b824 | 302c9e195826267f |
用户使用 API Key 认证时,提供 API Key,以及用 API Secret 计算的 Hmac 哈希,服务器验证 Hmac 哈希后,就可以确认用户身份,因为其他人不知道该用户的 API Secret,无法计算出正确的 Hmac。
发送 API Key 认证时,可以定义如下的 HTTP 头:
API-Key: 5b503947f4f5d34a
API-Timestamp: 20220726T092137Z <- 防止重放攻击的时间戳
API-Signature: d7a567b6cab85bcd
计算签名的原始输入可以包括 HTTP Method、Path、Timestamp、Body 等关键信息,具体格式可参考 AWS API 签名方式。
一个用户可以关联多个 API Key 认证,还可以给每个 API Key 附加特定权限,例如只读权限,这样用 API Key 认证就更加安全。
内部系统调用 API 如何实现用户认证
很多时候,内部系统也需要调用 API,并且需要以特定用户的身份调用 API。让内部系统去读用户的口令或者 API Key 都是不合理的,更好的方式是使用一次性 Token,还是利用 Authorization 头的 Bearer 模式:
Authorization: Bearer 5NPtI6LW...
构造一次性 Token 可以用userId:expires:hmac
,内部系统和 API 共享同一个 Hmac Key,就可以正确计算并验证签名。外部用户因为无法获得 Hmac Key 而无法伪造 Token。
如何跟踪 API 性能
可以使用 Spring 提供的 HandlerInterceptor
和DeferredResultProcessingInterceptor
跟踪 API 性能,它们分别用于拦截同步 API 和异步 API。
参考源码
可以从 GitHub 或 Gitee 下载源码。
GitHub
小结
API 系统负责认证用户身份,并提供一个唯一的交易入口。