共计 8207 个字符,预计需要花费 21 分钟才能阅读完成。
行情系统用来生成公开市场的历史数据,主要是 K 线图。
K 线图的数据来源是交易引擎成交产生的一个个 Tick。一个 K 线包括 OHLC 这 4 个价格数据。在一个时间段内,第一个 Tick 的价格是 Open,最后一个 Tick 的价格是 Close,最高的价格是 High,最低的价格是 Low:
High ──▶│ | |
│ | |
┌─┴─┐◀── Close | |
│ │ | |
│ │ | |
Open ──▶└─┬─┘ | |
│ | |
│ | |
Low ──▶│ |
给定一组 Tick 集合,就可以汇总成一个 K 线,对应一个 Bar 结构:
public class AbstractBarEntity {public long startTime; // 开始时间 | |
public BigDecimal openPrice; // 开始价格 | |
public BigDecimal highPrice; // 最高价格 | |
public BigDecimal lowPrice; // 最低价格 | |
public BigDecimal closePrice; // 结束价格 | |
public BigDecimal quantity; // 成交数量 | |
} |
通常我们需要按 1 秒、1 分钟、1 小时和 1 天来生成不同类型的 K 线,因此,行情系统的功能就是不断从消息系统中读取 Tick,合并,然后输出不同类型的 K 线。
此外,API 系统还需要提供查询公开市场信息的功能。对于最近的成交信息和 K 线图,可以缓存在 Redis 中,对于较早时期的 K 线图,可以通过数据库查询。因此,行情系统需要将生成的 K 线保存到数据库中,同时负责不断更新 Redis 的缓存。
对于最新成交信息,我们在 Redis 中用一个 List 表示,它的每一个元素是一个序列号后的 JSON:
["{...}", "{...}", "{...}"...]
如果有新的 Tick 产生,就需要把它们追加到列表尾部,同时将最早的 Tick 删除,以便维护一个最近成交的列表。
直接读取 Redis 列表,操作后再写回 Redis 是可以的,但比较麻烦。这里我们直接用 Lua 脚本更新最新 Tick 列表。Redis 支持将一个 Lua 脚本加载后,直接在 Redis 内部执行脚本:
local KEY_LAST_SEQ = '_TickSeq_' -- 上次更新的 SequenceID | |
local LIST_RECENT_TICKS = KEYS[1] -- 最新 Ticks 的 Key | |
local seqId = ARGV[1] -- 输入的 SequenceID | |
local jsonData = ARGV[2] -- 输入的 JSON 字符串表示的 tick 数组:"["{...}","{...}",...]" | |
local strData = ARGV[3] -- 输入的 JSON 字符串表示的 tick 数组:"[{...},{...},...]" | |
-- 获取上次更新的 sequenceId: | |
local lastSeqId = redis.call('GET', KEY_LAST_SEQ) | |
local ticks, len; | |
if not lastSeqId or tonumber(seqId) > tonumber(lastSeqId) then | |
-- 广播: | |
redis.call('PUBLISH', 'notification', '{"type":"tick","sequenceId":' .. seqId .. ',"data":' .. jsonData .. '}') | |
-- 保存当前 sequence id: | |
redis.call('SET', KEY_LAST_SEQ, seqId) | |
-- 更新最新 tick 列表: | |
ticks = cjson.decode(strData) | |
len = redis.call('RPUSH', LIST_RECENT_TICKS, unpack(ticks)) | |
if len > 100 then | |
-- 裁剪 LIST 以保存最新的 100 个 Tick: | |
redis.call('LTRIM', LIST_RECENT_TICKS, len-100, len-1) | |
end | |
return true | |
end | |
-- 无更新返回 false | |
return false |
在 API 中,要获取最新成交信息,我们直接从 Redis 缓存取出列表,然后拼接成一个 JSON 字符串:
public String getRecentTicks() {List<String> data = redisService.lrange(RedisCache.Key.RECENT_TICKS, 0, -1); | |
if (data == null || data.isEmpty()) {return "[]"; | |
} | |
StringJoiner sj = new StringJoiner(",", "[", "]"); | |
for (String t : data) {sj.add(t); | |
} | |
return sj.toString();} |
用 Lua 脚本更新 Redis 缓存还有一个好处,就是 Lua 脚本执行的时候,不但可以更新 List,还可以通过 Publish 命令广播事件,后续我们编写基于 WebSocket 的推送服务器时,直接监听 Redis 广播,就可以主动向浏览器推送 Tick 更新的事件。
类似的,针对每一种 K 线,我们都在 Redis 中用 ZScoredSet 存储,用 K 线的开始时间戳作为 Score。更新 K 线时,从每种 ZScoredSet 中找出 Score 最大的 Bar 结构,就是最后一个 Bar,然后尝试更新。如果可以持久化这个 Bar 就返回,如果可以合并这个 Bar 就刷新 ZScoreSet,用 Lua 脚本实现如下:
local function merge(existBar, newBar) | |
existBar[3] = math.max(existBar[3], newBar[3]) -- 更新 High Price | |
existBar[4] = math.min(existBar[4], newBar[4]) -- 更新 Low Price | |
existBar[5] = newBar[5] -- close | |
existBar[6] = existBar[6] + newBar[6] -- 更新 quantity | |
end | |
local function tryMergeLast(barType, seqId, zsetBars, timestamp, newBar) | |
local topic = 'notification' | |
local popedScore, popedBar | |
-- 查找最后一个 Bar: | |
local poped = redis.call('ZPOPMAX', zsetBars) | |
if #poped == 0 then | |
-- ZScoredSet 无任何 bar, 直接添加: | |
redis.call('ZADD', zsetBars, timestamp, cjson.encode(newBar)) | |
redis.call('PUBLISH', topic, '{"type":"bar","resolution":"' .. barType .. '","sequenceId":' .. seqId .. ',"data":' .. cjson.encode(newBar) .. '}') | |
else | |
popedBar = cjson.decode(poped[1]) | |
popedScore = tonumber(poped[2]) | |
if popedScore == timestamp then | |
-- 合并 Bar 并发送通知: | |
merge(popedBar, newBar) | |
redis.call('ZADD', zsetBars, popedScore, cjson.encode(popedBar)) | |
redis.call('PUBLISH', topic, '{"type":"bar","resolution":"' .. barType .. '","sequenceId":' .. seqId .. ',"data":' .. cjson.encode(popedBar) .. '}') | |
else | |
-- 可持久化最后一个 Bar,生成新的 Bar: | |
if popedScore < timestamp then | |
redis.call('ZADD', zsetBars, popedScore, cjson.encode(popedBar), timestamp, cjson.encode(newBar)) | |
redis.call('PUBLISH', topic, '{"type":"bar","resolution":"' .. barType .. '","sequenceId":' .. seqId .. ',"data":' .. cjson.encode(newBar) .. '}') | |
return popedBar | |
end | |
end | |
end | |
return nil | |
end | |
local seqId = ARGV[1] | |
local KEY_BAR_SEQ = '_BarSeq_' | |
local zsetBars, topics, barTypeStartTimes | |
local openPrice, highPrice, lowPrice, closePrice, quantity | |
local persistBars = {} | |
-- 检查 sequence: | |
local seq = redis.call('GET', KEY_BAR_SEQ) | |
if not seq or tonumber(seqId) > tonumber(seq) then | |
zsetBars = {KEYS[1], KEYS[2], KEYS[3], KEYS[4] } | |
barTypeStartTimes = {tonumber(ARGV[2]), tonumber(ARGV[3]), tonumber(ARGV[4]), tonumber(ARGV[5]) } | |
openPrice = tonumber(ARGV[6]) | |
highPrice = tonumber(ARGV[7]) | |
lowPrice = tonumber(ARGV[8]) | |
closePrice = tonumber(ARGV[9]) | |
quantity = tonumber(ARGV[10]) | |
local i, bar | |
local names = {'SEC', 'MIN', 'HOUR', 'DAY' } | |
-- 检查是否可以 merge: | |
for i = 1, 4 do | |
bar = tryMergeLast(names[i], seqId, zsetBars[i], barTypeStartTimes[i], {barTypeStartTimes[i], openPrice, highPrice, lowPrice, closePrice, quantity }) | |
if bar then | |
persistBars[names[i]] = bar | |
end | |
end | |
redis.call('SET', KEY_BAR_SEQ, seqId) | |
return cjson.encode(persistBars) | |
end | |
redis.log(redis.LOG_WARNING, 'sequence ignored: exist seq =>' .. seq .. '>=' .. seqId .. '<= new seq') | |
return '{}' |
接下来我们编写 QuotationService
,初始化的时候加载 Redis 脚本,接收到 Tick 消息时调用脚本更新 Tick 和 Bar,然后持久化 Tick 和 Bar,代码如下:
public class QuotationService { | |
RedisService redisService; | |
MessagingFactory messagingFactory; | |
MessageConsumer tickConsumer; | |
private String shaUpdateRecentTicksLua = null; | |
private String shaUpdateBarLua = null; | |
public void init() throws Exception {// 加载 Redis 脚本: | |
this.shaUpdateRecentTicksLua = this.redisService.loadScriptFromClassPath("/redis/update-recent-ticks.lua"); | |
this.shaUpdateBarLua = this.redisService.loadScriptFromClassPath("/redis/update-bar.lua"); | |
// 接收 Tick 消息: | |
String groupId = Messaging.Topic.TICK.name() + "_" + IpUtil.getHostId(); | |
this.tickConsumer = messagingFactory.createBatchMessageListener(Messaging.Topic.TICK, groupId, | |
this::processMessages); | |
} | |
// 处理接收的消息: | |
public void processMessages(List<AbstractMessage> messages) {for (AbstractMessage message : messages) {processMessage((TickMessage) message); | |
} | |
} | |
// 处理一个 Tick 消息: | |
void processMessage(TickMessage message) {// 对一个 Tick 消息中的多个 Tick 先进行合并: | |
final long createdAt = message.createdAt; | |
StringJoiner ticksStrJoiner = new StringJoiner(",", "[", "]"); | |
StringJoiner ticksJoiner = new StringJoiner(",", "[", "]"); | |
BigDecimal openPrice = BigDecimal.ZERO; | |
BigDecimal closePrice = BigDecimal.ZERO; | |
BigDecimal highPrice = BigDecimal.ZERO; | |
BigDecimal lowPrice = BigDecimal.ZERO; | |
BigDecimal quantity = BigDecimal.ZERO; | |
for (TickEntity tick : message.ticks) {String json = tick.toJson(); | |
ticksStrJoiner.add("\"" + json + "\""); | |
ticksJoiner.add(json); | |
if (openPrice.signum() == 0) { | |
openPrice = tick.price; | |
closePrice = tick.price; | |
highPrice = tick.price; | |
lowPrice = tick.price; | |
} else {// open price is set: | |
closePrice = tick.price; | |
highPrice = highPrice.max(tick.price); | |
lowPrice = lowPrice.min(tick.price); | |
} | |
quantity = quantity.add(tick.quantity); | |
} | |
// 计算应该合并的每种类型的 Bar 的开始时间: | |
long sec = createdAt / 1000; | |
long min = sec / 60; | |
long hour = min / 60; | |
long secStartTime = sec * 1000; | |
long minStartTime = min * 60 * 1000; | |
long hourStartTime = hour * 3600 * 1000; | |
long dayStartTime = Instant.ofEpochMilli(hourStartTime).atZone(zoneId).withHour(0).toEpochSecond() * 1000; | |
// 更新 Tick 缓存: | |
String ticksData = ticksJoiner.toString(); | |
Boolean tickOk = redisService.executeScriptReturnBoolean(this.shaUpdateRecentTicksLua, | |
new String[] { RedisCache.Key.RECENT_TICKS}, | |
new String[] { String.valueOf(this.sequenceId), ticksData, ticksStrJoiner.toString()}); | |
if (!tickOk.booleanValue()) {logger.warn("ticks are ignored by Redis."); | |
return; | |
} | |
// 保存 Tick 至数据库: | |
saveTicks(message.ticks); | |
// 更新 Redis 缓存的各种类型的 Bar: | |
String strCreatedBars = redisService.executeScriptReturnString(this.shaUpdateBarLua, | |
new String[] { RedisCache.Key.SEC_BARS, RedisCache.Key.MIN_BARS, RedisCache.Key.HOUR_BARS, | |
RedisCache.Key.DAY_BARS }, | |
new String[] { // ARGV | |
String.valueOf(this.sequenceId), // sequence id | |
String.valueOf(secStartTime), // sec-start-time | |
String.valueOf(minStartTime), // min-start-time | |
String.valueOf(hourStartTime), // hour-start-time | |
String.valueOf(dayStartTime), // day-start-time | |
String.valueOf(openPrice), // open | |
String.valueOf(highPrice), // high | |
String.valueOf(lowPrice), // low | |
String.valueOf(closePrice), // close | |
String.valueOf(quantity) // quantity | |
}); | |
Map<BarType, BigDecimal[]> barMap = JsonUtil.readJson(strCreatedBars, TYPE_BARS); | |
if (!barMap.isEmpty()) {// 保存 Bar: | |
SecBarEntity secBar = createBar(SecBarEntity::new, barMap.get(BarType.SEC)); | |
MinBarEntity minBar = createBar(MinBarEntity::new, barMap.get(BarType.MIN)); | |
HourBarEntity hourBar = createBar(HourBarEntity::new, barMap.get(BarType.HOUR)); | |
DayBarEntity dayBar = createBar(DayBarEntity::new, barMap.get(BarType.DAY)); | |
saveBars(secBar, minBar, hourBar, dayBar); | |
} | |
} | |
} |
K 线是一组 Bar 按 ZSet 缓存在 Redis 中,Score 就是 Bar 的开始时间。更新 Bar 时,同时广播通知,以便后续推送。要查询某种 K 线图,在 API 中,需要传入开始和结束的时间戳,通过 ZRANGE 命令返回排序后的 List:
String getBars(String key, long start, long end) {List<String> data = redisService.zrangebyscore(key, start, end); | |
if (data == null || data.isEmpty()) {return "[]"; | |
} | |
StringJoiner sj = new StringJoiner(",", "[", "]"); | |
for (String t : data) {sj.add(t); | |
} | |
return sj.toString();} |
参考源码
可以从 GitHub 或 Gitee 下载源码。
GitHub
小结
行情系统是典型的少量写、大量读的模式,非常适合缓存。通过编写 Lua 脚本可使得更新 Redis 更加简单。
