目前已经支持基本的生产消费,并自己实现了异常重试和死信的功能,自己在开发中运行尚可,如果本项目有帮到你,欢迎大fork和给星。
https://github.com/WilliamChen-luckbob/Aliyun-RocketMq-FlexMessageCenter
基于阿里云RocketMQ,在不停机,不修改代码的情况下,用户在阿里云管理控制台设定新的消息配置完毕后,只需直接将相关配置参数写入数据库的basic_config与consumer_config两个表,程序将为你自动管理相应的生产者和消费者,省去各种硬编码。
萌新从hello world到现在刚刚好满一年了,最近搬砖的日子里遇到一些可以提升和优化的项目,在此记录一下。
公司设计采用阿里云提供的RocketMQ服务,在官方文档和网上一些文献的指导下,我们实现了基本的生产消费功能,并且踩过了一些坑,当时的消息生产消费部分还是硬编码,每新增一组TOPIC或者相关的消费者组或者TAG,都会要求后端开发重新写一套消费者与生产者,并增多配置项,需要停机,重写,发布。
最近项目不太紧,在测试自己的接口时这种解决方案给我带来了巨大的困扰。于是跟领导提出了一种设想,是否有一种方案,能够尽可能的减少在增删改RocketMQ的配置时,后端开发人员的工作量。
最近醉心于研究Apache DolphinScheduler分布式调度框架,同时也混入了传说中的Apache顶级项目的contributor大军,受到Dolphin的启发,我有了这个解决方案的思想雏形。
大致花了半天的时间,我开发完了这个项目的第一个版本并在测试环境测试通过了,反响良好,完全取代了之前的硬编码项目。当然了,目前项目尚未加入消费异常的处理机制,只能做一个简单的记录。也没有加入分布式集群的解决方案。第一个版本就是解决动态配置动态监听的问题,将来有空我会慢慢补充。
目前而言,我们使用的阿里云RocketMQ服务是没有FTP公网模式的,因此基本只能使用HTTP协议进行对应的生产消费操作。因此在这里我弃用了FTP的SDK,毕竟,用FTP SDK根本都连不上。
与我们平时所见的RocketMQ基本一致,商业版的RocketMQ生产者主要关注以下属性
- Topic
- InstanceId
这两个属性是用来判定消息的生产是否需要新开Producer对象,一旦二者中有任意属性变化,都使用一组新的生产者进行生产发送,因此,项目中我使用Topic+InstanceId作为producer对象的名称,名称在当前版本是唯一的。
生产者master线程负责定时轮询配置,动态增删生产者实例线程,而生产者实例线程负责实际生产。
消费者主要关注以下属性:
- Topic
- Tag
- GroupId
- InstanceId
同样地,四者中任意一者变化,都尽量使用一个新的消费者线程去监听,毕竟,萌新书读的不多,在踩过了订阅关系一致性的坑后,这破事儿能避就避了。这也是为啥我一定要写这个项目的原因,不管是迁移环境还是啥的,但凡错任何一丢丢地方,导致各种莫名的问题,小则卡一两天,大则卡上一周,这不得给部门领导挂上十字架祭天吗。
消费者master线程负责定时轮询配置,动态增删消费者实例线程,而消费者实例线程负责实际消费。
同时消费者也能够根据当前项目的注册中心获取对应的处理服务是否在线,如果不在线,会停止相应的数据消费,避免在消费服务挂掉的情况下造成大量的消费失败。
通过对DB配置的定时轮询,更新本地变量,被上述的生产消费者master线程抓取并刷新对应的工作实例线程,从而达到动态配置。
本项目注册中心采用Eureka,用前在根目录properties中找到配置表的初始化sql脚本DBInitialize.sql执行一下,然后做好你的配置,启动项目注册到自己的注册中心即可,普通的springboot启动方式,时间有限不再赘述。
对于Nacos版本,目前我暂时没时间搞。
今后有空会拓展的地方
- 将目前单机多线程的master和worker都做成基于Zookeeper的高可用集群
- 加入配置Controller和相应的swagger界面
- 寻找更优雅的监听方案,让配置修改更快被响应
- 加入消费异常的实际处理相关功能
本萌新是真的菜,没人带,基本靠自学,大佬们如果有更好的解决思路,希望大佬们不吝赐教!