前言
在引入一项技术之前,首先必须清楚的是该技术可以为项目解决什么问题。个人在了解消息队列(Message Queue)之前,以为消息队列主是用于发送短信、邮件等消息发送(异步解耦),但深入理解才发现自己的理解错了,MQ的作用不止体现在一些用户接收到的具体消息里,还可用于其它应用的数据发送、通用的业务处理等。
消息队列从字面上意思解读就是将消息存放到队列里,根据队列FIFO(先入先出)的特性进行消息消费。在实际开发中,是一种跨进程的通信机制,用于应用间的消息传递。 
在引入MQ之前,需要了解的优缺点与应用场景
MQ的主要优点为解耦、异步、削峰,以下举一个简单的场景来反应这几个特性。
在微服务项目中,一般会根据核心业务进行系统的垂直拆分再进行单独部署。在上图中,各系统在下单业务里主要负责的内容如下:
- 订单系统:创建订单,将下单消息(如订单id、用户数据)发送到MQ
- MQ:限制每秒的订单请求处理数(如每秒接收2000个请求但数据库只能处理1000个则只处理1000个,处理不过来的先在消息队列里堆积)
- 物流系统:创建订单物流信息
- 积分系统:用户购物积分信息更新
想象下以上场景没有MQ的的存在时创建订单流程中存在的问题:
- 订单系统创建完订单信息后要去调用物流系统、积分系统上的业务接口,系统严重的耦合在一起(解耦)
- 订单系统若非通过线程去调用其它系统的接口,还需同步等待返回浪费不少时间(异步,避免创建线程调用的麻烦)
- 用户高峰期请求过多数据库处理不过来进而导致应用崩溃(削峰)
任何事物都有两面性,虽然MQ可以给系统解决不少问题,但也会引入一些问题,如:
- 系统复杂度提高,需要考虑消息重复消费、消息丢失等问题
- 数据一致性问题,如上例中的物流或库存系统写库出现异常如何回滚补偿
了解了MQ的一些特性后,再讨论下几个适合使用MQ的场景:
- 上游系统不关心下游的执行结果(如用户注册成功后用户系统通过MQ向用户发送邮件,但发送成不成功用户系统根本不在意)
- 依赖于数据的定时任务(如下单后24小时内不支付则取消订单,申请退款72小时内商家不处理则自动退款)
引入MQ后的一些问题解决思路
- 消息重复消费(保证消息的幂等性)- 幂等性:对于同一操作的请求无论请求多少次结果都是一致的,在MQ中的具体体现为同一条消息无论发送都少次都会被消费一次。 - 由于网络抖动(延迟)的原因消息重复发送的问题是不可避免的,如果在消费端消费时没有做好消息的幂等性保证就有可能出现重复消费,导致同一条消息被多次消费、写库多次的情况。比较常见的做法是为消息添加一个唯一标识(ID),在消费时根据ID查询数据库是否存在该消息记录,如果不存在再插入消息,存在则不进行插入消费。当生成与消费时间间隔不长时,可使用Redis提高消息幂等性的效率,如: - 消费者消费前根据ID去查询redis是否存在该消息 
- 不存在该消息则消费并写入redis,存在该消息则不消费返回 - 关于消息ID: 
- RocketMQ的每条消息都会配有全局唯一的ID 
- 如果消息中间件不会生成ID,可考虑一些ID服务(如雪花算法)生成全局唯一ID 
- 建议ID不与实际业务关联 
 
