rabbitmq(RabbitMQ详解)

时间:2023-10-09 12:53:34 阅读:6

RabbitMQ详解

RabbitMQ的优点:

  • 开源, 功能好效, 安定性好
  • 提供可靠性消息投递形式(confirm), 前往形式(return)等
  • 与Spring完善整合, API丰厚
  • 集群形式丰厚, 支持表达式设置, 高可用HA形式, 镜像行列模子
  • 可以确保数据不丧失的条件下做到高可靠性, 可用性

RabbitMQ高功能缘故:

  • 由Erlang言语开发,承继其天生的并发性,安定性和宁静性有保证

RabbitMQ的协议:

AMQP(Advanced Message Queuing Protocol)高等消息行列协议,是一个异步消息转达所使用使用层协议标准,为面向消息正中件计划,基于此协议的客户端与消息正中件可以无视消息泉源转达消息,不受客户端、消息正中件、不同的开发言语情况等条件的限定。

计划看法表明:

  • Server : 又称Broker, 承受客户端毗连, 完成AMQP实体办事
  • Connection : 毗连, 使用步骤与Broker的网络毗连
  • Channel : 网络信道, 几乎一切的利用都在Channel中举行, Channel是举行消息读写的通道。客户端可以创建多个Channel, 每个Channel代表一个会话职责。
  • Message : 消息, 办事器和使用步骤之间传送的数据, 有Properties和Body构成。Properties可以抵消息举行修饰, 好比消息的优先级, 延长等高等特性; Body就是消息体内容。
  • Virtual Host : 假造地点, 用于举行逻辑断绝, 最表层的消息路由。一个Virtual Host内里可以有多少个Exchange和Queue, 同一个Virtual Host内里不克不及有相反称呼的Exchange或Queue
  • Exchange : 互换机, 用于吸收消息, 依据路由键转发消息到绑定的行列
  • Binding : Exchange和Queue之间的假造毗连, binding中可以包含routing key
  • Routing Key : 一个路由端正, 假造机可用它来确定怎样路由一个特定消息
  • Queue : 也成Message Queue, 消息行列, 用于保存消息并将它们转发给消耗者

RabbitMQ全体架构

RabbitMQ成员简介

Binding-绑定

  • Exchange和Exchange, Queue之间的毗连干系
  • 绑定中可以包含RoutingKey大概参数

Queue-消息行列

  • 消息行列, 实践存储消息数据
  • Durability : 对否历久化
  • Auto delete : 如选yes,代表当最初一个监听被移除之后, 该Queue会主动被删除

Message-消息

  • 办事和使用步骤之间传送的数据
  • 实质上就是一段数据, 由Properties和Payload(Body)构成
  • 常用属性 : delivery mode, headers(自界说属性)
  • 其他属性content_type, content_encoding, prioritycorrelation_id : 可以以为是消息的唯一idreplay_to : 重回行列设定expiration : 消息过时时间message_id : 消息idtimestamp, type, user_id, app_id, cluster_id

Virtual Host-假造主机

  • 假造地点, 用于举行逻辑断绝, 最表层的消息路由
  • 一个Virtual Host内里可以有多少个Exchange和Queue
  • 同一个Virtual Host内里不克不及有相反称呼的Exchange或Queue

Exchange互换机

吸收消息,并依据路由键转发消息到所绑定的行列

注:互换机不会存储消息,假如消息发送到没有绑定消耗行列的互换机,消息则丧失。

互换机的属性

  • Name : 互换机称呼
  • Type : 互换机典范, direct, topic, fanout, headers
  • Durability : 对否必要历久化, true为历久化
  • Auto Delete : 当最初一个绑定到Exchange上的行列删除后, 主动删除该Exchange
  • Internal : 如今Exchange对否用于RabbitMQ内里使用, 默以为False, 这个属性很少会用到
  • Arguments : 扩展参数, 用于扩展AMQP协议订定化使用

互换机的四品种型

  • Direct exchange(直连互换机)是依据消息携带的路由键(routing key)将消息投递给对应行列的注意 : Direct形式可以使用RabbitMQ自带的Exchange(default Exchange), 以是不必要将Exchange举行任何绑定(binding)利用, 消息转达时, RoutingKey必需完全婚配才会被行列吸收, 不然该消息会被丢弃

  • Fanout exchange(扇型互换机)将消息路由给绑定到它身上的一切行列不处理路由键, 只必要简便的将行列绑定到互换机上发送到互换机的消息都市被转发到与该互换机绑定的一切行列上Fanout互换机转发消息是最快的

  • Topic exchange(主题互换机)行列经过路由键绑定到互换机上,然后,互换机依据消息里的路由值,将消息路由给一个或多个绑定行列(含糊婚配)“#” : 婚配一个或多个词“*” : 婚配一个词

  • Headers exchange(头互换机)相似主题互换机,但是头互换机使用多个消息属性来代替路由键创建路由端正。经过推断消息头的值可否与指定的绑定相婚配来建立路由端正。

