高并发条件下消息队列的设计与实现

2024-01-10 01:48李方方周亚凤王校建
黑龙江科学 2023年24期
关键词:队列内存消息

李方方,周亚凤,王校建,胡 蕊

(南京信息职业技术学院,南京 210043)

近年来,互联网用户爆发式增加,这对行业软件系统的高并发及高处理能力提出了挑战,特别是医药、电子商务、通信、金融、游戏等行业,用户基数大,与系统互动频繁,会形成巨大的请求数据,原来的集群分布式方法已经无法很好地满足当前用户的需求,受限于网络等因素,如果不具有高并发处理能力,很有可能造成通信请求数量的损失。为了应对各行业业务量的持续增长,需在现有系统架构上进行技术变革。研究表明,强大高效的及时通信消息处理能力能减轻服务器压力,是保证高并发、高性能、高可靠的手段之一,因此需对消息通信进行处理,升级解决方案,引入消息即时通信技术,替代传统的HTTP请求预处理,保证系统吞吐量,满足系统的高并发高处理需求[1]。

研发设计了一个即时消息队列FineMQ,通过即时消息列队利用中间件的处理能力,解决应用解耦及消息异步,实现高性能及最终的一致性架构。

1 消息队列

消息队列的英文是Message Queue(简称MQ),是一种不同进程间或同一进程中不同线程间的通信方式,提供了标准的一步通信协议,使用进程间通信交互方式,每一个软件的贮列记录都包含详细字段,通过字段进行通信,实现应用间的异步通信,保证应用间的高可用,通过消息组件中的Producer、Broker及Consumer达到消息的正常消费及通信的及时性与准确性[2]。

消息队列的特点是异步、解耦及广播。消息队列本身是异步的,只需要保证同一时刻Producer和Consumer有一个在线即可。Consumer不关心业务的处理流程,只关心业务的处理结果,需要将Consumer操作结果实时反馈给Consumer,至于是否处理,不是Consumer要关心的部分。消息共享只需要将消息发布在一个固定的平台,有新Consumer想要看Producer发布的消息,只需要订阅这个平台就能看到Producer发布的消息,不需要每次都重新发布已经发过的消息。

1.1 消息

需要两个或多个通信终端/软件/应用参与对接,其中需要传递的信息内容称为消息。消息的定义范畴很广泛,可以是有声音频,也可以是转化的二进制,如视频传输、文本传输、图像传输、数字传输等。还有可能含有时间戳等信息,为了鉴别消息发送与接受的时效性,有时还会引入sign的鉴权,保证消息发送与接收者的绝对安全。

1.2 队列

列队是一种常用的数据结构,是存在于内存或磁盘中、开辟一块公用存储空间存放数据的一组数据结构,用来处理一组数据的临时空间。为了保证数据的有序处理,将所有数据存放在列队中,在列队中依次读取操作,保证系统处理数据的完整性。列队一般分为两种模式,即顺序结构和链式结构。不同模式的数据处理性能有所不同,而消息队列是用来处理海量消息的一种方式。队列的排队机制主要分为4种,即先进先出(FIFO)、优先排队、公平排队、加权公平排队。

2 消息队列的对比和选择

目前市面上主流的消息队列有很多[3],如2006年发布的Rabbit MQ,2011年发布的Kafka,2012年初发布的RocketMQ及2018年雅虎生产的Apache Pulsar,这些产品得到了大众的广泛认可,具有很好的稳定性,处理高并发、高可用的能力突出,已应用于各行各业。 为了更加直观地看出其区别,将各消息队列的性能指标一一列出,如表1所示。

表1 常见的消息队列比较Tab.1 Comparison of common message queue

面对这么多的MQ,一般使用以下原则选择消息队列:必须开源。万一用户使用的MQ突然遇到了一个影响业务的bug,用户可通过修改源代码来规避这个bug。要有以下几个特性:消息的可靠性、支持集群、性能要好,系统内存占用小。

