MQ介绍

MQ:MessageQueue,音讯行列。 行列,是一种FIFO 先进先出的数据结构。音讯由出产者发送到MQ进行排队,然后按本来的顺序交由音讯的顾客进行处理

MQ的效果主要有以下三个方面:

  • 异步:进步体系响应速度和吞吐量;(快递员放快递到菜鸟驿站,客户有时间再取)
  • 解耦:减小服务间影响(进步稳定性),完成数据分发(发布/订阅形式,一个出产者对应多个顾客)
  • 削峰:稳定的体系资源应对突发的流量冲击(避免突发流量形成服务宕机)

MQ优缺点

  • 体系可用性下降(外部依靠增多,一旦MQ宕机,会对事务产生影响)
  • 体系复杂性进步(同步变异步,引进音讯消费问题,音讯不丢掉,顺序消费,重复消费等)
  • 音讯一致性问题(多个顾客怎么确保同时成功/失利)

常用的MQ产品包含Kafka、RabbitMQ和RocketMQ

RabbitMQ基础概念和编程模型

RabbitMQ安装

Releases rabbitmq/erlang-rpm (github.com)下载地址

RabbitMQ基础概念和编程模型

rpm -ivh rabbitmq-server-3.9.15-1.el7.noarch.rpm
# 发动服务,后台进程
service rabbitmq-server start 
# 发动程序,前台运行,检查发动过程
rabbitmqctl start_app 
# 状况
service rabbitmq-server status
# 封闭服务
rabbitmqctl stop_app

RabbitMQ基础概念和编程模型

安装web控制台插件,重启后生效

rabbitmq-plugins enable rabbitmq_management
http://192.168.119.133:15672
创立管理员用户
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator

集群搭建

一般形式

在这种集群形式下,集群各节点之间只会有相同的元数据,而音讯不会冗余,只存在一个节点,顾客在消费时,请求到了没有该音讯的节点,RabbitMQ会临时在节点间进行数据传输。

这种形式下音讯牢靠性不高。也不支撑高可用,某一个节点挂了之后,需求重启服务后,才能让这个节点的音讯正常消费。

镜像形式

RabbitMQ的官方HA高可用方案。需求在搭建了一般集群之后再弥补搭建。其本质区别在于,这种形式会在镜像节点中心主动进行音讯同步,而不是在客户端拉取音讯时临时同步。

而且在集群内部有一个算法会选举产生masterslave,当一个master挂了后,也会主动选出一个来。从而给整个集群供给高可用才能。

这种形式的音讯牢靠性更高,由于每个节点上都存着全量的音讯。而他的弊端也是明显的,集群内部的网络带宽会被这种同步通讯很多的耗费,进而下降整个集群的功用。这种形式下,行列数量最好不要过多。

1、同步集群节点的cookie,途径/var/lib/rabbitmq/.erlang.cookie

2、将worker1和worker3参加worker2的集群中rabbitmqctl join_cluster --ram rabbit@worker2 参加时首先要发动worker1和worker3上的服务,不然呈现如下错误

RabbitMQ基础概念和编程模型

NODENAME=rabbit@worker3
依次履行
service rabbitmq-server start
rabbitmqctl start_app
rabbitmqctl join_cluster --ram rabbit@worker2

参加集群后如下所示,rabbitmqctl cluster_status检查集群状况

RabbitMQ基础概念和编程模型

--ram,表明节点的元数据(交换机、行列定义信息)只保存在内存中;此时存在单点故障,如果worker2节点宕机,元数据有可能丢掉。所以官方不建议一切节点都运用ram。


通常在出产环境中,为了削减RabbitMQ集群之间的数据传输,在装备镜像战略时,会针对固定的虚拟主机virtual host来装备。

创立一个虚拟主机,并增加对应的镜像战略

rabbitmqctl add_vhost /mirror
rabbitmqctl set_policy ha-all --vhost "/mirror" "^" '{"ha-mode":"all"}'

通常镜像形式的集群已经足够满足大部分的出产场景了。尽管他对体系资源耗费比较高,可是在出产环境中,体系的资源都是会做预留的,所以正常的运用是没有问题的。可是在做事务集成时,仍是需求注意行列数量不宜过多,而且尽量不要让RabbitMQ产生很多的音讯堆积。

创立行列

RabbitMQ基础概念和编程模型

根底概念

RabbitMQ是依据AMQP协议(音讯行列协议,用于出产者和顾客之间通信)开发的一个MQ产品

RabbitMQ基础概念和编程模型

虚拟主机Virtual host

