2021最新版RabbitMQ完整教程学习笔记

  heardfate

文章目录
一、中间件
1. 什么是中间件
2. 中间件技术及架构的概述
3. 基于消息中间件的分布式系统的架构
4. 消息队列协议
5. 消息队列持久化
6. 消息的分发策略
7. 消息队列高可用和高可靠
二、入门及安装 RabbitMQ
1. RabbitMQ入门及安装
概述
下载RabbitMQ
安装Erlang
安装socat
安装rabbitmq
2. RabbitMQWeb管理界面及授权操作
RabbitMQ管理界面
授权账号和密码
3. RabbitMQ之Docker安装
Dokcer 安装 RabbitMQ
小结
4. RabbitMQ的角色分类
RabbitMQ的角色分类
通过管理界面授权
三、入门案例
1. RabbitMQ入门案例 - Simple 简单模式
项目准备
演示代码
2. 什么是AMQP
什么是AMQP
AMQP生产者流转过程
AMQP消费者流转过程
3. RabbitMQ的核心组成部分
RabbitMQ的核心组成部分
RabbitMQ整体架构是什么样子的?
RabbitMQ的运行流程
RabbitMQ支持的消息模型
web管理界面菜单含义
4. RabbitMQ入门案例 - fanout 模式(发布与订阅模式)
web界面端实现
代码实现
5. RabbitMQ入门案例 - Direct 模式
web管理端实现
代码实现
6. RabbitMQ入门案例 - Topic 模式
web管理端实现
代码实线
小结:完整的声明式创建方式
7. RabbitMQ入门案例 - Work模式
Work模式轮询模式(Round-Robin)
Work模式公平分发模式
内存、磁盘空间设置
8.RabbitMQ入门案例 - Headers模式
web管理端实现
9. RabbitMQ使用场景
解耦、削峰、异步
高内聚,低耦合
四、Springboot 整合案例
项目准备
1. Fanout 模式
2. Direct 模式
3. Topic 模式(采用注解配置)
小结
五、RabbitMQ高级
1. 过期时间TTL
概述
设置队列TTL
设置消息TTL
2. 死信队列
概述
3. 内存磁盘的监控
RabbitMQ内存警告
RabbitMQ的内存控制
RabbitMQ的内存换页
RabbitMQ的磁盘控制
4.集群
RabbitMQ集群
单机多实例搭建
5.分布式事务
分布式事务的方式
事务问题案例
基于MQ的分布式事务
整体设计思路
可靠生产问题
可靠消费
可靠生产、可靠消费 总结
一、中间件
1. 什么是中间件
什么是中间件

一个企业可能同时运行着多个不同的业务系统,这些系统可能基于不同的操作系统、不同的数据库、异构的网络环境。如何把这些信息系统结合成一个有机地协同工作的整体,真正实现企业跨平台、分布式应用。中间件便是解决之道,它用自己的复杂换取了企业应用的简单。

中间件(Middleware)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件+平台+通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和使用软件区分开来

为什么需要使用消息中间件

具体地说,中间件屏蔽了底层操作系统的复杂性,使程序开发人员面对一个简单而统一的开发环境,减少程序设计的复杂性,将注意力集中在自己的业务上,不必再为程序在不同系统软件上的移植而重复工作,从而大大减少了技术上的负担,中间件带给应用系统的,不只是开发的简便、开发周期的缩短,也减少了系统的维护、运行和管理的工作量,还减少了计算机总体费用的投入。

中间件特点

必须遵循一定的规范,具有高可用、高可扩、持久性扥特征,

屏蔽底层操作系统复杂性、屏蔽技术架构局限性(不需要应用程序都要使用同一个语言)

中间件应具有如下的一些特点:

满足大量应用的需要
运行于多种硬件和 OS平台
支持分布计算,提供跨网络、硬件和 OS平台的透明性的应用或服务的交互
支持标准的协议
支持标准的接口
TCP/IP协议是中间的主要通信协议,但他比较底层并不能完全满足需求,我们还要基于TCP/IP构建自己的请求信息。

在项目中什么时候使用中间件技术

在项目的架构和重构中,使用任何技术和架构的改变我们都需要谨慎斟酌和思考,因为任何技术的融入和变化都可能人员,技术,和成本的增加,如果你仅仅还只是一个初创公司建议还是使用单体架构,最多加个缓存中间件即可,不要盲目追求新或者所谓的高性能,而追求的背后一定是业务的驱动和项目的驱动,因为一旦追求就意味着你的学习成本,公司的人员结构以及服务器成本,维护和运维的成本都会增加,所以需要谨慎选择和考虑。

但是作为一个开发人员,一定要有学习中间件技术的能力和思维。

  1. 中间件技术及架构的概述

分布式消息中间件:

ActiveMQ:比较老牌,Java语言开发,高性能,现在选型比较少了,略复杂
RabbitMQ:比较流行,支持模式完善、Spring的支持很完善
Kafka:性能最高,最接近的底层,不支持事务,支持持久化、分发机制,大数据领域应用多,开源
RocketMQ:阿里开发,谨慎对待,目前在Apache托管,将来是否继续维护还不确定
为什么消息中间件还有自己的协议?

因为TCP/IP协议比较底层,无法完全满足我们的需求,所以在TCP/IP协议之上构建了自己的协议,底层还是TCP/IP

负载均衡中间:

Nginx 负载均衡
LVS负载均衡:对Nginx进行集群
KeepAlive:保持心跳
CDN:加速技术
缓存中间件:
MemCache:适合小规模缓存使用
Redis:适合大规模缓存使用

数据库中间件:

MySQL可以持久化,但高可用能力不强,而且数据库自身的优化已经达到瓶颈之后,就需要中间件来解决
MyCat:解决数据库高可用,
ShardingJDBC:
学习中间件的方式和技巧

理解中间件在项目架构中的作用,以及各中间件的底层实现
可以使用一些类比的生活概念去理解中间件
使用一些流程图或者脑图的方式去梳理各个中间件在架构中的作用
尝试用 java技术去实现中间件的原理
静下来去思考中间件在项目中设计的和使用的原因
如果找到对应的代替总结方案
尝试编写博文总结类同中间件技术的对比和使用场景
学会查看中间件的源码以及开源项目和博文
单体架构

在企业开发当中,大部分的初期架构都采用的是单体架构的模式进行架构,而这种架构的典型的特点:就是把所有的业务和模块,源代码,静态资源文件等都放在一个工程中,如果其中的一个模块升级或者迭代发生一个很小的变动都会重新编译和重新部署项目。这种架构存在的问题是:

耦合度太高
不易维护
服务器的成本高
以及升级架构的复杂度也会增大
这样就有后续的分布式架构系统。如下

分布式架构

何谓分布式系统:

通俗一点:就是一个请求由服务器端的多个服务(服务或者系统)协同处理完成

和单体架构不同的是,单体架构是一个请求发起 jvm调度线程(确切的是 tomcat线程池)分配线程 Thread来处理请求直到释放,而分布式系统是:一个请求时由多个系统共同来协同完成,jvm和环境都可能是独立。如果生活中的比喻的话,单体架构就像建设一个小房子很快就能够搞定,如果你要建设一个鸟巢或者大型的建筑,你就必须是各个环节的协同和分布,这样目的也是项目发展到后期的时候要去部署和思考的问题。我们也不难看出来:分布式架构系统存在的特点和问题如下:

存在问题:

学习成本高,技术栈过多
运维成本和服务器成本增高
人员的成本也会增高
项目的负载度也会上升
面临的错误和容错性也会成倍增加
占用的服务器端口和通讯的选择的成本高
安全性的考虑和因素逼迫可能选择 RMI/MQ相关的服务器端通讯
好处:

服务系统的独立,占用的服务器资源减少和占用的硬件成本减少,确切的说是:可以合理的分配服务资源,不造成服务器资源的浪费
系统的独立维护和部署,耦合度降低,可插拔性
系统的架构和技术栈的选择可以变的灵活(而不是单纯地选择 java)
弹性的部署,不会造成平台因部署造成的瘫痪和停服的状态
3. 基于消息中间件的分布式系统的架构

从上图中可以看出来,消息中间件的是

利用可靠的消息传递机制进行系统和系统直接的通讯
通过提供消息传递和消息的派对机制,它可以在分布式系统环境下扩展进程间的通讯(不受语言限制)
消息中间件应用的场景

跨系统数据传递
高并发的流量削峰
数据的并发和异步处理
大数据分析与传递
分布式事务比如你有一个数据要进行迁移或者请求并发过多的时候,
比如你有10 W的并发请求下订单,我们可以在这些订单入库之前,我们可以把订单请求堆积到消息队列中,让它稳健可靠的入库和执行

串行的总时间为所有系统运行的时间之和;

并行的运行的总时间取决于最慢的那个系统的运行时间,比串行的求和要快很多;

常见的消息中间件

ActiveMQ、RabbitMQ、Kafka、RocketMQ等

消息中间件的本质及设计

它是一种接受数据、接受请求、存储数据、发送数据等功能的技术服务

MQ消息队列:负责数据的接受,存储和传递,所以性能要高于普通服务和技术

消息中间件的核心组成部分

