RabbitMQ消息中间件知识汇总

Coding Alan 2周前 (08-29) 163次浏览 0个评论

其它主流消息中间件:AcitveMQ(老牌,但性能略差)、Kafka(性能好,可靠性略差)、RocketMQ(收费)

RabbitMQ基于 AMQP 协议实现。适用于对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求次之。

RabbitMQ 整体架构

RabbitMQ消息中间件知识汇总

安装

官网地址:https://www.rabbitmq.com/

Erlang 与 RabbitMQ 对应匹配版本:https://www.rabbitmq.com/which-erlang.html

安装环境:CentOS 7

安装 Erlang

方法一

参考地址:https://github.com/rabbitmq/erlang-rpm

方法二

下载地址:https://www.erlang-solutions.com/resources/download.html

可通过 erl 进入命令行查看

安装 RabbitMQ

以3.6.16为例

注意:如不安装 socat,会报如下错误

配置文件

服务启动/停止

启用插件后在浏览器中访问http://your.ip.addr:15672/,若无法访问请注意检查防火墙相关配置,默认登录用户名/密码: guest/guest

命令行与管控台

消息生产与消费

ConnectionFactory:获取连接工厂

Connection:获取一个连接

Channel:数据通信信道,可发送和接收消息

Queue:具体的消息存储队列

Producer & Consumer:生产和消费者

示例代码:

注:未指定 exchange 时会使用默认AMQP default,但要求队列名称与routing key 相同(示例中为 test001)

Exchange 交换机

接收消息,并根据路由键转发消息所绑定的队列

Name:交换机名称

Type:交换机类型 direct、topic、fanout、headers

  • Direct:RouteKey 需匹配
  • Topic:模糊匹配RouteKey,#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配
  • Fanout:不要处理 RouteKey,只要将简单的队列绑定到交换机上,发送到交换机的消息都会转发到与交换机绑定的队列上;不使用路由规则,因此最快

Durability:是否需要持久化,true 为 持久化

Auto delete:当最后一个绑定到 Exchange 上的队列删除后,自动删除该Exchange

Internal:当前 Exchange 是否用于 RabbitMQ 内部使用,默认为 False

Arguments:扩展参数,用于扩展 AMQP 协议自定义使用

主要概念

Binding-绑定:Exchange 和 Exchange、Queue 之间的连接关系

Queue-消息队列:实际存储消息,Durability设置是否持久化

Message-消息:服务器和应用程序之间传送的数据,由 Properties 和  Payload(Body )组成

  • 常用属性:delivery mode, headers(自定义属性)
  • 其它属性:content_type, content_encoding, priority, correlation_id, reply_to, expiration, message_id, timestamp, type, user_id, app_id, cluster_id

Virtual Host-虚拟主机:逻辑隔离,最上层的消息路由,一个 VirtualHost 中不能有相同名称的 Exchange 或 Queue

属性设置及获取

消息如何保障100%的投递成功?

生产端的可靠性投递

  • 保障消息的成功发出
  • 保障 MQ 节点的成功接收
  • 发送端收到 MQ 节点(Broker)确认应答
  • 完善的消息进行补偿机制

BAT/TMD 互联网大厂的解决方案

  • 消息落库,对消息状态进行打标
  • 消息的延迟投递,做二次确认,回调检查
    RabbitMQ消息中间件知识汇总

幂等性

  • 借鉴数据库的乐观锁机制讲解
  • 比如我们执行一条更新库存的 SQL 语句:

幂等性是指执行100次1000次…结果都相同

在海量订单产生的业务高峰期,如何避免消息的重复消费问题?

业界主流幂等性操作:

  • 唯一 ID + 指纹码 机制,利用数据库主键去重
    SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一 ID + 指纹码
    实现简单,但高并发下有数据库写入的性能瓶颈;可通过 ID 进行分库分表进行算法路由
  • 利用 Redis 的原子性实现
    如果进行数据落库,如何做到数据库和缓存的原子性?
    若不进行数据落库,如果设置定时同步的策略?

Confirm 确认消息实现

  • 第一步:在 channel 上开启确认模式:channel.confirmSelect()
  • 第二步:在 channel 上添加监听:addConfirmListener

Return 消息机制

Return Listener 用于处理一些不可路由的消息(channel.addReturnListener),channel.basicPublish中的 mandatory 参数应设为 true,默认值 false 会删除找不到路由的消息

消费端自定义监听

消费端限流

RabbitMQ 提供了一种 qos(服务质量保证)功能

void BaiscQos(uint prefetchSize, ushort prefetchCount, bool global);

  • prefetchSize:0为不限制大小
  • prefetchCount:同时给消费推送多少条消息,通常设为1,在autoAck=false 的情况下才生效,即非自动签收
  • global:是否将设置应用于 channel 级别,false 为 consumer 级别

死信队列

DLX, Dead-Letter-Exchange,当消息在一个队列中变成死信后,它能被重新 publish 到另一个 Exchange,这个 Exchange就是 DLX

消息变成死信的几种情况:

  • 消息被拒绝(basic.reject / basic.nack),并且requeue=false
  • 消息 TTL 过期
  • 队列达到最大长度

正常声明交换机、队列、绑定,只需在队列上添加参数 arguments.put(“x-dead-letter-exchange”, “dlx.exchange”);

常见问题

1、ERROR: epmd error for host 10: badarg (unknown POSIX error)

2、{error,{not_a_dets_file,”/var/lib/rabbitmq/mnesia/rabbit@manamana/recovery.dets”}}}

删除/var/lib/rabbitmq/mnesia/rabbit@manamana/recovery.dets文件再重新启动即可

喜欢 (1)
[]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址