在一个RabbitMQ Server或集群中能够划分出多个虚拟主机,每一个虚拟主机都有AMQP的全套组件,而且能够针对每个虚拟主机进行权限和数据分配;虚拟主机之间是完全隔离的

衔接Connection

客户端与RabbitMQ进行交互,首先就需求树立一个TPC衔接,这个衔接便是 Connection。

信道Channel

一旦客户端与与RabbitMQ树立了衔接,就会分配一个AMQP信道 Channel。每个信道有仅有ID,数据操作根本在信道Channel中展开。RabbitMQ为了削减功用开销,会在一个Connection中树立多个Channel,这样便于客户端进行多线程衔接,这些衔接会复用同一个Connection的TCP通道。

交换机Exchange

音讯发送到RabbitMQ中后,会首先进入一个交换机,然后由交换机将数据转发到不同的行列中。RabbitMQ中有多种不同类型的交换机来支撑不同的路由战略。

  • Direct Exchange:依据音讯的Routing key,将音讯路由到匹配的行列
  • Topic Exchange: 依据音讯的Routing key和通配符(*匹配一个单词,#匹配多个单词),将音讯路由到匹配的行列
  • Headers Exchange: 依据音讯的头部信息路由音讯到匹配行列,头部信息能够是恣意键值对
  • Fanout Exchange: 将音讯广播到与之绑定的一切行列

行列Queue

行列是实践保存数据的最小单位,具备FIFO的特性,音讯会被发到行列中,然后才被顾客消费

Classic经典行列

在单机环境中,拥有比较高的音讯牢靠性

Quorum仲裁行列

Quorum是依据Raft一致性协议完成的一种新式的分布式音讯行列,他完成了耐久化,多备份的FIFO行列,主要便是针对RabbitMQ的镜像形式规划的。简单理解便是quorum行列中的音讯需求有集群中半数节点赞同承认后,才会写入到行列中。这种行列类似于RocketMQ傍边的DLedger集群。这种方法能够确保音讯在集群内部不会丢掉。同时,Quorum是以牺牲很多高档行列特性为价值,来进一步确保音讯在分布式环境下的高牢靠。

Feature Classic Mirrored Quorum 注释
Non-durable queues yes no不支撑 不耐久化行列
Exclusivity yes no 只能由声明行列的connection运用
Per message persistence per message always
Membership changes automatic manual
Message TTL (Time-To-Live) yes yes (since 3.10)
Queue TTL yes partially (lease is not renewed on queue re-declaration)
Queue length limits yes yes (exceptx-overflow:reject-publish-dlx)
Lazy behaviour yes always (since 3.10)
Message priority yes no
Consumer priority yes yes
Dead letter exchanges yes yes
Adheres topolicies yes yes (seePolicy support)
Poison message handling no yes 毒音讯
GlobalQoS Prefetch yes no

Poison message handling:毒音讯,音讯一向不能被顾客正常消费(消费失利或许消费逻辑有问题),就会导致音讯不断的从头入队,形成功用浪费;Quorum行列会跟踪音讯的失利次数,记录在x-delivery-count头部参数中,然后通过设置Delivery limit设置阈值,失利次数超过阈值就会删去音讯,或许装备了死信行列,就进入对应的死信行列

声明Quorum行列
Map<String,Object> params =newHashMap<>();  
params.put("x-queue-type","quorum");  
//声明Quorum行列的方法便是增加一个x-queue-type参数,指定为quorum。默认是classic  
channel.queueDeclare(QUEUE_NAME,true,false,false, params);

Quorum行列更适合于行列长期存在,而且对容错、数据安全方面的要求比低推迟、不耐久等高档行列更能要求更严厉的场景。

例如 电商体系的订单,引进MQ后,处理速度能够慢一点,可是订单不能丢掉。

不适用的场景

  • 临时运用的行列,或许常常修改和删去的行列
  • 对音讯推迟要求高,Raft一致性算法会影响音讯的推迟
  • 对数据安全性要求不高,Quorum行列需求顾客手动通知或许出产者手动承认
  • 行列音讯积压严重或许音讯很大,Quorum行列会将一切音讯一直保存在内存中,直到撑爆内存

Stream行列

Stream行列是RabbitMQ自3.9.0版本开始引进的一种新的数据行列类型,也是目前官方最为引荐的行列类型。这种行列类型的音讯是耐久化到磁盘而且具备分布式备份的,更适合于顾客多,读音讯十分频频的场景。

Stream行列的核心是以append-only的方法将音讯耐久化到日志文件中,然后通过调整顾客的消费进展offset,完成音讯的屡次分发。以下是4个主要特点

  • 大规模分发,已有的音讯行列是一个顾客绑定一个专用行列,Stream行列答应恣意数量的顾客运用同一个行列
  • 音讯回溯,已有的行列中,音讯被顾客消费完后,会从行列中删去,无法读取消费过的音讯,Stream答应顾客从日志的任何节点开始从头读取音讯
  • 高吞吐量
  • 大日志,已有的行列中积累的音讯过多时,功用下降会十分明显,Stream行列的规划方针以最小的内存开销存储很多数据

功用比照

Feature Classic Stream
Non-durable queues yes no
Exclusivity yes no
Per message persistence per message always
Membership changes automatic manual
TTL yes no (but seeRetention)
Queue length limits yes no (but seeRetention)
Lazy behaviour yes inherent
Message priority yes no
Consumer priority yes no
Dead letter exchanges yes no
Adheres topolicies yes (seeRetention)
Reacts tomemory alarms yes no (uses minimal RAM)
Poison message handling no no
GlobalQoS Prefetch yes no
Map<String,Object> params =newHashMap<>();
params.put("x-queue-type","stream");  
params.put("x-max-length-bytes",20_000_000_000L);// 日志文件的最大字节数: 20 GB  
params.put("x-stream-max-segment-size-bytes",100_000_000);// 每一个日志文件的最大巨细: 100 MB  
channel.queueDeclare(QUEUE_NAME,true,false,false, params);

RabbitMQ编程模型

根底模型

成功发送音讯

RabbitMQ基础概念和编程模型

for (int i = 100; i < 200; i++) {
   String newMessage = String.format("亚索%d级了", i);
   channel.basicPublish("", QUEUE_NAME, null, newMessage.getBytes("UTF-8"));
   System.out.println(" [x] Sent '" + newMessage + "'");
}

消费2个音讯

RabbitMQ基础概念和编程模型

pull,主动从服务器上获取音讯
GetResponse response = channel.basicGet(QUEUE_NAME, true);
push,服务端推送音讯过来,履行回调函数
channel.basicConsume(QUEUE_NAME, true, myconsumer);

官网模型

1、Hello World

RabbitMQ基础概念和编程模型

Producer端发送一个音讯到指定的queue,中心不需求任何exchange规则,Consumer端按queue消费

Producer
channel.queueDeclare(QUEUE_NAME,false,false,false,null);  
channel.basicPublish("", QUEUE_NAME,null, message.getBytes("UTF-8"));
------------------------------------------------------------------------------
Consumer:
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
GetResponse response = channel.basicGet(QUEUE_NAME, true);

2、Work Queues

RabbitMQ基础概念和编程模型

Producer:发音讯到方针行列
channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);//耐久化音讯
channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,  message.getBytes("UTF-8"));
------------------------------------------------------------------------------
Consumer:
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
channel.basicQos(1);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