RabbitMQ常用的5种事情形式

1、点对点(简便)的行列

  • 不必要互换机
  • 一个消费者,一个消耗者

2、事情行列(公平性)

  • 不必要互换机
  • 一个消费者,多个消耗者,但是一个消息只会发送给一个行列(竞争的消耗者形式)
  • 默许是轮询,即会将消息轮替发给多个消耗者,但如此抵消耗得比力慢的消耗者不公平
  • 可接纳公中分派,即能者多劳channel.basicQos(1);// 限定:发送一条信息给消耗者A,消耗者A未反应处理后果之前,不会再次发送信息给消耗者Aboolean autoAck = false;// 取消主动反应 channel.basicConsume(QUEUE_NAME, autoAck, consumer);// 吸收信息channel.basicAck(envelope.getDeliveryTag(), false);// 反应消息处理终了

3、公布/订阅

  • 一个消费者,多个消耗者
  • 每一个消耗者都有本人的一个行列
  • 消费者没有直接发消息到行列中,而是发送到互换机
  • 每个消耗者的行列都绑定到互换机上
  • 消息经过互换机抵达每个消耗者的行列

该形式就是Fanout Exchange(扇型互换机)将消息路由给绑定到它身上的一切行列

4、路由

消费者发送消息到互换机并指定一个路由key,消耗者行列绑定到互换机时要订定路由key(key婚配就能承受消息,key不婚配就不克不及承受消息)

该形式接纳Direct exchange(直连互换机)

5、主题(通配符)

此形式真实路由key形式的基本上,使用了通配符来办理消耗者吸收消息。消费者P发送消息到互换机X,互换机依据绑定行列的routing key的值举行通配符婚配

标记#:婚配一个大概多个词lazy.# 可以婚配lazy.irs大概lazy.irs.cor

标记*:只能婚配一个词lazy.* 可以婚配lazy.irs大概lazy.cor

该形式接纳Topic exchange(主题互换机)

消息可靠性转达或回退(消费者端)

消费者发送消息出去之后,不晓得毕竟有没有发送到RabbitMQ办事器, 默许是不晓得的。并且有的时分我们在发送消息之后,后方的逻辑出成绩了,我们不想要发送之前的消息了,必要撤回该怎样做。

AMQP 事件机制

  • txSelect 将如今channel设置为transaction形式
  • txCommit 提交如今事件
  • txRollback 事件回滚

Confirm 形式

消息的确认, 是指消费者投递消息后, 假如Broker收到消息, 则会给我们产生一个应对

消费者举行吸收应对, 用来确定这条消息对否正常发送到Broker, 这种办法也是消息的可靠性投递的中心保证

  • 在channel上开启确认形式 : channel.confirmSelect()
  • 在channel上添加监听 : addConfirmListener, 监听告捷和失败的前往后果, 依据具体的后果抵消息举行重新发送, 或纪录日志等后续处理

Return消息机制

Return Listener用于处理一些不成路由的消息

正常情况下消息消费者经过指定一个Exchange和RoutingKey, 把消息送到某一个行列中去, 然后消耗者监听行列, 举行消耗,但在某些情况下, 假如在发送消息的时分, 如今的exchange不存在大概指定的路由key路由不到,这个时分假如我们必要监听这种不成达的消息, 就要使用Return Listener。

在基本API中有一个紧张的设置项Mandatory : 假如为true, 则监听器会吸收到路由不成达的消息, 然后举行后续处理(补偿或人工处理), 假如为false, 那么broker端主动删除该消息。

怎样保证消息可靠转达

  • 保证消息的告捷发射
  • 保证MQ节点的告捷吸收
  • 发送端收到MQ节点(Broker)的确认应对
  • 完满的消息补偿机制

方案:

1、消息落库, 抵消息形态举行标志

  • step1:消息入库
  • step2:消息发送
  • step3:消耗端消息确认
  • step4:更新库中消息形态为已确认
  • step5:定时职责读取数据库中未确认的消息
  • step6:未收到确认后果的消息重新发送
  • step7:假如重试多次之后仍旧失败, 则将消息形态变动为投递失败的终态, 后方必要人工到场

