![MongoDB进阶与实战:微服务整合、性能优化、架构管理](https://wfqqreader-1252317822.image.myqcloud.com/cover/697/38209697/b_38209697.jpg)
3.6 小技巧——使用固定集合实现FIFO队列
在股票实时系统中,大家往往最关心股票价格的变动。而应用系统中也需要根据这些实时的变化数据来分析当前的行情。
倘若将股票的价格变化看作是一个事件,而股票交易所则是价格变动事件的“发布者”,股票APP、应用系统则是事件的“消费者”。这样,我们就可以将股票价格的发布、通知抽象为一种数据的消费行为,此时往往需要一个消息队列来实现该需求。
在本章的内容中,我们已经领略过固定集合(capped collection)的一些特性。而基于前面的介绍,我们知道这种类型的集合拥有固定的大小,同时满足高性能FIFO读写能力。因此,可以利用固定集合来实现股票系统中的消息队列。
首先,需要在数据库中声明固定集合,通size来指定该消息队列的容量,代码如下:
![](https://epubservercos.yuewen.com/6F449D/20118171608699906/epubprivate/OEBPS/Images/40827_67_1.jpg?sign=1739265932-z6P78WCVN0ugR8h9yGX5vel1FrfTGrdu-0-36ee5d81aef0c27b6a173223c894782b)
这样,我们就拥有了stock_queue消息队列,其可以容纳10MB的数据。每一条消息的格式可以定义为如下形示。
![](https://epubservercos.yuewen.com/6F449D/20118171608699906/epubprivate/OEBPS/Images/40827_67_2.jpg?sign=1739265932-JPcURascw4B14vZkFweDfbDpqSx58sQi-0-62cd40bc0b0bfeb1578ff0b2e382e68b)
● timestamp指股票动态消息的产生时间。
● stock指股票的名称。
● price指股票的价格,是一个Double类型的字段。
其中,为了能支持按时间条件进行快速的检索,比如查询某个时间点之后的数据,可以为timestamp添加索引,代码如下:
![](https://epubservercos.yuewen.com/6F449D/20118171608699906/epubprivate/OEBPS/Images/40827_68_1.jpg?sign=1739265932-8ddMEUuxGruNSwMu0sFTmGBiIdyiG5DZ-0-d286af4710df4bcc918508ce141e090f)
1.发布股票动态
为了模拟股票的实时变动,我们实现如下函数:
![](https://epubservercos.yuewen.com/6F449D/20118171608699906/epubprivate/OEBPS/Images/40827_68_2.jpg?sign=1739265932-XfuSZ5ACvdTBObvANpTmsj6drTnRfIJn-0-593325bf3f12f18842f796e1b93496b7)
执行pushEvent函数,此时客户端会每隔1秒向stock_queue中写入一条股票信息,结果如下:
![](https://epubservercos.yuewen.com/6F449D/20118171608699906/epubprivate/OEBPS/Images/40827_68_3.jpg?sign=1739265932-yoSk5WCMTr2dWO9VboSmgCLWxJvicFvI-0-0d4bbd8d1595d3fba400911430a28461)
2.监听股票动态
对于股票动态的消费方来说,更关心的是最新数据,同时还应该保持持续进行“拉取”,以便知晓实时发生的变化。根据这样的逻辑,可以实现一个listen函数,代码如下:
![](https://epubservercos.yuewen.com/6F449D/20118171608699906/epubprivate/OEBPS/Images/40827_69_1.jpg?sign=1739265932-uPwrGopN2BYhQnnS1nVKO6VMSUY0k7nw-0-87e0fc36b8265ad831c0986062916221)
上述代码中,find操作的查询条件被指定为仅查询比当前时间更新的数据,而由于采用了读取游标的方式,因此游标在获取不到数据时并不会被关闭,这种行为非常类似于Linux中的tail-f命令。
接下来,在一个循环中会定时检查是否有新的数据产生,一旦发现新的数据(cursor.hasNext()=true),则直接将数据打印到控制台。
执行这个监听函数,就可以看到实时发布的股票信息,代码如下:
![](https://epubservercos.yuewen.com/6F449D/20118171608699906/epubprivate/OEBPS/Images/40827_69_2.jpg?sign=1739265932-mYLgoQxcXmQ9SIxCIrx3B2xBzFeWvG4f-0-121460ec2856577c40e6dd746a3261bb)
![](https://epubservercos.yuewen.com/6F449D/20118171608699906/epubprivate/OEBPS/Images/40827_70_1.jpg?sign=1739265932-jgfvHwStV1h3LdaR9SOTA0uq8IgCF406-0-04fb8c50e300e48d8184922a883b66e4)