如目前个人工作中负责的消息中心应用是基于MongoDB+RocketMQ的技术架构,MongoDB负责存储各个应用发送过来的消息(主要为Sms、Email等),每次消费前通过RocketMQ的Message ID查询Mongo保证消息幂等性避免重复消费,消费成功后更新DB中的消息状态。
- 消息丢失(消息的可靠性)MQ各组件的消息丢失含义都有所不同,导致与解决方案也不一定相同,以kafka、rocket的消息传递模型(Producer->Broker->Consumer)为例:- Producer:消息未持久化到Broker中,或消费者未能成功消费到消息。Kafka可通过更改ack配置解决,rocketMQ中会返回消息发送状态码。
- Broker:消息成功传到到我这里了,可我因为某些原因(不同的MQ可能因机制问题有不同原因)弄丢了,如果是硬件原因(如宕机、磁盘损坏)建议你copy(集群部署)几个我
- Consumer:我拿到了消息,但消费失败了或中途挂掉了没告诉Broker。可通过各MQ中间件的ACK机制解决。
 
基于RocketMQ的简单例子技术框架与业务模型
以下便以一个基于MongoDB+RocketMQ+Eureka+Spring Cloud Config的技术框架并结合使用MQ中的问题搭建一个简单的消息中心项目案例,其中各组件在项目中的主要作用如下:
- Spring Cloud Config:消息配置(如topic、ConsumerGroup、ProducerGroup)中心。
- Eureka:应用服务注册中心,负责项目中各服务的发现与提供调用。
- MongoDB:由于消息的事务关系不强且Mongodb格式文档自由(json存储,随意增删字段),所以使用Mongodb存储各个应用发送过来的消息(主要为Sms、Email等),每次消费前通过RocketMQ的Message ID查询Mongo保证消息幂等性避免重复消费,消费成功后保存消息。
- RocketMQ:消息接收、存储、发送。
下图为该项目的应用关系模型:
消息中心应用:统一通用消息的业务处理应用,如短信发送、邮件发送、员工服务号推送等消息的处理
问卷应用:负责员工调查问卷的分发,在该例子中只是一个简单的消息发送测试应用
common:存放各应用通用类,如短信消息类(SmsMessage)、消息常量类
config-server-properties:配置中心的配置存放目录
由于该项目主要用于演示一些MQ的功能与使用中的问题解决方式,所以编码部分比较简单。
应用例子编码
- 通用模块编码(common)- 通用模块主要存放各应用通用类(如实体、常量、配置、功能等)。 
 MessageConstant:维护消息常量- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23- public interface MessageConstant { 
 interface System {
 String QUESTION = "QUESTION";
 }
 interface Topic {
 String SMS_TOPIC = "rocketmq.topic.sms";
 String SMS_TOPIC_TEMPLATE = "${rocketmq.topic.sms}";
 String MAIL_TOPIC = "rocketmq.topic.mail";
 String MAIL_TOPIC_TEMPLATE = "${rocketmq.topic.mail}";
 }
 interface Producer {
 String SMS_GROUP_TEMPLATE = "${rocketmq.producer.group.sms}";
 String MAIL_GROUP_TEMPLATE = "${rocketmq.producer.group.mail}";
 }
 interface Consumer {
 String SMS_GROUP_TEMPLATE = "${rocketmq.consumer.group.sms}";
 String MAIL_GROUP_TEMPLATE = "${rocketmq.consumer.group.mail}";
 }
 }- BaseMessage:基础消息类,所用的通用消息都需继承此类方便统一信息的管理 - 1 
 2
 3
 4
 5
 6
 7
 8
 9- @Data 
 @Accessors(chain = true)
 public abstract class BaseMessage implements Serializable {
 /**
 * 消息源系统:{@link io.wilson.common.message.constant.MessageConstant.System}
 */
 private String system;
 }- SmsMessage:通用短信消息类,短信内容数据载体 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24- @EqualsAndHashCode(callSuper = true) 
 @Data
 @Accessors(chain = true)
 @ToString(callSuper = true)
 public class SmsMessage extends BaseMessage {
 /**
 * 短信创建用户
 */
 private String createUserId;
 /**
 * 接收短信用户
 */
 private String toUserId;
 /**
 * 手机号码
 */
 private String mobile;
 /**
 * 短信内容
 */
 private String content;
 }