基于这些原则,以电商平台系统为例,设计一款在高并发条件下的消息队列FineMQ,主要实现用户商品的销售等功能,FineMQ为中小型电商交易平台提供了一个轻量级、高可用消息队列的选择。

3 消息队列的设计

消息队列系统需要处理大量数据,包括数据处理容错机制,数据恢复机制,通信安全机制,因此设计的功能模块需考虑到这些因素[4],设计了以下几个功能模块:连接模块,用来处理消息发布者与消息订阅者之间的连接或是长连接。配置管理模块,用来处理消息中心的配置信息、消息订阅主题等。队列管理模块,用来视图化输出当前消息处理效率,查看消息队列处理响应速度,帮助分析排查问题。安全模块,用来保证消息订阅、发送与接收的安全性,达到安全通信要求。日志模块,用来记录消息处理情况并保留相关运行日志,方便定位排查相关问题。性能管理模块,用来监控消息队列系统是否满足当前设计性能,是否达标,用于判断产品是否成功,并可用于性能瓶颈分析。故障回复模块,保证在服务器宕机、网络中断、系统进程卡死、消息丢失等特殊情况下的信息故障恢复。

4 消息队列的实现

以电商系统为例,前台界面的设计主要实现电商的主要功能,包括系统的主页商品显示、商品倒计时抢购、下单、加入购物车、支付、查看消费数据及统计报表等功能。根据浏览导航栏查看商品信息。通过浏览网站界面点击详情,便可知道商品的详细信息,对满意的商品直接加入购物车或进行商品收藏等操作,进入订单支付界面,多种支付方式简单方便。后台管理员端主要实现管理员登录、发布商品、修改发布的商品、管理商品规格(SKU)、管理商品评论、修改订单状态、查看商品收藏及用户足迹等操作。定点销售作为电商平台的核心功能,不允许出现超卖的情况,因此在设计中通过消息队列方案保证该功能的正常实现,保证该模块的高并发与高可用性能,通过消息队列技术实现瞬时高并发请求,对海量的请求响应结果做回应,保证每个用户的请求都是有效且不丢失消息,而响应界面的设计则是对高并发下对应场景业务处理能力的最好回应。

消息队列功能的实现整体思路是需要确认并创建一个整体的数据交互流,确定Producer、Broker与Consumer之间的关系。Producer发出消息给Broker,Broker发出消息给Consumer,Consumer接收消息,处理消息之后回复消息消费确认。Broker删除/备份消息。通过RPC协议把整个数据流串起,通过选用合适的RPC协议达到消息列队的高可用性,做到数据无状态处理,方便消息列队水平拓展。考虑特殊情况下消息堆积的情况处理,如果在合适的时机向Consumer投递消息,而消息推挤的最佳处理方式是通过存储方案解决。存储方案可以是磁盘文件存储、内存存储等。

目前流行的ActiveMQ、RabbitMQ和ZeroMQ等消息队列大多是为了实现AMQP、STOMP、XMPP之类的协议,占用太多的内存(如新版本ActiveMQ建议分配内存达1 G+),但很多Web应用中只是想找到一个可以缓解高并发请求的解决方案,一个轻量级的消息队列实现方式才是本系统真正需要的。参考RocketMQ[5],自行设计了一个轻量级消息队列(FineMQ)。以下为各个模块的设计:

4.1 Broker子模块的设计

Broker既要实现接收Producer的消息推送,也要向Consumer提供消费信息。Broker应支持集群且集群地位平等,支持集群可提高系统吞吐量。Broker要内置注册中心,通过注册中心Broker能动态感知Producer与Consumer的动作,自动匹配在线的Broker。Broker收到Producer的消息时,第一时间应存入队列,而不是直接存储消息,通过队列的异步将队列中的消息存入MySQL中。Broker实现核心代码。批量新增消息:在接收Producer生产的消息的PRC调用时,Broker不会立刻存储,而是立即push到内存队列中,同时立即响应PRC调用,而内存队列会通过异步方式将队列中的消息存储到数据库。Broker在接收到“消息锁定”等同步RPC调用时会触发同步调用,采用乐观锁方式锁定消息。通过ChannelHandlerContext来直接跳到上一次终止的位置,不需要每次都要从头开始,减少每次从头开始找指定位置消息的时间。异常捕获机制:当捕获到异常时,自动关闭连接。

