什么是队列(queue)?
队列是一种存储、组织数据的数据结构,最大的特点就是先进先出(FIFO)。
什么是消息队列?
服务之间最常见的通信方式是直接调用彼此来通信,消息从一端发出后立即就可以达到另一端,称为即时消息通讯(同步通信);
消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端,称为延迟消息通讯 (异步通信)。
而容器的一个具体实现就是MQ(Message Queue)。
可以通过小红和小明读书的故事来理解一下消息队列。
什么是RabbitMQ?
RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言编写,Erlang语言是为电话交换机开发的语言,天生自带高并发光环,和高可用特性。
rabbitmq模拟器:http://tryrabbitmq.com/
,可以直观的理解消息的传递方式。
使用方法:
1、直接将画框左面的图标拖进画图区,构建想要的拓扑图;
2、按住ALT或SHIFT键,鼠标点击需要连接的图标(如果不能连接,试试相反方向,例如先点击queue再点击exchange) ;
3、双击击图标进行编辑设置各项功能;
4、binding key不在queue上设定,双击击连线中间bindingkey。
什么是AMQP协议?
全称为Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是一个通用的应用层协议。消息发送与接受的双方遵守这个协议可以实现异步通讯,这个协议约定了消息的格式和工作方式。
为什么选择RabbitMQ?
现在的市面上有很多MQ可以选择,比如ActiveMQ、ZeroMQ、Appche Qpid,那问题来了为什么要选择RabbitMQ?
- 除了Qpid,RabbitMQ是唯一一个实现了AMQP标准的消息服务器;
- 可靠性,RabbitMQ的持久化支持,保证了消息的稳定性;
- 高并发,RabbitMQ使用了Erlang开发语言,Erlang是为电话交换机开发的语言,天生自带高并发光环,和高可用特性;
- 集群部署简单,正是应为Erlang使得RabbitMQ集群部署变的超级简单;
- 社区活跃度高,根据网上资料来看,RabbitMQ也是首选。
RabbitMQ的流程与原理
角色
生产者:消息的创建者,负责创建和推送数据到消息服务器;
消费者:消息的接收方,用于处理数据和确认消息;
代理人:就是RabbitMQ本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。
名词解释
ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用;
Channel(信道):消息推送使用的通道,1个或多个;
Broker(代理人):即RabbitMQ Server,不是集群,就是1个,否则就是多个;
Virtual host(虚拟机):简称vhost,1个或多个,当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue
Exchange(交换器):用于接受、分配消息,1个或多个;
Queue(队列):用于存储生产者的消息,1个或多个;
RoutingKey(路由键):用于把生产者的数据分配到交换器上;
BindingKey(绑定键):用于把交换器的消息绑定到队列上。
个人理解RoutingKey和BindingKey是指同一个关系,只不过是站在消费者和和生产者的角度,叫法不一样,取名不一样。
原理
1、建立信道(Channel)
首先必须连接到RabbitMQ才能发布和消费消息,那怎么连接和发送消息的呢?
我们的应用程序和Broker(RabbitMQ Server)之间会创建一个TCP连接,一旦TCP打开,并通过了认证,认证就是我们试图连接RabbitMQ之前发送的RabbitMQ服务器连接信息、用户名和密码,有点像程序连接数据库,一旦认证通过你的应用程序和RabbitMQ就创建了一条AMQP信道(Channel)。
信道是创建在“真实”TCP上的虚拟连接,AMQP命令都是通过信道发送出去的,每个信道都会有一个唯一的ID,不论是发布消息,订阅队列或者介绍消息都是通过信道完成的。
连接管理器管理着所有的信道。
2、创建/定位虚拟机(vhost)
默认vhost为/,在建立连接时将作为参数传递给Broker(可以理解就是RabbitMQ本身),Broker会帮助我们创建好vhost,每个RabbitMQ可以创建多个vhost,每个vhost可以理解成一个mini版的RabbitMQ,个人觉得也可以把vhost理解成一个命名空间。
3、创建/定位队列(Queue),存放消息
如上图所示,创建队列1(名称为Q1),创建完队列后,会将队列1(Q1)与交换机1(E1)建议绑定关系(E1Q1),这个关系名称我们称之为BindingKey。
到了这一步,生产者可以通过BindingKey(E1Q1)找到目标队列(Q1),将消息存入目标队列中(Q1)。
4、监听消息队列,消费消息
如上图所示,消费者1通过建立信道,提供RoutingKey(E1Q1),即可监听到目标队列(Q1),处理生产者1发送的消息。
RabbitMQ的消息转发类型(Exchange type)
如果把RabbitMQ比作一个传递消息的游戏,那么以上提到角色和名词都是游戏参与者,为了让游戏变得多样有趣(满足不同需求),RabbitMQ还有几个游戏规则,就是我们要说的消息转发类型。
有3种玩法:
1、发布与订阅(Direct exchange)
推荐使用这种,原理是通过消息中的routing key,与binding中的binding-key进行比对,若二者匹配,则将消息发送到这个消息队列或者获取消息。
我们用上面提到的rabbitmq模拟器来示意一下:
2、广播(Fanout exchange)
复制分发路由
3、主题(topic exchange)
比如设置BindingKey为*.Q1.*
,那么E1.Q1.C1
就会匹配上,消息就发送给了消费者1,如下图:
怎样保证RabbitMQ的可靠性
在默认的情况下,重启服务器会导致消息队列丢失,那么怎么保证Rabbit在重启的时候不丢失呢?答案就是消息持久化。
当我们把消息发送到Rabbit服务器的时候,需要选择是否要进行持久化,但这并不能保证Rabbit能从崩溃中恢复,想要Rabbit消息能恢复必须满足3个条件:
- 投递消息的时候durable设置为true,消息持久化,代码:
channel.queueDeclare(x, true, false, false, null)
,参数2设置为true持久化;
设置投递模式deliveryMode设置为2(持久),代码:channel.basicPublish(x, x,MessageProperties.PERSISTENT_TEXT_PLAIN,x)
,参数3设置为存储纯文本到磁盘; - 消息已经到达持久化交换器上;
- 消息已经到达持久化的队列。
持久化工作原理
Rabbit会将我们的持久化消息写入磁盘上的持久化日志文件,等消息被消费之后,Rabbit会把这条消息标识为等待垃圾回收。
持久化的缺点
消息持久化的优点显而易见,但缺点也很明显,那就是性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量,尽管使用SSD硬盘可以使事情得到缓解,但他仍然吸干了Rabbit的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。
所以我们要根据业务场景,选择适合自己的方式。
崩溃处理
再回到上面的流程示例图,图中标有1、2、3数字标示,下面我们来分析一下如果故障发生在1、2、3处,RabbitMQ有什么解决方案?
1、如图数字1处为交换器1,当生产者1把消息传给交换器1时,由于网络原因,发生了丢包怎么处理?
答:如何知道消息有没有正确到达交换器呢?
(1)、通过AMQP提供的事务机制实现
(2)、通过生产者消息确认机制(publisher confirm)实现
2、如图数字2处为队列1,当生产者1把消息已经传到了队列1,服务器断电时,消息丢失了怎么处理?
答:将保存在内存中的数据都写入磁盘,防止服务器重启后数据丢失。
有哪些数据需要持久化保存呢?
元数据、消息需要持久化到磁盘;
磁盘节点:持久化的消息在到达队列时就被写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一份备份,这样可以提高一定的性能,只有在内存吃紧的时候才会从内存中清除;
内存节点:非持久化的消息一般只保存在内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存空间。
3、如图数字3处为信道3,当消费者1把消息从队列1中取出,消息还没来得及处理就宕机丢失了怎么处理?
答:为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除。
如果一个Queue没被任何的Consumer Subscribe(订阅),当有数据到达时,这个数据会被cache,不会被丢弃。当有Consumer时,这个数据会被立即发送到这个Consumer。这个数据被Consumer正确收到时,这个数据就被从Queue中删除。
那么什么是正确收到呢?通过ACK。每个Message都要被acknowledged(确认,ACK)。我们可以显示的在程序中去ACK,也可以自动的ACK。如果有数据没有被ACK,那么RabbitMQ Server会把这个信息发送到下一个Consumer。
消息队列的应用场景有哪些?
1、异步处理
非核心流程异步化,提高系统响应性能
2、应用解耦
系统不是强耦合,消息接受者可以随意增加,而不需要修改消息发送者的代码。消息发送者的成功不依赖消息接受者(比如有些银行接口不稳定,但调用方并不需要依赖这些接口)
不强依赖于非本系统的核心流程,对于非核心流程,可以放到消息队列中让消息消费者去按需消费,而不影响核心主流程
3、最终一致性
最终一致性不是消息队列的必备特性,但确实可以依靠消息队列来做最终一致性的事情。
先写消息再操作,确保操作完成后再修改消息状态。定时任务补偿机制实现消息可靠发送接收、业务操作的可靠执行,要注意消息重复与幂等设计。
所有不保证100%不丢消息的消息队列,理论上无法实现最终一致性。
4、广播
只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情。
5、流量削峰与流控
当上下游系统处理能力存在差距的时候,利用消息队列做一个通用的“漏斗”。在下游有能力处理的时候,再进行分发。
6、日志处理
将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。
7、消息通讯
消息队列一般都内置了高效的通信机制,因此也可以用于单纯的消息通讯,比如实现点对点消息队列或者聊天室等。
个人认为消息队列的主要特点是异步处理,一切不着急马上响应结果的、非阻塞性的业务逻辑,都可以考虑使用消息队列,这样可以减少用户请求的响应时间,提高用户体验,同时也有利于业务模块之间的解耦。
从实际项目出发,整理了如下具体场景:
1、用户注册时:注册完后需要发送注册成功邮件,或发送手机短信,或推荐一些感兴趣的主题或志同道合的人;
2、用户上传存证时:需要生成证书、需要生成PDF文档;
3、用户申购金融产品后:需要发送月报消息;
4、买火车票时:由于并发量太大,按照火车票当天的发行量,设置一个队列长度,队列满则不能再进入修补票队列;
5、双11秒杀商品时:先排队扣库存,再提醒用户付款;
扩展阅读
快递员A需要将一个快递给客户B。
原本:A亲手将快递交给B。
问题:B现在有很多事要做,A只能等着B处理完事情,才能把快递交给他,A就很难受了,送不了几个快递,没法赚钱了。
解决:设置一个快递柜,A把快递放进快递柜,就可以送下一个快递了,B啥时候有空了,去快递柜拿就行了。
总结:快递就是消息,快递柜就是消息队列。快递柜有很多种,菜鸟,丰巢等,RabbitMQ就是其中一种快递柜。
放在程序里举例:
客户提交订单,支付系统——>订单系统,支付系统可能很快就执行完了,但是订单系统要很久才能执行完,每次支付系统都要等待订单系统,服务端速度就会很慢,现在使用RabbitMQ,支付系统支付成功后,发送一个支付成功消息到RabbitMQ,就可以返回前端了,订单系统在获取到消息后,慢慢再执行订单修改的程序。(当然,要考虑到某个系统出了异常怎么办,这个入门先不管,只管正常情况。)
参考:
https://www.zhihu.com/question/34243607
https://zhuanlan.zhihu.com/p/63700605
https://www.cnblogs.com/vipstone/p/9275256.html RabbitMQ系列(二)深入了解RabbitMQ工作原理及简单使用