- 消息中心应用(message-center)- 消息中心在进行编码之前,需确认消息中心该如何进行消息的处理。该项目所处的业务环境是各应用可能都需要发送一些短信消息、邮件、服务号消息等,相同消息的业务处理是一致的,所以消息中心对消息接收消费的主要流程如下: - 保证消息幂等性(查询数据库使用已有消息记录避免重复消费)
- 消息业务处理
- 消息日志入库
 - 在该项目中,不同的消息类型存储在不同的Mongodb collection(同Mysql table概念),但共用一个消息日志类MessageLog: - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34- @Data 
 @Accessors(chain = true)
 public class MessageLog implements Serializable {
 private String msgId;
 /**
 * 发送方系统名称 {@link io.wilson.common.message.constant.MessageConstant}
 */
 private String system;
 /**
 * 消息对象json字符串
 */
 private String msgContent;
 /**
 * 业务执行结果
 */
 private Boolean success;
 private LocalDateTime createTime;
 private LocalDateTime updateTime;
 /**
 * 初始化消息记录
 *
 * @param message 消息
 * @return
 */
 public static <T extends BaseMessage> MessageLog convertFromMessage(T message) {
 LocalDateTime now = LocalDateTime.now();
 return new MessageLog()
 .setSystem(message.getSystem())
 .setSuccess(false)
 .setCreateTime(now)
 .setUpdateTime(now);
 }
 }- 在该消费流程设计与开发编码过程中个人考虑的核心点如下: - 如果使用普通消息类(如SmsMessage)作为db存储的映射对象,会导致消息类掺杂不必要的属性(如createTime、updateTime、success),且作为一个通用的消息数据载体,普通消息类更适于作为一个VO而非DO使用,所以消息的处理结果、消息的创建更新时间这些作为原消息上的附加内容,更适合放到其它数据库映射对象中维护,所以定义了MessageLog作为消息记录的实体类
- 既然是作为各应用都可使用的通用消息所以肯定都会有一定数据量,虽然映射实体都一样,但存放到不同的collection可以提高操作的便捷性和获得更好的性能,系统编码可以更好地根据系统进行消息筛选
- 在消息消费流程中,保证消息幂等性和消息日志入库这两步只有数据库名是不同的,所以可定义一个父Listener进行消息监听消费的方法抽象,不同消息的业务处理交给不同的消息Service,同一类消息的消费可能会再细分调用不同的消息业务方法消费(如发送单条短信、批量发送短信),所以可以对各service抽象出一个consume()方法根据参数调用具体的service业务方法进行消息消费
 - 消息中心类图与消费流程图- 为了更好地展示消息中心中类之间的关系,描绘以下类图: 
   - 当一条短信消息发送到消息中心时,其消费流程如下图:  
- 消息业务处理编码- BaseMessageService:消息业务消费抽象接口,抽象每个消费者(Listener)调用的业务消费方法 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12- public interface BaseMessageService<T extends BaseMessage> { 
 /**
 * 消费消息
 *
 * @param message 消息
 * @param consumeFunction 消费方法
 */
 default boolean consume(T message, Function<T, Boolean> consumeFunction) {
 return consumeFunction.apply(message);
 }
 }- BaseMessageService:短信消息业务抽象接口 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12- @Service 
 public interface SmsMessageService extends BaseMessageService<SmsMessage> {
 /**
 * 发送单条短信消息
 *
 * @param smsMessage
 * @return 业务处理结果
 */
 boolean sendSingle(SmsMessage smsMessage);
 }- SmsMessageServiceImpl:短信消息业务实现类 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18- @Service 
 @Slf4j
 public class SmsMessageServiceImpl implements SmsMessageService {
 @Override
 public boolean sendSingle(SmsMessage smsMessage) {
 // 短信业务操作结果
 boolean isSuccess = true;
 /*
 * 短信业务操作并把操作结果设到isSuccess中
 */
 if (Objects.equals(smsMessage.getToUserId(), "Wilson")) {
 isSuccess = false;
 log.info("短信发送失败,消息内容:{}", smsMessage);
 }
 return isSuccess;
 }
 }