消息的协议
消息的持久化机制
消息的分发策略
消息的高可用,高可靠
消息的容错机制
小结

其实不论选择单体架构还是分布式架构都是项目开发的一个阶段,在什么阶段选择合适的架构方式,而不能盲目追求,最后造成的后果和问题都需要自己买单。但作为一个开发人员学习和探讨新的技术使我们每个程序开发者都应该去保持和思考的问题。当我们没办法去改变社会和世界的时候,我们为了生活和生存那就必须要迎合企业和市场的需求,发挥你的价值和所学的才能,创造价值和实现自我

  1. 消息队列协议
    什么是协议

所谓协议是指:

计算机底层操作系统和应用程序通讯时共同遵守的一组约定,只有遵循共同的约定和规范,系统和底层操作系统之间才能相互交流
和一般的网络应用程序的不同它主要负责数据的接受和传递,所以性能比较的高
协议对数据格式和计算机之间交换数据都必须严格遵守规范
网络协议的三要素

语法:语法是用户数据与控制信息的结构与格式,以及数据出现的顺序
语义:语义是解释控制信息每个部分的意义,它规定了需要发出何种控制信息,以及完成的动作与做出什么样的响应
时序:时序是对事件发生顺序的详细说明
比如我 MQ发送一个信息,是以什么数据格式发送到队列中,然后每个部分的含义是什么,发送完毕以后的执行的动作,以及消费者消费消息的动作,消费完毕的相应结构和反馈是什么,然后按照对应的执行顺序进行处理。如果你还是不理解:大家每天都在接触的 http请求协议:

语法:http规定了请求报文和响应报文的格式
语义:客户端主动发起请求称之为请求(这是一种定义,同时你发起的是 post/get请求)
时序:一个请求对应一个响应(一定先有请求再有响应,这个是时序)
而消息中间件采用的并不是 http协议,常见的消息中间件协议基于TCP/IP协议之上封装成:OpenWire、AMQP、MQTT、Kafka,OpenMessage协议

面试题:为什么消息中间件不直接使用 http协议

因为 http请求报文头和响应报文头是比较复杂的,包含了Cookie,数据的加密解密,窗台吗,响应码等附加的功能,但是对于一个消息而言,我们并不需要这么复杂,也没有这个必要性,它其实就是负责数据传递,存储,分发就行,一定要追求的是高性能。尽量简洁,快速
大部分情况下 http大部分都是短链接,在实际的交互过程中,一个请求到响应都很有可能会中断,中断以后就不会执行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取信息的过程,出现问题和故障要对数据或消息执行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行
AMQP协议

AMQP:(全称:Advanced Message Queuing Protocol)是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现由 RabbitMQ等

特性:

分布式事务支持
消息的持久化支持
高性能和高可靠的消息处理优势

MQTT协议

MQTT协议(Message Queueing Telemetry Transport)消息队列是 IBM开放的及时通讯协议,物联网系统架构中的重要组成部分

特点:

轻量
结构简单
传输快,不支持事务
没有持久化设计
应用场景:

适用于计算能力有限
低带宽
网络不稳定的场景

OpenMessage协议

是近几年由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式信息中间件、流处理等领域的应用开发标准

特点:

结构简单
解析速度快
支持事务和持久化设计
Kafka协议

Kafka协议是基于 TCP/IP的二进制协议。消息内部是 通过长度来分割,由一些基本数据类型组成

特点:

结构简单
解析速度快,性能最高
无事务支持
有持久化设计
小结

协议:

其实就是 TCP/IP 协议基础之上构建的一种约定俗称的规范和机制、它的主要目的可以让客户端(应用程序 java,go)进行沟通和通讯。并且这种写一下规范必须具有持久性,高可用,高可靠的性能

  1. 消息队列持久化
    持久化

简单来说就是将数据存入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存

常见的持久化方式

  1. 消息的分发策略
    消息的分发策略

MQ消息 队列有如下几个角色

生产者
存储消息
消费者
那么生产者生成消息以后,MQ进行存储,消费者是如何获取消息的呢?

一般获取数据的方式无外乎推(push)或者拉(pull)两种方式,典型的 git就有推拉机制,我们发送的 http请求就是一种典型的拉取数据库数据返回的过程。

而消息队列 MQ是一种推送的过程,而这些推机制会使用到很多的业务场景也有很多对应推机制策略

场景分析一

比如我在 APP上下了一个订单,我们的系统和服务很多,我们如何得知这个消息被哪个系统或者哪些服务器或者系统进行消费,那这个时候就需要一个分发的策略。这就需要消费策略。或者称之为消费的方法论

场景分析二

在发送消息的过程中可能会出现异常,或者网络的抖动,故障等等因为造成消息的无法消费,比如用户在下订单,消费 MQ接受,订单系统出现故障,导致用户支付失败,那么这个时候就需要消息中间件就必须支持消息重试机制策略。也就是支持:出现问题和故障的情况下,消息不丢失还可以进行重发

消息分发策略的机制和对比

轮询分发侧重公平性,不会因为消费处理的快与慢而对数据分发有倾斜,对应自动应答,自动ACK

公平分发侧重能者多劳,会根据实际的使用情况有一定的倾斜,对应手动应答,手动ACK

重发为了保证消息的可靠性

消息队列很少使用消息拉取操作

综合来看,Rabbit的功能最完善,最稳定;kafka性能最高,

  1. 消息队列高可用和高可靠
    什么是高可用机制

所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力

当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU,内存,磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署,来达到高可用的目的

说白了,就是尽可能做到,有服务器出故障、宕机之后系统也能正常使用

集群模式1 - Master-slave主从共享数据的部署方式

集群模式2 - Master-slave主从同步部署方式

解释:这种模式写入消息同样在 Master主节点上,但是主节点会同步数据到 slave节点形成副本,和 zookeeper或者 redis主从机制很雷同。这样可以达到负载均衡的效果,如果消费者有多个这样就可以去不同的节点进行消费,以为消息的拷贝和同步会占用很大的带宽和网络资源。在后续的 rabbitmq中会有使用

所以要尽量部署在同一个机房内,保证带宽不受影响,
集群模式3 - 多主集群同步部署模式

解释:和上面的区别不是特别的大,但是它的写入可以往任意节点去写入

集群模式4 - 多主集群转发部署模式

解释:如果你插入的数据是 broker-1中国,元数据信息会存储数据的相关描述和记录存放的位置(队列)。它会对描述信息也就是元数据信息进行同步,如果消费者在 broker-2中进行消费,发现自己节点没有对应的信息,可以从对应的元数据信息中去查询,然后返回对应的消息信息,场景:比如买火车票或者黄牛买演唱会门票,比如第一个黄牛有顾客说要买的演唱会门票,但是没有但是他回去联系其他的黄牛询问,如果有就返回

集群模式5 Master-slave与 Broker-cluster组合的方案

解释:实现多主多从的热备机制来完成消息的高可用以及数据的热备机制,在生产规模达到一定的阶段的时候,这种使用的频率比较高

什么是高可靠机制

所谓高可靠是指:系统可以无故障低持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠

在高并发的业务场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的

如何保证中间件消息的可靠性呢,可以从两个方面考虑:

消息的传输:通过协议来保证系统间数据解析的正确性
消息的存储区可靠:通过持久化来保证消息的可靠性
二、入门及安装 RabbitMQ
1. RabbitMQ入门及安装
概述
简单概述:

RabbitMQ是一个开源的遵循 AMQP协议实现的基于 Erlang语言编写,支持多种客户端(语言),用于在分布式系统中存储消息,转发消息,具有高可用,高可扩性,易用性等特征

下载RabbitMQ
下载地址:https://www.rabbitmq.com/download.html
环境准备:CentOS7.x + /Erlang

我使用的虚拟机是Linux centos 7,因此找到对应的版本下载,这里下载的是rmp安装包,

RabbitMQ是采用 Erlang语言开发的,erlang语言是基于开发交换机的语言,性能高

所以系统环境必须提供 Erlang环境,第一步就是安装 Erlang

安装Erlang
查看系统版本号,rabbitmq 对 erlang 有版本要求,不能使用太旧的erlang版本

https://www.rabbitmq.com/which-erlang.html

比如,rabbitmq 的最新版为 3.8.14,他要求 erlang 的最小版本为 22.3

这里我们将 rabbitmq 和 erlang 安装包提前准备好

安装包:

创建一个目录

mkdir -p /usr/rabbitmq
1
将 rabbitmq 和 erlang 包上传到这个目录中

解压 erlang 语言包

rpm -Uvh erlang-solutions-2.0-1.noarch.rpm
1
解压完毕,开始安装 erlang

yum install -y erlang
1
安装需要等待一点时间,安装完毕后,检测erlang版本,出现版本号表示安装成

erl -v
1

安装socat
rabbitmq 在安装过程中需要依赖这个插件,需要先安装

yum install -y socat
1

安装rabbitmq
解压 rabbitmq 安装包,注意实际的包名要以我们自己下载的包名

rpm -Uvh rabbitmq-server-3.8.13-1.el8.noarch.rpm
1
解压完毕,开始安装

yum install rabbitmq-server -y
1
安装完毕,启动服务