3、Publish/Subscribe

发布/订阅机制

RabbitMQ基础概念和编程模型

Producer只担任发送音讯到Exchange,再由Exchange(type=fanout)分配到与该Exchange绑定的Queue,顾客创立行列绑定到Exchange上

Producer:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
------------------------------------------------------------------------------
Consumer: 绑定到Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");

4、Routing

Producer只担任发送音讯到Exchange,Exchange(type=direct)依据routingkey将不同类别的音讯分发到不同的Queue

RabbitMQ基础概念和编程模型

Producer:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.basicPublish(EXCHANGE_NAME, ro utingKey, null, message.getBytes("UTF-8"));
------------------------------------------------------------------------------
Consumer:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueBind(queueName, EXCHANGE_NAME, routingKey1);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);
channel.basicConsume(queueName, true, consumer);

5、Topics

Producer只担任发送音讯到Exchange,Exchange(type=direct)依据routingkey将不同类别的音讯分发到不同的Queue,routingkey支撑含糊匹配,单词间用.离隔,*代表一个单词,#代表0个或多个单词

RabbitMQ基础概念和编程模型

Producer:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.basicPublish(EXCHANGE_NAME, "hero.yasuo.lol", null, message.getBytes("UTF-8"));
------------------------------------------------------------------------------
Consumer:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.lol");
channel.queueBind(queueName, EXCHANGE_NAME, "#.lol");
channel.basicConsume(queueName, true, consumer);

6、RPC远程调用

异步降级为同步调用,不引荐运用

7、Publish Confirms

确保出产者发送成功,发送者发送音讯的根底APIProducer.basicPublish方法是没有返回值的,也便是说,一次发送音讯是否成功,出产者是不知道的,这在事务上就容易形成音讯丢掉。而这个模块便是通过给发送者供给一些承认机制,来确保这个音讯发送的过程是成功的。