- 消息业务处理编码- MessageLogConstant:维护MessageLog的相关常量(如不同消息的collection名) - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11- public interface MessageLogConstant { 
 /**
 * 各消息日志Mongo集合名
 */
 interface CollectionName {
 String SMS = "sms_message_log";
 String MAIL = "mail_message_log";
 }
 }- AbstractMQStoreListener:保证消息幂等性、消息日志入库操作的抽象Listener类方法中 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43- @Slf4j 
 public abstract class AbstractMQStoreListener {
 @Resource
 protected MongoTemplate mongoTemplate;
 /**
 * 判断消息是否已被消费
 *
 * @param msgId
 * @return
 */
 protected boolean isConsumed(String msgId) {
 long count = mongoTemplate.count(new Query(Criteria.where("msg_id").is(msgId)), collection());
 if (count > 0) {
 log.info("消息{}已成功消费过,请勿重复投递!", msgId);
 return true;
 }
 return false;
 }
 /**
 * 当前消息的mongo collection名:{@link io.wilson.message.domain.constant.MessageLogConstant.CollectionName}
 *
 * @return 当前消息存储的collection名
 */
 protected abstract String collection();
 /**
 * 保存消息消费记录
 *
 * @param success 业务执行结果
 * @param msgId 消息id
 * @param message
 */
 void store(boolean success, String msgId, BaseMessage message) {
 MessageLog messageLog = MessageLog.convertFromMessage(message)
 .setMsgId(msgId)
 .setMsgContent(JSONObject.toJSONString(message))
 .setSuccess(success);
 mongoTemplate.insert(messageLog, collection());
 }
 }- SmsMessageListener:短信消息监听器(消费者),如在消费过程中抛出异常,RocketMQ会以一定的时间间隔进行重新投递消费 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35- @Slf4j 
 @Service
 @ConditionalOnProperty(MessageConstant.Topic.SMS_TOPIC)
 @RocketMQMessageListener(topic = MessageConstant.Topic.SMS_TOPIC_TEMPLATE, consumerGroup = MessageConstant.Consumer.SMS_GROUP_TEMPLATE)
 public class SmsMessageListener extends AbstractMQStoreListener implements RocketMQListener<MessageExt> {
 @Resource
 private SmsMessageService smsMessageService;
 private static final String EXCEPTION_FORMAT = "短信消息消费失败,消息内容:%s";
 @Override
 public void onMessage(MessageExt message) {
 String msgId = message.getMsgId();
 if (isConsumed(msgId)) {
 return;
 }
 SmsMessage smsMessage = JSONObject.parseObject(message.getBody(), SmsMessage.class);
 log.info("接收到短信消息{}:{}", msgId, smsMessage);
 /*if (Objects.equals(smsMessage.getToUserId(), "2020")) {
 log.error("消息{}消费失败", message.getMsgId());
 // 抛出异常让RocketMQ重新投递消息重新消费
 throw new MQConsumeException(String.format(EXCEPTION_FORMAT, smsMessage));
 }*/
 boolean isSuccess = smsMessageService.consume(smsMessage, smsMessageService::sendSingle);
 if (!isSuccess) {
 log.info("短信消息业务操作失败,消息id: {}", msgId);
 }
 // 保存消息消费记录
 store(isSuccess, msgId, smsMessage);
 }
 @Override
 protected String collection() {
 return MessageLogConstant.CollectionName.SMS;
 }
 }- MessageCenterApplication:主程序 - 1 
 2
 3
 4
 5
 6
 7- @SpringBootApplication 
 @EnableDiscoveryClient
 public class MessageCenterApplication {
 public static void main(String[] args) {
 SpringApplication.run(MessageCenterApplication.class, args);
 }
 }- Spring Cloud配置文件bootstrap.yml - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13- eureka: 
 client:
 service-url:
 defaultZone: http://localhost:8000/eureka
 spring:
 cloud:
 config:
 discovery:
 enabled: true
 service-id: config-center
 # 资源文件名
 profile: dev
 name: rocketmq- SmsSendTest:单元测试类 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18- @SpringBootTest(classes = MessageCenterApplication.class) 
 @RunWith(SpringJUnit4ClassRunner.class)
 public class SmsSendTest {
 @Resource
 private RocketMQTemplate rocketMQTemplate;
 @Value(MessageConstant.Topic.SMS_TOPIC_TEMPLATE)
 private String smsTopic;
 @Test
 public void sendSms() {
 SmsMessage smsMessage = new SmsMessage();
 smsMessage.setToUserId("13211")
 .setMobile("173333222")
 .setContent("测试短信消息")
 .setSystem(MessageConstant.System.QUESTION);
 rocketMQTemplate.send(smsTopic, MessageBuilder.withPayload(smsMessage).build());
 }
 }
 