4.2 Producer子模块的设计

Producer(消息生产者)兼容异步批量多线程生产+同步生产两种方式,提升消息发送性能。消息发送过程组成如下:

组装消息,即对发送的消息进行按需组装,包括设置topic、tag、y延时及是否有序等。

生成topicPublishInfo,定时或按需从namesrv中同步该topic的broker消息。选择队列,从topicPublishInfo按照轮询方式选择队列。发送消息,通过异步发送消息给Broker。

串行消费,ShardingId 保持一致即可,如消息,可将 ShardingId 设置为商品ID,则该商户全部消息固定在一台机器消费。广播消费,即点击广播消息按钮一次,将会生产一条广播消息(消费者IMqConsumer注解的group 属性修改不一致即可。一条消息将会广播给该主题全部在线group,每个group都会消费,单个group只会消费一次)。延时消费,EffectTime设置为固定时间点即可,如订单30 min超时取消,可将EffectTime设置为30 min后的时间点,到时将会自动消费。失败重试消费,RetryCount设置重试次数即可。如发送短信消息,第三方服务不稳定时失败很常见,可设置RetryCount为3,失败时将会自动重试指定次数。

4.3 Consumer子模块的设计

在设计Consumer时要考虑以下几个因素:支持用户组,消费主题(topic),消费开关,避免重复消费。Consumer子模块初始化主要包括构建consumer订阅和消费分组的重试队列。创建Rebalance服务, 该服务负责messageQueue的消费。启动消费偏移量获取服务,获取上一次消费位移。启动定时任务,核心任务之一是定时去namesrv拉取broker信息。启动pullMessageService,从broker拉取等待消费的消息。启动rebalanceService,负责定期调整consumer端负载均衡。

Consumer通过多线程自适应轮询,定时向Broker拉取消息进行顺序消费,消息消费结束后调用RPC修改消息状态,追加消息日志,Broker利用内存队列的方式通过异步将消息队列中的变更存储到数据库中。

5 消息队列调度算法

当系统接收到新消息时,系统调度算法确定消息是否为响应消息。如果是,带有和信息相同ID的信息表示传输将正常删除缓存队列中存储的信息。确定持续比特是否为1,消息被永久保存。再次确定接收到消息的用户是本地还是需要路由,判断用户是否处于在线状态,如果是在线直接发送给用户。例如,可以使用前进程与后台进程两个队列。在前队列中只能使用RR调度算法,但在端队列中只能使用FCFS调整算法。除此之外,还需在队列内进行调整。一般情况下,使用固定优先权预设调整。因此前台队列应优先于后台队列[3]。消息队列调度算法流程如图1所示:

图1 消息队列调度算法流程Fig.1 Flow of message queue scheduling algorithm

每个队列都是双向链表,信件从队列的头部中提取,并在末尾进行分立。出队和入队可以并行工作,互不干涉。对队列的操作以消息为单位,对每个队列采用FIFO原理。

6 系统的并发性能测试

为了保证消息队列的性能要求,需对该功能进行一定的高并发环境压力测试。通过Jemeter工具进行用例压力测试。用例如表2所示。

表2 性能测试用例Tab.2 Performance test cases

7 结束语

使用的消息队列是通过参考RocketMQ而实现的面向小型电商平台的轻量级消息队列。该消息队列解决了并发量不高、对服务器要求较高、占用系统资源过多等问题,增加了系统的稳定性及安全性,减轻了服务器压力,减少了内存占用,为小型电商平台提供了一个轻量级消息队列的选择。

猜你喜欢
队列内存消息
外部高速缓存与非易失内存结合的混合内存体系结构特性评测
队列里的小秘密
基于多队列切换的SDN拥塞控制*
一张图看5G消息
“春夏秋冬”的内存
在队列里
丰田加速驶入自动驾驶队列
消息
消息
消息