2、消息的延长投递, 做二次确认, 回调反省

  • step1 : 第一次消息发送, 必需业务数据落库之后才干举行消息发送
  • step2 : 第二次消息延长发送, 设定延长一段时间发送第二次check消息
  • step3 : 消耗端监听Broker, 举行消息消耗
  • step4 : 消耗告捷之后, 发送确认消息到确认消息行列
  • step5 : Callback Service监听step4中的确认消息行列, 维护消息形态, 对否消耗告捷等形态
  • step6 : Callback Service监听step2发送的Delay Check的消息行列, 检测内里的消息形态, 假如消息是发送告捷形态, 则流程完毕, 假如消息是失败形态, 大概查不到如今消息形态时, 会关照消费者, 举行消息重发, 重新上述步调

重试机制和幂等性保证(消耗者端)

重试机制

消耗者在消耗消息的时分,假如消耗者业务逻辑显现步骤特别,会使用消息重试机制。

  • 情况1: 消耗者获取到消息后,调用第三方接口,但接口暂且无法拜候,对否必要重试? (必要重试机制)
  • 情况2: 消耗者获取到消息后,抛出数据转换特别,对否必要重试?(不必要重试机制)必要公布举行处理。

关于情况2,假如消耗者代码抛出特别是必要公布新版本才干处理的成绩,那么不必要重试,重试也于事无补。应该接纳日志纪录+定时职责job康健反省+人工举行补偿

重试机制的完成

在SpringBoot中,@RabbitListener(queue="")用于消耗者监听行列。底层使用Aop举行拦阻,假如步骤没有抛出特别,则主动提交事件。假如抛出特别,该消息会缓存到RabbitMQ办事器,主动实行重试机制,不休到告捷为止。可以设置重试距离时间和重试的次数。

幂等性保证

幂等性:多次实行, 后果坚持一律

网络延长传输中,消耗显现特别大概是消耗延长消耗,会形成MQ举行重试补偿,在重试历程中,约莫会形成反复消耗。

处理方案:

  • 唯一ID+指纹码机制唯一ID + 指纹码机制,使用数据库主键去重SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID +指纹码利益:完成简便坏处:高并发下多数据库写入的功能瓶颈处理方案:跟进ID举行分库分表举行算法路由
  • 使用Redis的原子性去完成在吸收到消息后将消息ID作为key实行 setnx 下令,假如实行告捷就表现没有处理过这条消息,可以举行消耗了,实行失败表现消息以前被消耗了。

主动签收与手动签收(消耗端)

默许是主动签收

channel.basicConsume(QUEUE_NAME, false, defaultConsumer);//关闭主动签收,变为手动签收

channel.basicAck(envelope.getDeliveryTag(), false);// 手工签收, 第二个参数表现对否批量签收

消耗端限流

消息行列中囤积了多量的消息, 大概某些时候消费的消息远宏大于消耗者处理才能的时分, 这个时分假如消耗者一次取出多量的消息, 但是客户端又无法处理, 就会显现成绩, 乃至约莫招致办事崩溃, 以是必要抵消耗端举行限流

RabbitMQ提供了一种qos(办事质量确保)功效, 即在非主动确认消息的条件下, 假如一定数目标消息(经过consumer大概channel设置qos的值)未被确认前, 不举行消耗新的消息

  • 主动签收要设置成false, 发起实践事情中也设置成false
  • void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;prefetchSize : 消息轻重限定, 寻常设置为0, 消耗端不做限定prefetchCount : 会报告RabbitMQ不要同时给一个消耗者推送多于N个消息, 即一旦有N个消息还没有ack, 则该consumer将block(壅闭), 直到有消息ackglobal : true/false 对否将外表设置使用于channel, 简便来说就是外表的限定是channel级别的照旧consumer级别 注意 :

prefetchSize和global这两项,RabbitMQ没有完成,临时不眷注,prefetchCount在autoAck设置false的情况下奏效,即在主动确认的情况下这个值是不奏效的

限流可完成公平行列。

版权声明:本文来自互联网整理发布,如有侵权,联系删除

原文链接:https://www.yigezhs.comhttps://www.yigezhs.com/wangluozixun/37429.html


Copyright © 2021-2022 All Rights Reserved 备案编号:闽ICP备2023009674号 网站地图 联系:dhh0407@outlook.com