启动服务

systemctl start rabbitmq-server

查看服务状态,running表示启动成功

systemctl status rabbitmq-server.service

开机自启动

systemctl enable rabbitmq-server

停止服务

systemctl stop rabbitmq-server
1
2
3
4
5
6
7
8

  1. RabbitMQWeb管理界面及授权操作
    RabbitMQ管理界面
    默认情况下,是没有安装web端的客户端插件,需要安装才可以生效

执行命令,开始安装rabbitmq管理界面插件

rabbitmq-plugins enable rabbitmq_management
1

安装完毕以后,重启服务

systemctl restart rabbitmq-server
1
访问浏览器,访问地址:服务器 IP+端口号(默认15672)

注意:

在对应服务器(阿里云,腾讯云等)的安全组中开放15672端口(rabbitmq默认端口号),5672端口后续程序需要使用也要开放
rabbitmq有一个默认账号和密码都是:guest默认情况只能在 localhost本计下访问,所以需要添加一个远程登录的用户
授权账号和密码
新增用户,账号 admin,密码 admin

rabbitmqctl add_user admin admin
1

设置用户分配操作权限

rabbitmqctl set_user_tags admin administrator
1

用户操作权限分四种级别:

administrator:可以登录控制台、查看所有信息、可以对 rabbitmq进行管理
monitoring:监控者 登录控制台,查看所有信息
policymaker:策略制定者 登录控制台,指定策略
managment 普通管理员 登录控制台
为用户添加资源权限(授予访问虚拟机根节点的所有资源,如果已经选择了admin,那么这个命令可以不执行)

rabbitmqctl set_permissions -p / admin “.*”“.*”“.*”
1
网页登录成功

其他账户操作命令

添加账号、密码

rabbitmqctl add_user

设置账号为管理员

rabbitmqctl set_user_tags 账号 administrator

修改账号密码

rabbitmqctl change_password Username Newpassword

查看用户清单

rabbitmqctl list_users

添加账号查看资源的权限

rabbitmqctl set_permissions -p / 用户名 “.*”“.*”“.*”
1
2
3
4
5
6
7
8
9
10
3. RabbitMQ之Docker安装
Dokcer 安装 RabbitMQ
首先要在服务器中准备一个 Docker 的环境

虚拟化容器技术 - Docker的安装

yum 包更新到最新

yum update

安装docker依赖的组件,yum-utils提供yum-config-manager功能,另外两个是devicemapper驱动依赖

yum install -y yum-utils device-mapper-persistent-data lvm2

设置yum源为阿里云

yum-config-manager –add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

安装docker

yum install docker-ce -y

安装完毕后,检查docker版本

docker -v

给docker配置阿里云镜像加速器(可以不安装,看个人情况)

sudo mkdir -p /erc/docker
sudo tee /etc/docker/daemon.json <<-‘EOF’
{
“registry mirrors”: [“https://0wrdwnn6.mirror.aliyuncs.com”]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
docker的相关命令

启动docker

systemctl start docker

停止docker

systemctl stop docker

重启docker

systemctl restart docker

查看docker状态

systemctl status docker

查看当前有哪些镜像

docker images

开机自启动

systemctl enable docker
systemctl unenable docker

查看docker概要信息

docker info

查看docker帮助文档

docker –help
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
安装 rabbitmq 方法一

获取rabbit镜像

docker pull rabbitmq:management
1
2

创建并运行容器

docker run -id –name=myrabbit -p 15672:15672 rabbitmq:management

参数含义

–hostname:指定容器主机名称
–name:指定容器名称
-p:将mq端口号映射到本地
1
2
3
4
5
6
安装完毕之后,还要再单独设置指定账户、密码

我们可以从官网查找到命令,指定在安装的过程中就可以根据提示完成账户、密码的设置

安装 rabbitmq 方法二

访问:https://registry.hub.docker.com/_/rabbitmq/

找到以下文档位置

执行docker安装rabbitmq的命令,图中命令与方法一的命令整合

docker run -di –name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
1

显示启动错误是因为,安装成功后会尝试启动,上面我们手动安装rabbit已经占用端口,所以需要先将rabbitmq关闭,再用docker启动

查看docker容器状态

docker ps -a
1
镜像当前处于创建状态下,还没有启动

根据容器ID启动rabbitmq

docker start 13493aa7dbb8
1
启动成功

再次访问浏览器,这次访问的是 Docker 启动的 RabbitMQ

小结
Docker安装RabbitMQ明显比手动安装更加简洁,省去了 erlang、socat、管理界面插件等安装过程,甚至安装时就已经设置好了账户、密码,直接一步到位

注意端口的使用,一定要提前开放好

  1. RabbitMQ的角色分类
    RabbitMQ的角色分类
    none:

不能访问mamanement plugin
management:查看自己相关节点信息

列出自己可以通过AMQP登入的虚拟机
查看自己的虚拟机节点 virtual hosts的queues,exchanges和bingdings信息,查看和关闭自己的channels和connections信息
查看有关自己的虚拟机节点 virtual hosts统计信息,包括其他用户在这个节点 virtual hosts中的活动信息
Policymaker:

包含management所有权限
查看和创建和删除自己的virtual hosts所属的policies和parameters信息
Monitoring:相当于普通管理员

包含management所有权限
罗列出所有的virtual hosts,包括不能登录的virtual hosts
查看其他用户的connections和channels信息
查看节点级别的数据如clustering何memory使用情况
查看所有的virtual hosts的全局统计信息
Administrator:超级管理员,(学习经常使用)

最高权限
可以创建和删除 virtual hosts
可以查看,创建、删除users
查看、创建permissions
关闭所有用户的connections
通过管理界面授权

三、入门案例
参考官网快速启动:https://www.rabbitmq.com/getstarted.html

1.2.3.4.5重点掌握

jdk1.8
构建一个 maven工程
导入 rabbitmq 的 maven依赖
启动 rabbitmq-server服务
定义生产者
定义消费者
观察消息的在 rabbitmq-server 服务中的进程
1. RabbitMQ入门案例 - Simple 简单模式
项目准备
构建一个maven工程,添加rabbitmq依赖

先使用原生 rabbitmq 依赖,后面再整合 Spring


com.rabbitmq
amqp-client
5.10.0

1
2
3
4
5

演示代码

在上图的模型中,有以下概念:

生产者:也就是要发送消息的程序
消费者:消息的接受者,会一直等待消息到来。
消息队列:图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
生产者

// 简单模式
public class Producer {
public static void main(String[] args) {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(“10.15.0.9”);
connectionFactory.setPort(5672);
connectionFactory.setUsername(“admin”);
connectionFactory.setPassword(“admin”);
connectionFactory.setVirtualHost(“/”);
Connection connection = null;
Channel channel = null;
try {
// 2.创建链接
connection = connectionFactory.newConnection(“生产者”);
// 3.获取连接通道
channel = connection.createChannel();
// 4.通过创建交换机,声明队列,绑定关系,路由key,发送消息和接受消息
/*
参数1:队列名称
参数2:是否持久化,非持久化消息会存盘吗?会存盘,但是会随着重启服务器而丢失
参数3:是否独占队列
参数4:是否自动删除,随着最后一个消费者消息完毕消息以后是否把队列自动删除
参数5:携带附属属性
/
String queueName = “queue1”;
channel.queueDeclare(queueName,false,false,false,null);
// 5.发送消息给队列queue,都是由channel来处理
/

参数1:交换机
参数2:队列、路由key
参数3:消息的状态控制
参数4:消息主题
*/
//面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定会存在一个默认的交换机
String message = “Hello”;
channel.basicPublish("", queueName, null,message.getBytes());
System.out.println(“消息发送成功”);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null && channel.isOpen()) {
try {
// 6.关闭通道,注意关闭顺序
channel.close();
// 7.关闭连接
connection.close();
} catch (Exception e) {
e.printStackTrace();
}

        }
    }
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
启动成功后,查看管理面板,可以看到我们创建的队列,有一条消息没有被消费

随着最后一条消息被消费,非持久化的消息会被删除,持久化消息会被保存

消费者

public class Consumer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(“10.15.0.9”);
connectionFactory.setPort(5672);
connectionFactory.setUsername(“admin”);
connectionFactory.setPassword(“admin”);
connectionFactory.setVirtualHost(“/”);
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection(“生产者”);
channel = connection.createChannel();
// 确保和消费者的队列名一致才可以接收消息
String queueName = “queue1”;
channel.queueDeclare(queueName,false,false,false,null);
// 接收消息,必须重写两个方法,消息的处理和异常情况的处理
channel.basicConsume(“queue1”, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(“消息已接收”+ new String(delivery.getBody(), “UTF-8”));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println(“接收消息失败”);
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null && channel.isOpen()) {
try {
// 6.关闭通道,注意关闭顺序
channel.close();
// 7.关闭连接
connection.close();
} catch (Exception e) {
e.printStackTrace();
}

        }
    }
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
启动运行,消费者成功接收消息,查看管理面板,之前存在的一条消息已经被消费

小结:

持久化队列可以持久存在,在最后一条消息被消费之后,持久话队列仍然存在,非持久化队列会被删除
重启服务,持久化队列仍然存在
持久化队列会存盘,非持久化队列也会存盘,但是非持久化队列会随着服务器的重启而丢失
2. 什么是AMQP
RabbitMQ的运行遵循AMQP协议

什么是AMQP
AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计

AMQP生产者流转过程

为什么通信使用的是通道而不是连接?

这里的连接属于短连接,连接的创建与消费比较耗费性能
通道、信道属于长连接,处理消息性能更高
AMQP消费者流转过程

ACK是应答,分手动ACK、自动ACK,实际生产中一般使用手动ACK

  1. RabbitMQ的核心组成部分
    RabbitMQ的核心组成部分

核心概念:

Server:又称 Broker ,接受客户端的连接,实现AMQP实体服务,安装 rabbitmq-server
Connection:连接,应用程序与Broker的网络连接 TCP/IP 协议实现三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立多个Channel,每个Channel的代表一个对话任务
Message:消息,服务与应用程序之间传送的数据,有Properties和body组成,Properties可以对消息进行修饰,比如消息的优先级,延迟的等高级属性,Body就是消息的具体内容
Virtual Host:虚拟机地址,用于逻辑隔离,最上层的消息路由,一个虚拟主机可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名字的Exchange
Exchange:交换机,接收消息,根据路由键发送消息到绑定队列,不具备消息存储的能力
Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key
Routing Key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息
Queue:队列,也称为Message Queue消息队列,保存消息并将他们转发给消费者
注意:

队列是不能直接接收消息的,一定是通过交换机来进行投递
队列需要指定绑定哪个队列,如果没有指定就是自动绑定默认交换机(AMQP default),推荐自己指定交换机
一个交换机可能会绑定多个队列,如果我们只需要部分队列接收消息,这就需要路由来指定,也就是定一个条件
RabbitMQ整体架构是什么样子的?

同上面类似

RabbitMQ的运行流程

RabbitMQ支持的消息模型

正是由于交换机对消息队列有着不同的推送机制,才产生了多种模式

简单模式 Simple
工作模式 Work
发布订阅模式
路由模式
主题 Topic模式
参数模式,
RPC(属于一种拉取机制,暂时不探讨)
web管理界面菜单含义

overview:反应当前队列的运行状况,条目数,所占用空间,内存状况,
Consumer:消息是否被消费者消费
Bindings:当前队列绑定交换机情况,如果这里没有指定,一定是绑定默认交换机
Publish message:发送的消息情况,持久化还是非持久化,
Get message:获取消息,生产时切记,如果想要预览消息,
不要点击下拉框的ACk,一旦选择这种方式就相当于消息被消费了,会影响实际生产;选择NACK预览不做应答,不会对消息产生影响
Move message:表示消息可以转移到另一个队列中
Purge:清空队列
Delete:删除队列
Runtime Metrics:运行队列的基本状况

在交换机菜单里,我们可以看到交换机列表,其中包括了默认交换机,点击一个交换进入,

选择Publish message我们可以使用管理端页面模拟指定往哪个队列发消息,routing key输入队列名即可
发送消息之后,即可在队列菜单中指定的队列中发现
4. RabbitMQ入门案例 - fanout 模式(发布与订阅模式)

Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式,即使指定了routing-key也没有意义

fanout 发布于订阅模式就好比收听广播

web界面端实现
模拟代码的执行流程,方便我们理解程序的思路

点击Exchange菜单,创建一个 exchange,填写交换机参数,确认添加

点击Queues菜单,创建两个队列用于接收消息queue2,queue3

点击进入刚创建的队列,当前使用了默认绑定交换机,修改绑定交换机为我们创建的 fanout-exchange

查看交换机也可以看到,对应已经绑定了队列

在交换机的菜单中,选择我们床架你的交换机,发送消息,publish message,fanout模式不需要指定队列,发送即可推送到自己所绑定的全部队列中

选择一个队列点击进入,预览消息

代码实现
生产者

// fanout模式
public class Producer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(“10.15.0.9”);
connectionFactory.setPort(5672);
connectionFactory.setUsername(“admin”);
connectionFactory.setPassword(“admin”);
connectionFactory.setVirtualHost(“/”);
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection(“生产者”);
channel = connection.createChannel();
// fanout 无需指定队列和路由
// 发送的消息内容
String message = “Hello fanout 代码测试”;
String exchangeName = “fanout-exchange”;
String routeKey = "";
String type = “fanout”;
channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
System.out.println(“消息发送成功”);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null && channel.isOpen()) {
try {
// 6.关闭通道,注意关闭顺序
channel.close();
// 7.关闭连接
connection.close();
} catch (Exception e) {
e.printStackTrace();
}

        }
    }
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
消费者

