Rocket-MQ总结

之前一直在使用rabbitmq,对Rocket MQ研究的比较少;最近由于另外一个项目已经使用了RMQ,为了不至于引入更多的中间件类型,在基础架构中也直接选型RMQ。然后大致找了一些资料,和书籍了解了一下RMQ的原理;趁周末,大概总结一下。

MQ作用

既然都聊到消息队列了,就势必再提一下它在分布式中的作用。对我来讲,比较有深入体会的,大致包含一下三个方面。

  • 架构解耦

    从解耦的角度来讲,再各个微服务架构设计中还是比较常见的。比如,使用消息对接,可以将一个既处理API操作,又实现业务逻辑的单体架构,拆解为两个微服务;其中,API controller只负责接受消息,记录到数据库,并发送处理消息到MQ积压。然后专门的operator来去订阅MQ中对应操作的消息并处理业务。

  • 流量消峰

    在上一个例子中,当API controller瞬间接受到大流量时,只需要记录到数据库,并发送消息到MQ;也许operator并没有那么强的应对高峰请求的处理能力,
    这个时候就只能够时间换空间,逐步的从MQ中取出消息去消费。这种情况下,MQ就起到了很重要的流量消峰的作用。

  • 消息分发

    消息分发应该属于MQ都共有的一个特性,一般都支持不同的topic,或者key映射到不同的队列,然后消费者根据自己的喜好去订阅不同类型的消息。说到这一点,感觉rabbitmq比rocket mq在这块的特征要更加突出,因为它维护了一个消费者端的队列,而不是像RMQ一样,逻辑在consumer侧。

架构

Rocket MQ采用较简化的架构来设计,主要由下面四个部分组成:

  • NameServer: 存储broker与topic之间的映射关系,类似于一个metadata服务器;
  • Broker: 真正处理消息积压和分发的地方;
  • Producer: 消息的生产者,主要是以SDK或者库的形势运行在业务端程序上;
  • Consumer: 消息的消费者,同Producer,也是以SDK或者库的形势运行在业务端程序上。

NameServer

  • NameServer是采用内存存储topic与broker的映射关系的,没有集中的存储或者缓存中间件;
  • 所有的其他组件在启动前,都会指定NameServer地址;Broker会与NameServer建立长连接,定期向NamerServer发送心跳;
  • Broker向NameServer发心跳,会带上当前自己所负责的所有Topic信息,相当于每一个心跳TTL周期都在为NameServer做全同步;
  • 由于无状态,NameServer支持多实例部署;当动态添加一个NameServer的时候,需要所有其他组件都感知到,Broker支持动态修改配置;
  • 当查询完topic到broker的map后,Produce与Broker,Broker与Consumer之间都会建立长连接,NameServer全部挂掉后,也不会影响旧业务。

Broker

主备

  • 支持一个主和多个备部署,拥有同样name的broker为一组;在这组中,id为0的为master,其他为slave;
  • master支持producer往上面写消息并支持从上面读消息,slave只支持从上面读消息;
  • broker支持指定采用同步还是异步模式;在同步模式,当producer写消息到master后,master必须将消息写到slave才会返回状态;
  • broker暂不支持主备切换,在同一个组中,如果master挂了,consumer只能够从slaves上消费还未被消费的消息,但producer无法再写入;
  • 创建Topic时,把Topic的多个队列创建在多个Broker组上,这样一个Broker组的Master挂后,其他组的Master仍然可用,Producer仍然可以发送消息。

消息消费模式

  • Clustering模式, 同一个ConsumerGroup(GroupName相同)只会有一个consumer抢到消息并消费;多个ConsumerGroup的consumer可以订阅同一个topic,这样每个ConsumerGroup中都会有一个consumer能消费到消息(即:同一个消息支持被多个ConsumerGroup重复消费)。
  • Broadcasting模式, 同一个ConsumerGroup里的每个Consumer都能消费到所订阅Topic的全部消息,也就是一个消息会被多次分发,被多个Consumer消费。

    注意:不要混淆了ConsumerGroup和ProducerGroup。

Broker队列

  • 在创建topic的时候,可以指定其存放的队列个数;该队列数为在单台broker上的队列数(当有N个slave,总队列数就对应乘以N倍);
  • 默认情况下,producer会将消息轮询分发到master的所有队列上;

消息存储

  • 消息存储是由ConsumeQueue和CommitLog配合完成的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件;
  • broker支持指定同步落盘还是异步落盘;同步落盘会保障消息被写到磁盘才返回,异步是先记录到内存就会返回状态,当内存消息达到一定量后再写入物理存储;

消息顺序

  • 如果需要支持全局顺序消息,就必须为topic只指定一个队列,否则多个队列无法保障消息的顺序;
  • 需要只需要保障部分顺序消息,需要使用MessageQueue-Selector来将消息发送到指定队列上,消费端也需要使用MessageListenerOrderly保障不会并发读取。

Producer

发送消息

  • 支持发送延迟消息,当指定了消息类型为延迟消息后,该消息发送到broker,需要等待延迟时间到后,才能够生效,被consumer消费(支持指定时刻和指定延迟间隔);
  • 一个topic有多个队列,默认配置下,producer会轮流往这些队列发送该topic的消息,从而实现负载均衡;
  • 可以设置Message-QueueSelector来自定义消息放到哪些queue里面;

事务消息

采用两阶段提交的方式实现事务消息;

  • Producer先发送一个待确定消息到broker,此时该消息无法被consumer看到,但是该操作会返回消息在队列中的地址等信息,作为索引;
  • Producer在本地处理完操作之后,基于第一阶段broker返回的消息索引,发送第二个确认消息到broker(可能为commit或者rollback);
  • broker如果收到commit消息,将让consumer去消费该消息;如果收到rollback,将消息处理掉即可。
  • 如果producer在发送第二个确认消息到broker的途中异常,消息没有送达broker;broker会定期扫描队列中未被确认的消息,并回调producer的处理函数处理;

事务消息处理流程

消息去重

  • 保障消息消费逻辑的幂等性;
  • 维护消息消费的记录,每次消费的时候去查询是否被消费过(麻烦)。

Consumer

两种方式

  • push方式,其实是通过长轮询来实现的,在该类型下由consumer维护了maptree,保存所有接收到的消息的列表,并实现类似于时间窗口的算法来做流控;PushConsumer会判断获取但还未处理的消息个数、消息总大小、Offset的跨度,任何一个值超过设定的大小就隔一段时间再拉取消息,从而达到流量控制的目的。此外 ProcessQueue还可以辅助实现顺序消费的逻辑。push方式当退出consume时,必须显示调用shutdown()告知broker记录offset等信息;
  • pull方式,需要自己维护从broker的队列上读取消息的offset,并存储到本地,从而保障当consumer挂掉重启之后能够重新接着源地址消费消息;
  • 从原理来讲,pull方式应该是不支持brodcast消息方式的,除非broker通过session获取到连接到其上的所有consumer,并为未消费的consumer保留消息等待消费。

负载均衡

  • consumer自己获取到全局queue信息后,并在客户端实现负载均衡;

消费失败

  • Consumer只负责消费消息,当消费消息失败的时候,会将消息写回broker,并设置其topic为SCHEDULE_TOPIC_XXXX(同样会做落盘处理);
  • scheduler会轮询读取队列中的消息,查看是否到达消费时间;
  • 当时间到后,将其放入retry队列,开始重新消费。

    这篇文章对该流程有消息的介绍

0%