推送系统负责将公开市场的实时信息,包括订单簿、最新成交、最新 K 线等推送给客户端,对于用户的订单,还需要将成交信息推送给指定用户。FIX(Financial Information eXchange)协议是金融交易的一种实时化通讯协议,但是它非常复杂,而且不同版本的规范也不同。对于 Warp Exchange 来说,我们先实现一版简单的基于 WebSocket 推送 JSON 格式的通知。
和普通 Web 应用不同的是,基于 Servlet 的线程池模型不能高效地支持成百上千的 WebSocket 长连接。Java 提供了 NIO 能充分利用 Linux 系统的 epoll 机制高效支持大量的长连接,但是直接使用 NIO 的接口非常繁琐,通常我们会选择基于 NIO 的 Netty 服务器。直接使用 Netty 其实仍然比较繁琐,基于 Netty 开发我们可以选择:
- Spring WebFlux:封装了 Netty 并实现 Reactive 接口;
- Vert.x:封装了 Netty 并提供简单的 API 接口。
这里我们选择 Vert.x,因为它的 API 更简单。
Vert.x 本身包含若干模块,根据需要,我们引入 3 个组件:
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-core</artifactId> | |
<version>${vertx.version}</version> | |
</dependency> | |
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-web</artifactId> | |
<version>${vertx.version}</version> | |
</dependency> | |
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-redis-client</artifactId> | |
<version>${vertx.version}</version> | |
</dependency> |
package com.itranswarp.exchange.push; | |
// 禁用数据库自动配置 (无 DataSource, JdbcTemplate...) | |
public class PushApplication {public static void main(String[] args) {System.setProperty("vertx.disableFileCPResolving", "true"); | |
System.setProperty("vertx.logger-delegate-factory-class-name", "io.vertx.core.logging.SLF4JLogDelegateFactory"); | |
SpringApplication app = new SpringApplication(PushApplication.class); | |
// 禁用 Spring 的 Web: | |
app.setWebApplicationType(WebApplicationType.NONE); | |
app.run(args); | |
} | |
} |
上述代码仍然是一个标准的 Spring Boot 应用,因为我们希望利用 Spring Cloud Config 读取配置。由于我们不使用 Spring 自身的 Web 功能,因此需要禁用 Spring 的 Web 功能。推送服务本身并不需要访问数据库,因此禁用数据库自动配置。最后,我们把 PushApplication
放在 com.itranswarp.exchange.push
包下面,以避免自动扫描到 com.itranswarp.exchange
包下的组件(如 RedisService)。
,注意它是一个 Spring 组件,由 Spring 初始化:
public class PushService extends LoggerSupport { | |
private int serverPort; | |
String hmacKey; | |
private String redisHost; | |
private int redisPort; | |
private String redisPassword; | |
private int redisDatabase = 0; | |
private Vertx vertx; | |
public void startVertx() {// TODO: init Vert.x | |
} | |
} |
由 Spring 初始化该组件的目的是注入各种配置。在初始化方法中,我们就可以启动 Vert.x:
public void startVertx() {// 启动 Vert.x: | |
this.vertx = Vertx.vertx(); | |
// 创建一个 Vert.x Verticle 组件: | |
var push = new PushVerticle(this.hmacKey, this.serverPort); | |
vertx.deployVerticle(push); | |
// 连接到 Redis: | |
String url = "redis://" + (this.redisPassword.isEmpty() ? "" : ":" + this.redisPassword + "@") + this.redisHost | |
+ ":" + this.redisPort + "/" + this.redisDatabase; | |
Redis redis = Redis.createClient(vertx, url); | |
redis.connect().onSuccess(conn -> {// 事件处理: | |
conn.handler(response -> {// 收到 Redis 的 PUSH: | |
if (response.type() == ResponseType.PUSH) {int size = response.size(); | |
if (size == 3) {Response type = response.get(2); | |
if (type instanceof BulkType) {// 收到 PUBLISH 通知: | |
String msg = type.toString(); | |
// 由 push verticle 组件处理该通知: | |
push.broadcast(msg); | |
} | |
} | |
} | |
}); | |
// 订阅 Redis 的 Topic: | |
conn.send(Request.cmd(Command.SUBSCRIBE).arg(RedisCache.Topic.NOTIFICATION)).onSuccess(resp -> {logger.info("subscribe ok."); | |
}).onFailure(err -> {logger.error("subscribe failed.", err); | |
System.exit(1); | |
}); | |
}).onFailure(err -> {logger.error("connect to redis failed.", err); | |
System.exit(1); | |
}); | |
} |
Vert.x 用 Verticle
表示一个组件,我们编写 PushVerticle
来处理 WebSocket 连接:
public class PushVerticle extends AbstractVerticle { | |
public void start() {// 创建 VertX HttpServer: | |
HttpServer server = vertx.createHttpServer(); | |
// 创建路由: | |
Router router = Router.router(vertx); | |
// 处理请求 GET /notification: | |
router.get("/notification").handler(requestHandler -> {HttpServerRequest request = requestHandler.request(); | |
// 从 token 参数解析 userId: | |
Supplier<Long> supplier = () -> {String tokenStr = request.getParam("token"); | |
if (tokenStr != null && !tokenStr.isEmpty()) {AuthToken token = AuthToken.fromSecureString(tokenStr, this.hmacKey); | |
if (!token.isExpired()) {return token.userId();} | |
} | |
return null; | |
}; | |
final Long userId = supplier.get(); | |
logger.info("parse user id from token: {}", userId); | |
// 将连接升级到 WebSocket: | |
request.toWebSocket(ar -> {if (ar.succeeded()) {initWebSocket(ar.result(), userId); | |
} | |
}); | |
}); | |
// 处理请求 GET /actuator/health: | |
router.get("/actuator/health").respond(ctx -> ctx.response().putHeader("Content-Type", "application/json").end("{\"status\":\"UP\"}")); | |
// 其他请求返回 404 错误: | |
router.get().respond(ctx -> ctx.response().setStatusCode(404).setStatusMessage("No Route Found").end()); | |
// 绑定路由并监听端口: | |
server.requestHandler(router).listen(this.serverPort, result -> {if (result.succeeded()) {logger.info("Vertx started on port(s): {} (http) with context path''", this.serverPort); | |
} else {logger.error("Start http server failed on port" + this.serverPort, result.cause()); | |
vertx.close(); | |
System.exit(1); | |
} | |
}); | |
} | |
} |
在 PushVerticle
方法由 Vert.x 回调。我们在 start()
- 创建基于 Vert.x 的 HTTP 服务器(内部使用 Netty);
- 创建路由;
- 绑定一个路径为
的 GET 请求,将其升级为 WebSocket 连接; - 绑定其他路径的 GET 请求;
- 开始监听指定端口号。
在处理 /notification
时,我们尝试从 URL 的 token 参数解析出用户 ID,这样我们就无需访问数据库而获得了当前连接的用户。升级到 WebSocket 连接后,再调用 initWebSocket()
继续处理 WebSocket 连接:
public class PushVerticle extends AbstractVerticle {// 所有 Handler: | |
Map<String, Boolean> handlersSet = new ConcurrentHashMap<>(1000); | |
// 用户 ID -> Handlers | |
Map<Long, Set<String>> userToHandlersMap = new ConcurrentHashMap<>(1000); | |
// Handler -> 用户 ID | |
Map<String, Long> handlerToUserMap = new ConcurrentHashMap<>(1000); | |
void initWebSocket(ServerWebSocket websocket, Long userId) {// 获取一个 WebSocket 关联的 Handler ID: | |
String handlerId = websocket.textHandlerID(); | |
// 处理输入消息: | |
websocket.textMessageHandler(str -> {logger.info("text message:" + str); | |
}); | |
websocket.exceptionHandler(t -> {logger.error("websocket error:" + t.getMessage(), t); | |
}); | |
// 关闭连接时: | |
websocket.closeHandler(e -> {unsubscribeClient(handlerId); | |
unsubscribeUser(handlerId, userId); | |
}); | |
subscribeClient(handlerId); | |
subscribeUser(handlerId, userId); | |
} | |
void subscribeClient(String handlerId) {this.handlersSet.put(handlerId, Boolean.TRUE); | |
} | |
void unsubscribeClient(String handlerId) {this.handlersSet.remove(handlerId); | |
} | |
void subscribeUser(String handlerId, Long userId) {if (userId == null) {return; | |
} | |
handlerToUserMap.put(handlerId, userId); | |
Set<String> set = userToHandlersMap.get(userId); | |
if (set == null) {set = new HashSet<>(); | |
userToHandlersMap.put(userId, set); | |
} | |
set.add(handlerId); | |
} | |
void unsubscribeUser(String handlerId, Long userId) {if (userId == null) {return; | |
} | |
handlerToUserMap.remove(handlerId); | |
Set<String> set = userToHandlersMap.get(userId); | |
if (set != null) {set.remove(handlerId); | |
} | |
} | |
} |
在 Vert.x 中,每个 WebSocket 连接都有一个唯一的 Handler 标识,以 String
表示。我们用几个 Map
保存 Handler 和用户 ID 的映射关系,当关闭连接时,将对应的映射关系删除。
最后一个关键方法 broadcast()
中订阅的 Redis 推送时触发,该方法用于向用户主动推送通知:
public void broadcast(String text) {NotificationMessage message = JsonUtil.readJson(text, NotificationMessage.class); | |
if (message.userId == null) {// 没有用户 ID 时,推送给所有连接: | |
EventBus eb = vertx.eventBus(); | |
for (String handler : this.handlersSet.keySet()) {eb.send(handler, text); | |
} | |
} else {// 推送给指定用户: | |
Set<String> handlers = this.userToHandlersMap.get(message.userId); | |
if (handlers != null) {EventBus eb = vertx.eventBus(); | |
for (String handler : handlers) {eb.send(handler, text); | |
} | |
} | |
} | |
} |
当 Redis 收到 PUBLISH
调用后,它自动将 String
表示的 JSON 数据推送给所有订阅端。我们在 PushService
中订阅了 notification
这个 Topic,然后通过 broadcast()
推送给 WebSocket 客户端。对于一个NotificationMessage
,则推送给指定用户,适用于订单成交等针对用户 ID 的通知;如果没有设置userId
整个推送服务仅包括 3 个 Java 文件,我们就实现了基于 Redis 和 WebSocket 的高性能推送。
要高效处理大量 WebSocket 连接,我们选择基于 Netty 的 Vert.x 框架,可以通过少量代码配合 Redis 实现推送。