- 配置中心(config-server)- 主程序ConfigServerApplication - 1 
 2
 3
 4
 5
 6
 7
 8- @SpringBootApplication 
 @EnableDiscoveryClient
 @EnableConfigServer
 public class ConfigServerApplication {
 public static void main(String[] args) {
 SpringApplication.run(ConfigServerApplication.class, args);
 }
 }- Spring Cloud配置文件bootstrap.yml: - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15- spring: 
 cloud:
 config:
 server:
 git:
 uri: https://gitee.com/Wilson-He/rocketmq-message-center-demo.git
 username: Wilson-He
 force-pull: true
 password:
 # 配置文件在uri下的目录
 search-paths: /config-server-properties
 eureka:
 client:
 service-url:
 defaultZone: http://localhost:8000/eureka- 配置文件configs-server-properties/rocketmq-dev.properties: - 1 
 2
 3
 4
 5
 6
 7- rocketmq.name-server=127.0.0.1:9876 
 rocketmq.topic.sms=sms-topic
 rocketmq.producer.group.sms=sms-group
 rocketmq.consumer.group.sms=sms-group
 rocketmq.topic.mail=mail-topic
 rocketmq.producer.group.mail=mail-group
 rocketmq.consumer.group.mail=mail-group
运行流程
- 运行RocketMQ name-server与broker,如mqnamesrv -n 127.0.0.1:9876,mqbroker -n 127.0.0.1:9876
- 运行eureka应用
- 运行配置中心config-server
- 运行消息中心message-center
- 运行message-center单元测试类(SmsSendTest)或运行question-app访问localhost:8080/question/toUser?userId=xxx进行消费测试,消息中心控制台打印出日志信息与Mongo sms_message_log成功新增了数据即项目搭建完成  
(待)扩展点:
- RocketMQ的发送者应用可在配置文件中设置rocketmq.producer.retry-times-when-send-failed/retry-times-when-send-async-failed属性配置rocketmq同步/异步发送消息失败后的重试次数,不设置则默认都为2
- 当业务执行操作结果失败时仍然入库的原因是有时业务执行过程中可能会包含调用第三方的操作,当第三方报错时会导致业务操作结果失败,而第三方的操作是不可控的,所以先把报错结果保存便于追溯,且有业务需要时也可通过定时任务查库重新执行业务
- 该例子中只用了一个消息配置文件,实际开发中消息配置需根据项目所需配置到对应的项目配置文件,如question-app的消息配置(如topc、producerGroup)应在其项目中的配置文件(如application.yml、apollo的namespace)中配置
- 该项目中的NameServer、Broker并没有集群部署,Broker集群部署后配置同步双写避免主机写入后尚未同步到从机就宕机导致消息丢失的情况(有意向的自行百度:RocketMQ 同步双写)
末
该文章通过一个简单的项目例子演示了使用Spring Boot RocketMQ处理MQ常见问题的一些方式:
- 消息重复消费问题可通过数据库存储来保证幂等性
- 若消息消费业务操作失败时可通过Listener抛出异常让RocketMQ重新投递消息进行消费
 
        