public class Consumer {
// 这里我们直接使用多线程模拟多个消费者
public static void main(String[] args) {
new Thread(runnable, “queue1”).start();
new Thread(runnable, “queue2”).start();
new Thread(runnable, “queue3”).start();
}
private static Runnable runnable = new Runnable() {
@Override
public void run() {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(“10.15.0.9”);
connectionFactory.setPort(5672);
connectionFactory.setUsername(“admin”);
connectionFactory.setPassword(“admin”);
connectionFactory.setVirtualHost(“/”);
Connection connection = null;
Channel channel = null;
final String queueName = Thread.currentThread().getName();
try {
connection = connectionFactory.newConnection(“生产者”);
channel = connection.createChannel();
// 这里我们并没有做交换机与度列的绑定,因为在web管理端已经绑定好了
// 所以以后在实际生产中也可以使用这种图形+代码相结合的方式
channel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(queueName+" :消息已接收,"+ new String(delivery.getBody(), “UTF-8”));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println(“接收消息失败”);
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null && channel.isOpen()) {
try {
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}

            }
        }
    }
};

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
启动测试,先生产消息,在消费消息,由于我们在web管理端设置的fanout-exchange绑定了queue2和queue3

需要注意的是:

此处没有通过代码去绑定交换机和队列,而是通过可视化界面去绑定的!简化了简化了代码
这也说明工作我们可以现在界面中建立好绑定关系,代码中直接使用,提高效率
5. RabbitMQ入门案例 - Direct 模式
实际上,路由模式就是在发布与订阅模式基础上增加了一个routing-key,基于routing-key进行选择队列发送消息

web管理端实现
创建交换机,选择direct类型

添加绑定队列,自己定义一个key

使用交换机发送消息,比如我们想指定发送消息,路由键为email的队列可以接收,也就是queue1

查看队列可以看到,拥有路由键email的queue1接收到了消息,只要队列满足路由键即可接收消息

代码实现
在上面 fanout 模式的基础上,修改交换机名字,交换机类型,增加路由键的的绑定

//6.定义路由key
String routeKey = “email”;
//7.指定交换机的类型
String type = “direct”;
channel.basicPublish(exchangeName,routeKey, null,message.getBytes());
1
2
3
4
5
6. RabbitMQ入门案例 - Topic 模式
主题模式其实就是在fanout和direct模式的基础上进一步叠加,提供可以支持模糊匹配的路由routing-key

web管理端实现
创建交换机,选择topic类型

添加绑定队列,定义模糊匹配的routing-key,

*必须匹配一个级别,#可以匹配没有也可以匹配一级或多级

发送消息,规定路由键com.course.swy

查看队列,可以看到,queue1和queue2收到了信息

代码实线
定义可以模糊匹配的路由键,指定交换机类型,指定交换机名字

//6.定义路由key
String routeKey = “com.order.test.xxx”;
//7.指定交换机的类型
String type = “topic”;
channel.basicPublish(exchangeName,routeKey, null,message.getBytes());

//5.准备交换机
String exchangeName = "direct_message_exchange";
String exchangeType = "direct";
//如果你用界面把queue和exchange的关系先绑定话,代码就不需要在编写这些声明代码可以让代码变得更简洁
//如果用代码的方式去声明,我们要学习一下
//6.声明交换机 所谓的持久化就是指,交换机会不会随着服务器重启造成丢失
channel.exchangeDeclare(exchangeName,exchangeType,true);

//7.声明队列
channel.queueDeclare("queue5",true,false,false,null);
channel.queueDeclare("queue6",true,false,false,null);
channel.queueDeclare("queue7",true,false,false,null);

//8.绑定队列和交换机的关系
channel.queueBind("queue5",exchangeName,"order");
channel.queueBind("queue6",exchangeName,"order");
channel.queueBind("queue7",exchangeName,"course");

channel.basicPublish(exchangeName,course, null,message.getBytes());

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
小结:完整的声明式创建方式
通过代码实线交换机与队列的绑定关系

如果消费者去消费一个不存在的队列将会出现异常

以 direct 模式为例

public class Producer {
    public static void main(String[] args) {
        // 连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.126.130");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 连接
            connection = connectionFactory.newConnection("生产者");
            // 信道
            channel = connection.createChannel();
            // 定义队列
            String queueName = "queue2";
            channel.queueDeclare(queueName,false,false,false,null);
            // 定义交换机
            String exchangeName = "direct_message_exchange";
            String exchangeType = "direct";
            channel.exchangeDeclare(exchangeName, exchangeType, true);//第三个参数是否持久化
            // 绑定交换机和队列
            channel.queueBind("queue2", exchangeName, "order");
            // 绑定上面的各个参数,发送消息
            String message = "Hello";
            channel.basicPublish(exchangeName, queueName, null, message.getBytes());
            System.out.println("消息发送成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
7. RabbitMQ入门案例 - Work模式

简单模式不需要指定交换机,直接指定队列,一个队列对应一个消费者;

工作模式不需要指定交换机,直接指定队列,一个队列对应多个消费者;

当一个队列中有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?

主要有两种模式:

轮询模式的分发:一个消费者一条,按均分配
公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配
Work模式轮询模式(Round-Robin)
不指定哪种模式的情况下,默认就是轮询模式,强调均分性;不管消费者处理速度如何,一律均分

生产者

跟简单模式一样!无需指定交换机(有默认),需要指定队列

消费者

创建两个一样的!且使用同一个队列。应答模式ack必须使用自动应答 true

Work模式公平分发模式
生产者

与上面的生产者一样

消费者

创建两个相同的消费者,使用同一个队列,必须使用手动应答,autoAck为false,和指定指标qos

这样消费时就会切换为公平分发模式,能者多劳,处理快的消费的更多

指定qos,qos是一次从队列中取出的消息数量,不易过大,根据情况定,通常写1也够了

//简单模式
public class Consumer{
	//3.接受内容
    //指标定义出来
    channel.basicQos(1);
    channel.basicConsume("queue1",false,new DefaultConsumer(){
        public void handle(String consumerTag, Delivery message) throws IOException {
          System.out.println(new String("收到消息是" + new String(meassage.getBody()),"UTF-8"));
          //改成手动应答
          channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        },new CancelCallback(){
            public void handle(String consumerTag) throws IOException {
                System.out.println("接受失败了");
        }
      });
    //4.关闭
    channel.close();
    connection.close();
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
内存、磁盘空间设置


memory:rabbitmq设置的最大内存是物理内存的0.4倍,所以不要设置过高把内存撑爆,看自己的内存大小设定
Disk space:磁盘大小,当磁盘还剩50m时,就会发出预警,无法再接收消息,需要合理设置
8.RabbitMQ入门案例 - Headers模式
通过设置参数的方式来匹配队列,这个参数在代码中对应了channel.basicPublish()中的props参数

web管理端实现
创建交换机,指定headers类型

绑定队列,指定参数


交换机发送消息

查看队列queue1接收情况


9. RabbitMQ使用场景
面试技巧:

面试官询问为什么使用RabbitMQ,
分析:

直接消息队列的使用场景一看就是没经验,只是了解
如何回答:

我在xxx公司做xxx业务,一开始用单体架构,后来随着公司发展开始拆分为分布式,其中两个模块需要互相通信、协同,于是采用了消息队列,经过分析选择了RabbitMQ,我自己认为使用了之后有哪些优点,这时候开始把RabbitMQ的优点顺理成章搬出来。
解耦、削峰、异步
同步异步的问题(串行)

串行方式:

将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端
缺点:

耗时长
一个环节出现异常,会导致整条链路回滚


public void makeOrder(){
    //1.发送订单
    //2.发送短信服务
    //3.发送email服务
    //4.发送app服务
}
1
2
3
4
5
6
并行方式 异步线程池

并行方式:

将订单信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。
与串行的差别,并行的方式可以提高处理的时间
缺点:

需要自己开发、维护
解决持久化问题,消息需要存盘,需要对IO流很了解,
写盘的过程中还要考虑磁盘和内存资源的转换比
线程池耦合在代码中,也占用了JVM内存,不利于扩展,以后的高可用

public void test(){
    //异步
    theadpool.submit(new Callable<Object>{
        //1.发送短信服务
    })
    //异步
    theadpool.submit(new Callable<Object>{
        //2.
    })
    //异步
    theadpool.submit(new Callable<Object>{
        //3.
    })
    //异步
    theadpool.submit(new Callable<Object>{
        //4.
    })
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
存在问题

耦合度高
需要自己写线程池自己维护成本太高
出现了消息可能会丢失,需要你自己做消息补偿
如何保证消息的可靠性你自己写
如果服务器承载不了,你需要自己去写高可用
异步消息队列的方式



好处:

完全解耦,用 MQ建立桥接
有独立的线程池和运行模型
出现了消息可能会丢失,MQ有持久化功能
如何保证消息的可靠性,死信队列和消息转移等
如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用
按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20QPS。比串行提高了3倍,比并行提高了两倍

高内聚,低耦合



好处:

完全解耦,用 MQ建立桥接
有独立的线程池和运行模型
出现了消息可能会丢失,MQ有持久化功能
如何保证消息的可靠性,死信队列和消息转移等
如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用
按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20QPS。比串行提高了3倍,比并行提高了两倍

四、Springboot 整合案例


项目准备
创建空项目,用springboot分别创建生产者和消费者的子模块module

确保rabbitmq服务已经启动,防火墙、阿里安全组已经调整完毕

1. Fanout 模式
生产者

核心配置:application.yml

# 服务端口
server:
  port: 8080

# 不配置的话就是默认本机ip 端口5672 账户guest/guest
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 41.104.141.27
    port: 5672
1
2
3
4
5
6
7
8
9
10
11
12
订单服务:OrderService.java

@Service
public class OrderService{
    // 获取连接对象
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 模拟用户下单
    public void makeOrder(String userid,String productid,int num){
        // 1.根据商品id查询库存是否足够
        // 2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功:"+orderId);
        // 3.通过MQ来完成消息的分发
        // 参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容
        String exchangeName = "fanout_order_exchange";
        String routingKey = "";
        rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
配置类:RabbitMqConfiguration

// 配置类配置具体的rabbitmq属性参数
@Configuration
public class RabbitMqConfiguration{
    // 1.声明注册fanout模式的交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout_order_exchange",true,false);
    }
    // 2.声明队列
    @Bean
    public Queue smsQueue(){
        return new Queue("sms.fanout.queue",true);
    }
    @Bean
    public Queue duanxinQueue(){
        return new Queue("duanxin.fanout.queue",true);
    }
    @Bean
    public Queue emailQueue(){
        return new Queue("email.fanout.queue",true);
    }
    // 3.完成绑定关系
    @Bean
    public Binding smsBingding(){
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }
    @Bean
    public Binding duanxinBingding(){
        return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
    }
    @Bean
    public Binding emailBingding(){
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
消息发送测试:springboot测试类

@SpringBootTest
class RabbitmqSpringbootOrderProducerApplicationTests {
	@Autowired
	private OrderService orderService;
	@Test
	void contextLoads() {
		orderService.makeOrder("1", "1", 100);
	}
}
1
2
3
4
5
6
7
8
9
运行测试,查看管理界面,发布于订阅模式下,三个队列都能收到消息




消费者

核心配置:application.yml

# 服务端口,与生产者区分开
server:
  port: 8081

# 不配置的话就是默认本机ip 端口5672 账户guest/guest
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 41.104.141.27
    port: 5672
1
2
3
4
5
6
7
8
9
10
11
12
创建消费者:FanoutSmsConsumer、FanoutDuanxinConsumer、FanoutEmailConsumer

@Component
@RabbitListener(queues = {"sms.fanout.queue"})// 对应队列名
public class FanoutSmsConsumer{
    @RabbitHandler
    // 该方法的参数就是接收的消息
    public void reviceMessage(String message){
        System.out.println("sms接收到了的订单信息是:"+message);
    }
}
1
2
3
4
5
6
7
8
9
@Component
@RabbitListener(queues = {"duanxin.fanout.queue"})
public class FanoutDuanxinConsumer{
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("duanxin接收到了的订单信息是:"+message);
    }
}
1
2
3
4
5
6
7
8
@Component
@RabbitListener(queues = {"email.fanout.queue"})
public class FanoutEmailConsumer{
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("email接收到了的订单信息是:"+message);
    }
}
1
2
3
4
5
6
7
8
配置类:省略

由于生产者已经创建配置类配置了交换机、队列等信息,这里就不再需要配置类去定义了
但是如果是先启动消费者,而配置在生产者里面,消费者就会去监听不存的队列,就会出现异常
为了安全起见,可以生产者、消费者都配置一样的配置类,不论谁先启动rabbitmq都会进行配置
配置类定义在生产者里还是消费者里面那都是一样的,甚至也可以在web端定义
最好还是在消费者里定义,因为消费是先启动监听的,如果找不到队列容易出现启动异常
启动测试:

运行主启动类,消费者模块开启监听消息状态
yml配置中如果有中文注释可能会启动失败
如果端口占用就换一个
注意队列名要相匹配


2. Direct 模式
direct模式只需要在fanout模式基础上添加一些修改

生产者

在生产模块中添加direct配置类:DirectRabbitMqConfiguration

定义三个队列,一个交换机,队列绑定交换机,队列携带路由键

// 配置类配置具体的rabbitmq属性参数
@Configuration
public class DirectRabbitMqConfiguration {
    // 1.声明注册fanout模式的交换机
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct_order_exchange",true,false);
    }
    // 2.声明队列
    @Bean
    public Queue smsQueue(){
        return new Queue("sms.direct.queue",true);
    }
    @Bean
    public Queue duanxinQueue(){
        return new Queue("duanxin.direct.queue",true);
    }
    @Bean
    public Queue emailQueue(){
        return new Queue("email.direct.queue",true);
    }
    // 3.完成绑定关系
    @Bean
    public Binding smsBingding(){
        return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
    }
    @Bean
    public Binding duanxinBingding(){
        return BindingBuilder.bind(duanxinQueue()).to(directExchange()).with("duanxin");
    }
    @Bean
    public Binding emailBingding(){
        return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
订单服务:OrderService.java

指定使用哪个路由键发送消息,哪个路由键对应的队列就会接受消息

@Service
public class OrderService {
    // 获取连接对象
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 模拟用户下单
    public void makeOrder(String userid,String productid,int num) {
        // 1.根据商品id查询库存是否足够
        // 2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功:"+orderId);
        // 3.通过MQ来完成消息的分发
        // 参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容
        String exchangeName = "direct_order_exchange";
        String routingKey = "";
        // 这了我们只让sms和email对应的队列接收消息
        rabbitTemplate.convertAndSend(exchangeName,"email", orderId);
        rabbitTemplate.convertAndSend(exchangeName,"sms", orderId);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
消费者

创建消费者:DirectSmsConsumer、DirectDuanxinConsumer、DirectEmailConsumer

@Component
@RabbitListener(queues = {"duanxin.direct.queue"})
public class DirectDuanxinConsumer {
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("duanxin接收到了的订单信息是:"+message);
    }
}
1
2
3
4
5
6
7
8
@Component
@RabbitListener(queues = {"email.direct.queue"})
public class DirectEmailConsumer {
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("email接收到了的订单信息是:"+message);
    }
}
1
2
3
4
5
6
7
8
@Component
@RabbitListener(queues = {"sms.direct.queue"})// 对应队列名
public class DirectSmsConsumer {
    @RabbitHandler
    // 该方法的参数就是接收的消息
    public void reviceMessage(String message){
        System.out.println("sms接收到了的订单信息是:"+message);
    }
}
1
2
3
4
5
6
7
8
9
启动生产者测试类,发送消息,


启动主启动类:只有当前所监听的队列与交换机绑定,且队列有生产者发消息时携带的路由键,才能接收消息


由于生产者的service中,只对 sms、email这两个路由键发送消息,所以email和sms分别对应这两个路由键,接收到了消息

3. Topic 模式(采用注解配置)
SpringBoot除了可以使用配置类的方式定义rabbitmq相关信息,还可以使用注解的方式进行配置,这里我们用注解的方式进行配置

生产者

OrderService中添加方法,

	public void makeOrderTopic(String userid,String productid,int num) {
        // 1.根据商品id查询库存是否足够
        // 2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功:"+orderId);
        // 3.通过MQ来完成消息的分发
        // 参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容
        String exchangeName = "topic_order_exchange";
        String routingKey = "com.duanxin";// 短信和sms可以收到
        rabbitTemplate.convertAndSend(exchangeName,routingKey, orderId);
    }
1
2
3
4
5
6
7
8
9
10
11
消费者(采用注解)

创建消费者:TopicSmsConsumer.java、TopicDuanxinConsumer.java、TopicEmailConsumer.java

在注解上完成队列定义、交换机绑定,队列携带的路由键

@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "duanxin.topic.queue",durable = "true",autoDelete = "false"),
        exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
        key = "#.duanxin.#"
))
public class TopicDuanxinConsumer {
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("duanxin接收到了的订单信息是:"+message);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "email.topic.queue",durable = "true",autoDelete = "false"),
        exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
        key = "*.email.#"
))
public class TopicEmailConsumer {
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("email接收到了的订单信息是:"+message);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "sms.topic.queue",durable = "true",autoDelete = "false"),
        exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
        key = "com.#"
))
public class TopicSmsConsumer {
    @RabbitHandler
    // 该方法的参数就是接收的消息
    public void reviceMessage(String message){
        System.out.println("sms接收到了的订单信息是:"+message);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
启动消费者主启动类,开始监听

运行生产者测试类 调用makeOrderTopic方法,发送消息

根据路由键的匹配机制,duanxin和sms接收到了消息


小结
rabbitmq整合springboot更加快速,高效
但是一定要对web端配合原生的rabbitmq配置要理解透彻
整合的过程中要注意各种参数的配置,登录信息、防火墙、安全组、交换机名、队列名、绑定关系、路由键、是否交给spring容器管理,端口是否冲突, 都容易出问题
绑定关系的配置在生产者和消费者都可以,最好还是在消费者绑定,因为消费者先启动,如果没有找到绑定容易出现异常
关于使用配置类还是注解配置,推荐使用配置类,功能最完善,注解比较鸡肋,匹配不详细,当然也可以在web端配置
需要注意的是,队列不可以重复创建,重复创建也不会覆盖原有,而是报错,所以我们在进行新的测试时,如果队列名重复,需要提前进入web端删除对应的队列,否则影响测试
五、RabbitMQ高级
1. 过期时间TTL
概述
过期时间 TTl表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置 TTL,目前有两种方法可以设置

第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间
第二种方法是对消息进行单独设置,每条消息 TTL可以不同
如果上述两种方法同时使用,则消息的过期时间以两者 TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的 TTL值,就称为 dead message被投递到死信队列,消费者将无法再收到该消息

如果给队列设置为过期队列,过期队列的消息会写入到死信队列,message则不会

超时的参数设置,以及死信的参数设置,可以查看web端界面


设置队列TTL
设置队列TTL

创建配置类TTLRabbitMQConfiguration.java

在消费中添加配置类,定义队列、交换机、绑定关系

@Configuration
public class TTLRabbitMQConfiguration{
    // 1.声明注册direct模式的交换机
    @Bean
    public DirectExchange ttldirectExchange(){
        return new DirectExchange("ttl_direct_exchange",true,false);}
    // 2.队列的过期时间
    @Bean
    public Queue directttlQueue(){
        // 设置过期时间
        Map<String,Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000);// 这里一定是int类型
        return new Queue("ttl.direct.queue",true,false,false,args);}

    @Bean
    public Binding ttlBingding(){
        return BindingBuilder.bind(directttlQueue()).to(ttldirectExchange()).with("ttl");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
生产者添加业务方法,生产消息,在OrderService中添加

	public void makeOrderTtl(String userid,String productid,int num) {
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功:"+orderId);
        String exchangeName = "ttl_order_exchange";
        String routingKey = "ttl";
        rabbitTemplate.convertAndSend(exchangeName,routingKey, orderId);
    }
1
2
3
4
5
6
7
生成测试类,调用makeOrderTtl方法,生产数据

	@Test
	void contextLoads2() {
		orderService.makeOrderTtl("1", "1", 10);
	}
1
2
3
4
查看web界面,队列多出一条消息,过了5秒,自动消失

设置消息TTL
设置消息TTL

修改 OrderService 类,

发送的消息从原来的 String 变为 MessagePostProcessor 对象

public class OrderService{
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //模拟用户下单
    public void makeOrder(String userid,String productid,int num){
        //1.根据商品id查询库存是否足够
        //2.保存订单
        String orderId = UUID.randomUUID().toString();
        sout("订单生产成功:"+orderId);
        //3.通过MQ来完成消息的分发
        //参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容
        String exchangeName = "ttl_order_exchange";
        String routingKey = "ttlmessage";
        //给消息设置过期时间
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){
            public Message postProcessMessage(Message message){
                //这里就是字符串
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        }
        rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId,messagePostProcessor);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
RabbitMqConfiguration.java

@Configuration
public class TTLRabbitMQConfiguration{
    //1.声明注册direct模式的交换机
    @Bean
    public DirectExchange ttldirectExchange(){
        return new DirectExchange("ttl_direct_exchange",true,false);}
    //2.队列的过期时间
    @Bean
    public Queue directttlQueue(){
        //设置过期时间
        Map<String,Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);//这里一定是int类型
        return new Queue("ttl.direct.queue",true,false,false,args);}
    @Bean
    public Queue directttlMessageQueue(){
        return new Queue("ttlMessage.direct.queue",true,false,false,args);}
    
    @Bean
    public Binding ttlBingding(){
        return BindingBuilder.bin(directttlMessageQueue()).to(ttldirectExchange()).with("ttlmessage");
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2. 死信队列
概述
DLX,全称 Dead-Letter-Exchange,可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX的队列就称之为死信队列。消息变成死信,可能是由于以下原因:

消息被拒绝
消息过期
队列达到最大长度
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性,当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的 DLX上去,进而被路由到另一个队列,即死信队列。

要想使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange指定交换机即可

定义死信交换机:DeadRabbitMqConfiguration.java

@Configuration
public class DeadRabbitMqConfiguration{
    // 1.声明注册direct模式的交换机
    @Bean
    public DirectExchange deadDirect(){
        return new DirectExchange("dead_direct_exchange",true,false);}
    // 2.队列的过期时间
    @Bean
    public Queue deadQueue(){
        return new Queue("dead.direct.queue",true);}
    @Bean
    public Binding deadbinds(){
        return BindingBuilder.bind(deadDirect()).to(deadQueue()).with("dead");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
修改正常的交换机配置:TTLRabbitMQConfiguration

正常的ttl交换处理消息,如果发生超时,则交给死信交换机,进入死信队列,进而下一步操作

如果是direct模式,那么死信队列也要有key,如果是fanout模式,则死信队列不需要key

@Configuration
public class TTLRabbitMQConfiguration{
    // 1.声明注册direct模式的交换机
    @Bean
    public DirectExchange ttldirectExchange(){
        return new DirectExchange("ttl_direct_exchange",true,false);}
    // 2.队列的过期时间
    @Bean
    public Queue directttlQueue(){
        //设置过期时间
        Map<String,Object> args = new HashMap<>();
        //args.put("x-max-length",5);
        args.put("x-message-ttl",5000);// 超时设置,这里一定是int类型
        args.put("x-dead-letter-exchange","dead_direct_exchange");// 绑定死信交换机
        args.put("x-dead-letter-routing-key","dead");// 路由key,fanout不需要配置
        return new Queue("ttl.direct.queue",true,false,false,args);}
    @Bean
    public Queue directttlMessageQueue(){
        return new Queue("ttlMessage.direct.queue",true,false,false,args);}
    
    @Bean
    public Binding ttlBingding(){
        return BindingBuilder.bin(directttlMessageQueue()).to(ttldirectExchange()).with("ttlmessage");
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
配置了死信队列,消息超时之后自动进入死信队列,这是消息的一种可靠机制,直接移除是很危险的,



3. 内存磁盘的监控
RabbitMQ内存警告
当内存使用超过配置的阈值或者磁盘空间剩余空间大于剩余的阈值时,RabbitMQ会暂时阻塞客户端连接,并且停止接收从客户端发来的消息,一次避免服务器的崩溃,客户端与服务端的心跳检测机制也会失效



memory 为105MB,表示当前rabbitmq服务使用内存为105mb,下面389MB high waterm 表示如果rabbitmq使用用内存达到这个阈值,就会触发警告,随后 connections 菜单中的所有的链接就会变成阻塞,即 block,生产者无法再将消息存储到消息队列中;

接下来必须尽快将内存上线调高,或者增加内存,也可能是程序中出现了死循环,

消息从内存转移到磁盘的过程,叫做消息的持久化,涉及到持久队列,

rabbitmq的内存上线阈值默认是物理内存的0.4倍,本人的虚拟机内存为1GB,所以这里的阈值389MB就是这么来的

出现内存或磁盘不足的时候应该尽快调整,避免影响生产环境的使用

RabbitMQ的内存控制
当出现警告的时候,可以通过配置去修改和调整

方式一:通过命令

rabbitmqctl set_vm_memory_high_watermark <fraction> #表示相对值
rabbitmqctl set_vm_memory_high_watermark absolute 50MB #对应绝对值
1
2
fraction/value 为内存阈值,fraction为相对值,absolute后面写绝对值。默认情况是:0.4或2GB,代表的含义是:当 RabbitMQ使用的内存超过40%或value时,就会产生警告并且会阻塞所有生产者的连接。两种命令只需执行一个即可。

比如,这里将阈值设置为50MB,当前使用了104MB已经超过阈值了,爆红


方式二:修改配置文件 rabbitmq.conf

修改配置文件:/etc/rabbitmq/rabbitmq.conf

#默认
#vm_memory_high_watermark.relative = 0.4
#使用ralative相对值进行设置fraction,建议取值在0.4-0.7之间,不建议超过0.7
vm_memory_high_watermark.relative = 0.6
#使用absolute的绝对值方式,单位KB,MB,GB
vm_memory_high_watermark.absolute = 2GB
1
2
3
4
5
6
注意:

通过命令修改的阈值将在Broker重启后失效,通过修改配置文件设置的阈值不会随着重启而消失,但修改配置文件后要重启 Broker才会生效
相对值建议设置到0.4到0.7之间,全给了rabbitmq会造成其他应用无法运行,绝对值的话可以自己换算一下比例对应,二者设置一个即可
生产中遇到内存爆红的情况一定要尽快解决,加内存,或命令进行修改
RabbitMQ的内存换页
在某个Broker节点及内存阻塞生产者之前,他会尝试将队列中的消息换页到磁盘中以释放内存空间,持久化和非持久化的消息都会写入磁盘中,其中持久化的消息本身就在磁盘中的一个副本,所以在转移的过程中持久化的消息会先从内存中清除掉。

默认情况下,内存达到阈值的50%就会换页处理
也就是说在默认情况下,比如,当内存阈值为0.4时,那么当内存使用超过0.4*0.5=0.2时,就会进行换页操作
命令修改:

vm_memory_high_watermark_paging_ratio = 0.7 # 阈值的0.7时换页
1
这个值要小于1,如果设置为大于等于1,那么内存使用都已经积攒到阈值了,就已经阻塞了,再换页就没有意义了,通常为0.7

RabbitMQ的磁盘控制


当磁盘的剩余空间低于确定的阈值时,rabbitmq同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽磁盘空间导致服务器崩溃。

比如上图disk space表示,当前的磁盘阈值是93GB,而磁盘剩余空间为35GB,35<93所以就会爆红

默认情况下,磁盘预警为50MB的时候预警,表示当剩余磁盘空间低于50MB的时候会阻塞生产者并停止内存消息换页到磁盘的过程
这个阈值可以减小,但是不能完全消除因磁盘而导致崩溃的可能性,比如在两次磁盘空间的检查空隙内,第一次检查是60MB,第二次检查可能就是1MB二出现警告
当出现磁盘空间预警要尽快加硬盘,避免消息阻塞影响生产
命令修改方式:

rabbitmqctl set_disk_free_limit <disk_limit># 绝对值
rabbitmqctl set_disk_free_limit memory_limit <fraction># 相对值
1
2
4.集群
RabbitMQ集群
RabbitMQ这款消息队列中间件本身基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering,这使得RabbitMQ本身不需要向ActiveMQ、Kafka那样通过Zookeeper分别实现高可用方案和保存集群的元数据,集群是保证可靠的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的

在实际使用中采取多机多实例部署方式,为了便于我们学习搭建,这里我们主要针对单机多实例来演示

配置集群的前提是 rabbitmq 可以运行起来,

使用命令ps aix|grep rebbitmq查看相关进程,命令rabbitmqct status你可以查看rabbitmq状态信息而不报错:

单机多实例搭建
通常集群的搭建至少需要三个节点,这里我们搭建两个,rabbit-1和rabbit-2,1作为主节点maste,演示够用了

关闭原有的rabbitmq

先将本机上的单机rabbitmq关闭,

systemctl stop rabbitmq-server
1
如果是docker启动的话,则通过docker的方式来关闭

docker stop 容器号
systemctl stop docker
1
2
这里我们使用普通安装的rabbitmq演示,没有使用docker启动

创建新的节点

创建并且启动rabbit-1

sudo RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server start &
1


创建并启动rabbit-2

因为部署在一台机器上,所以端口号要分开,web端15673也是为了区分开

sudo RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit-2 rabbitmq-server start &
1

验证启动

ps aux|grep rabbitmq
1
绑定节点关系

重新启动,绑定主从关系,1为主节点,2为从节点

#停止应用
sudo rabbitmqctl -n rabbit-1 stop_app
#清除节点上的历史数据,如果不清除,无法将节点加入到集群,因为集群的数据是共享的
sudo rabbitmqctl -n rabbit-1 reset
#启动应用
sudo rabbitmqctl -n rabbit-1 start_app
1
2
3
4
5
6
#停止应用
sudo rabbitmqctl -n rabbit-2 stop_app
#清除节点上的历史数据,如果不清除,无法将节点加入到集群,因为集群的数据是共享的
sudo rabbitmqctl -n rabbit-2 reset
#将rabbit-2节点加入到rabbit-1主节点集群中,Server-node是服务器的主机名,就是左侧root@后面那个,比如我的就是localhost,
sudo rabbitmqctl -n rabbit-2 join_cluster rabbit-1@Server-node
#启动应用
sudo rabbitmqctl -n rabbit-2 start_app
1
2
3
4
5
6
7
8

整个集群构建完毕,这种集群就是第一章提到的 集群模式1 - Master-slave主从共享数据的部署方式

验证集群状态

sudo rabbitmqctl cluster_status -n rabbit-1
1


Web端监控

默认web端监控是关闭的,我们将它打开

rabbitmq-plugins enable rabbitmq_management
1
提前设置防火墙和阿里安全组开放相关端口15672/15673/5672/5673。。。

设置并授权用户

比单机启动多了-n

rabbitmqctl -n rabbit-1 add_user admin admin
rabbitmqctl -n rabbit-1 set_user_tags admin administrator
rabbitmqctl -n rabbit-1 set_permissions -p / admin ".*" ".*" ".*"
1
2
3
rabbitmqctl -n rabbit-2 add_user admin admin
rabbitmqctl -n rabbit-2 set_user_tags admin administrator
rabbitmqctl -n rabbit-2 set_permissions -p / admin ".*" ".*" ".*"
1
2
3
访问web端,15672和15673两个端口,


此时,我们在web端主节点和从节点的操作都会共享同步,包括创建队列、交换机、绑定、发消息等等

如果关闭从节点rabbit-2,从节点web端则无法访问,主节点有提示

关闭主节点也是如此,关闭节点,已经创建的队列、交换机也依然存在

需要注意的是,只有主节点先启动,从节点才可以启动
一主带多从还不够高可用,实际中,我们可以构建多组这样的小集群,构成一个更大的集群,再让各组之间通讯,这就完成了高可用解决方案
一主多从,分别在不同的机器上时

当然如果没有那么高的需求,一主多从也就够用了,我们也可以将一主多从这几个节点放在多台机器上,增加高可用性,但是必须要将主节点的erlang.cookie文件复制到其他几个从节点上,cookie位置:/安装目录/lib/rabbitmq/.erlang.cookie
逐个启动个节点,启动流程与上面的单机启动一样,不同在于@Server-node需要改成节点对应的ip
也可以配置各节点host文件(vim /etc/hosts)也就是虚拟映射,给各个节点定别名
ip1:rabbit-1
ip2:rabbit-2
5.分布式事务
概述:

分布式事务指事务的操作位于不同的节点上,需要保证事务的AICD特性
例如,在下单场景中,库存和订单如果在不同的节点上,就涉及分布式事务
两个独立的服务现在要组成一个整体,需要解决数据的一致性问题,比如下单后,库存异常导致下单失败,那么下单的数据也有好回滚

这是跨JVM的事务

Spring提供的事务支持智能控制当前JVM内级别的控制,无法控制其他JVM的事务

分布式事务的方式
一、两阶段提交(2PC)需要数据库生产商的支持,Java组件有atomikos支持等

两阶段提交,通过引入协调者Coordinator)来协调参与者的行为,并最终决定这些参与者是否要真正执行,

这是早期出现的解决方案

1.准备阶段

协调者询问参与者事务是否执行成功,参与者发挥事务指定的结果


2.提交阶段

如果事务在每个参与者上执行成功,事务协调者发送通知让参与者提交事务;否则,协调者发送通知让参与者回滚事务

需要注意的是,在准备阶段,参与者执行了事务,但是还未提交,只有在提交阶段接收到协调者发来的通知后,才进行事务的提交或回滚


存在的问题:

同步阻塞,所有的事务参与者在等待其他参与者响应的时候都处于同步阻塞状态,无法进行其他操作
单点问题,协调者在2PC中起到非常大的作用,发生故障将会造成很大的影响,特别是在第二阶段发生故障,所有参与者会一直等待状态,无法完成其他操作
数据不一致,在阶段二,如果协调者只发送了部分commit消息,此时网站发生异常,那么只有部分参与者接收到commit消息,即部分参与者提交了事务,导致系统数据不一致
过于保守,任意一个节点失败就会导致整个事务失败,没有完整的容错机制
二、补偿事务(TCC)严选、阿里、蚂蚁金服

TCC其实是采用补偿机制,核心思想为,针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作,分三个阶段

Try阶段主要是对业务系统做出检测及资源预留
confirm阶段主要是对业务系统做确认提交,
Try阶段执行成功并开始执行Confirm阶段时,默认—confirm阶段是不会出错的,即只要try成功,confirm就一定成功
Cancel阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放
举个例子,Bob向Smith转账

在Try阶段,首先调用远程接口将Bob与Smith的钱给冻结起来
在Confirm阶段,执行远程调用的转账操作,转账成功并解冻账户
如果第二阶段执行成功,则转账成功;如果执行失败,则调用远程冻结接口对应的解冻方法(Cancel)
优点:

与2PC相比,实现流程简单一些,但数据一致性也比2PC差一些
缺点:

缺点明显,第二、三阶段都有可能执行失败,TCC属于应用层的一种补偿方式,所以需要程序员在实现的时候写更多的补偿代码;
在一些场景中,一些业务流程可能用TCC不太好定义和处理
三、本地消息表(异步确保),支付宝、微信支付主动查询支付状态,对账的形式

本地消息表与业务数据表处于同一个数据库中,这样能利用本地事务来保证在这两个表的操作满足事务特性,并且使用了消息队列来保证最终一致性。

一方完成操作都要通过消息队列和其他方对账,出现问题就广播通知所有方

在分布式操作的其中一方完成写业务数据的操作之后,向本地消息表发送一个消息,本地事务能保证这个消息一定会被写入到本地消息表中
之后本地消息表中的消息转发Kafka等消息队列中,如果转发成功则将消息从本地消息表中删除,否则继续重新转发
在分布式事务操作的另一方从消息队列中读取消息,执行消息中的操作


优点:

一种非常经典的实现,避免了分布式事务,实现了最终一致性
缺点:

消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理
四、MQ事务消息,异步场景,通用较强,拓展性较高(本课程介绍的方法)

有些第三方MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式类似于采用第二阶段的提交,但是市面上的一些主流MQ都不支持事务消息,比如Kafka

第一阶段,prepared消息,会拿到消息的地址;第二阶段,执行本地事务;第三阶段,通过第一级阶段拿到的地址去访问消息,并修改状态
也就是说在业务方法内要向消息队列提交两次请求,一次发送消息,一次确认消息;如果确认消息失败了,RabbitMQ会定期扫描消息集群中的事务消息,这时发现了prepared消息,它会向消息发送者确认,所以生产方需要实现一个check接口,RabbitMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息,这样就保证了消息发送与本地事务同时成功,同时失效


优点:

实现了最终一致性,不需要依赖本地数据库事务
缺点:

实现难度大,主流MQ不支持,RocketMQ事务消息部分代码也没有开源
小结:

对比几种分布式事务解决方案,分布式事务本身是一个技术性难题,没有一种方案可以完美应对各种场景,具体还是根据实际业务去选择,
阿里实现的RocketMQ实现的分布式事务,现在也出了很多分布式事务的协调器,比如LCN等
事务问题案例
整体架构


数据不一致案例演示

项目准备,订单业务和派单业务分别准备数据库和实体,并且程序可以启动成功

创建订单,本地数据库保存提交信息,

本地提交后,还要调用运单系统,将订单号传过去

如果,调用运单失败,createOrder就会抛出异常,触发注解事务,@Transactional,回滚事务


现在的这个事务是订单系统的事务,如果调用运单出现问题,订单可以回滚;

但是如果运单系统内部的运行就不受订单的事务控制了,这样就可能导致数据不一致;

如何解决上述问题,需要分布式事务来处理

基于MQ的分布式事务
整体设计思路



可靠生产问题

定时重发

开始改造代码,解决可靠生产问题,

增加消息冗余,

注意:将原来的@Transtraction去掉,因为现在我们开始用分布式事务


生产者接收RabbitMQ的回执,确定是否执行成功

@PostConstruct修饰的regCallback方法会在 RabbitTemplate构造函数之后执行,当然也是发生在sendMessage调用队列之前完成的

这样调用队列的时候,成功还是失败都会通过这个回执可以接收到

还需要配置yml,开启确认机制

publisher-confirm-type: correlated
1


测试运行之后,就会收到回执并保存消息表,可靠生产实现完毕

可靠消费

可靠消息代码实现

当消费者消费消息出现异常时,消息队列会不停的重发消息,触发死循环,冲垮服务器,消耗完内存,导致宕机

几种解决方案:

控制消息队列重发的次数
try catch+手动ack
try catch+手动ack+死信队列处理+人工干预(相对比较完美)
第一种方式:控制重试次数,

配置重试,修改yml

第三种方式,try catch+手动ack+死信队列+人工干预

配置yml,开启手动ack(默认none是自动ack)


这种方式需要关闭失败后的重发(requeue设置为false),然后出现问题的消息就会丢进死信队列,

如果开启重发,一旦消费出现异常还是会死循环;我们已经设置了重试次数,为什么还会死循环?因为try catch会将重试次数的机制给屏蔽掉;

所以添加了try catch以后,就不要开启重试,正常解决异常即可


抓住异常之后如何解决?用死信队列(DLX)来解创建一个新的消费者监听处置死信队列,处置以后也要及时移除,避免死信队列的堆积

当然,因为数据有重发机制,保存过程中还需要考虑幂等性问题,避免数据的重复,可以在数据库设置主键的方式解决,或者分布式锁解决



可靠生产、可靠消费 总结
单纯的rabbitmq无法解决分布式事务,而是加上了rabbitmq提供的其他机制,比如消息确认机制
可靠生产就是发送消息给队列,接收队列的回执,再后续处置,通过消息冗余、定时器,保证消息正常进入消息队列
可靠消费,在可靠消息正常进入队列的前提之后,队列将消息发给消费者,如果消费失败,通过一定的措施,保证消息进入死信队列,再进行后续处理
总之,就是保证,生产者的消息一定能投递给队列,队列发给消费者的消息,一定能得到处置。这样完成了消息的闭环
参考资料:

RabbitMQ中文文档:http://rabbitmq.mr-ping.com/
教学视频:https://www.bilibili.com/video/BV1dX4y1V73G?p=1
————————————————
版权声明:本文为CSDN博主「Super_Song_」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_47257749/article/details/116400508