其它主流消息中间件:AcitveMQ(老牌,但性能略差)、Kafka(性能好,可靠性略差)、RocketMQ(收费)
RabbitMQ基于 AMQP 协议实现。适用于对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求次之。
RabbitMQ 整体架构
安装
官网地址:https://www.rabbitmq.com/
Erlang 与 RabbitMQ 对应匹配版本:https://www.rabbitmq.com/which-erlang.html
安装环境:CentOS 7
安装 Erlang
方法一
参考地址:https://github.com/rabbitmq/erlang-rpm
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# sudo vi /etc/yum.repos.d/rabbitmq_erlang.repo [rabbitmq_erlang] name=rabbitmq_erlang baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/$basearch repo_gpgcheck=1 gpgcheck=0 enabled=1 gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey sslverify=1 sslcacert=/etc/pki/tls/certs/ca-bundle.crt metadata_expire=300 [rabbitmq_erlang-source] name=rabbitmq_erlang-source baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/SRPMS repo_gpgcheck=1 gpgcheck=0 enabled=1 gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey sslverify=1 sslcacert=/etc/pki/tls/certs/ca-bundle.crt metadata_expire=300 # 安装 sudo yum install erlang -y |
方法二
下载地址:https://www.erlang-solutions.com/resources/download.html
1 2 3 4 5 6 |
# 安装依赖 sudo yum -y install epel-release sudo yum -y install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl wget https://packages.erlang-solutions.com/erlang/rpm/centos/7/x86_64/esl-erlang_20.3.8.22-1~centos~7_amd64.rpm rpm -ivh esl-erlang_20.3.8.22-1~centos~7_amd64.rpm |
可通过 erl 进入命令行查看
安装 RabbitMQ
以3.6.16为例
1 2 3 |
sudo yum install socat -y # RabbitMQ 需依赖 socat wget https://github.com/rabbitmq/rabbitmq-server/releases/download/rabbitmq_v3_6_16/rabbitmq-server-3.6.16-1.el7.noarch.rpm sudo rpm -ivh rabbitmq-server-3.6.16-1.el7.noarch.rpm |
注意:如不安装 socat,会报如下错误
1 2 |
error: Failed dependencies: socat is needed by rabbitmq-server-3.6.16-1.el7.noarch |
配置文件
1 2 3 |
# sudo vi /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.16/ebin/rabbit.app #{loopback_users, [<<"guest">>]},修改为 {loopback_users, [guest]}, |
服务启动/停止
1 2 3 4 5 6 7 |
# 启动服务 sudo rabbitmq-server start& # 或者sudo rabbitmqctl start_app # 验证服务启动:sudo lsof -i:5672 # 停止服务 sudo rabbitmqctl stop_app # 或sudo rabbitmq-server stop # 管理插件 sudo rabbitmq-plugins enable rabbitmq_management # 查看sudo rabbitmq-plugins list |
启用插件后在浏览器中访问http://your.ip.addr:15672/,若无法访问请注意检查防火墙相关配置,默认登录用户名/密码: guest/guest
Docker 安装
1 |
docker run -dit --name some-rabbit --restart=always -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management |
命令行与管控台
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# 以下 vhostpath 需替换为虚拟主机路径通过 list_vhosts 命令可进行查看 sudo rabbitmqctl status # 节点状态 sudo rabbitmqctl add_user username password # 添加用户 sudo rabbitmqctl list_users # 列出所有用户 sudo rabbitmqctl delete_user username # 删除用户 sudo rabbitmqctl clear_permissions -p vhostpath username # 清除用户权限 sudo rabbitmqctl list_user_permissions username # 列出用户权限 sudo rabbitmqctl change_password username newpassword # 修改密码 sudo rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*" # 设置用户权限 sudo rabbitmqctl add_vhost vhostpath # 创建虚拟主机 sudo rabbitmqctl list_vhosts # 列出所有虚拟主机 sudo rabbitmqctl list_permissions -p vhostpath # 列出虚拟主机上所有权限 sudo rabbitmqctl delete_vhost vhostpath # 删除虚拟主机 sudo rabbitmqctl list_queues # 查看所有队列信息 sudo rabbitmqctl -p vhostpath purge_queue blue # 清除队列里的消息 sudo rabbitmqctl reset # 移除所有数据,要在 rabbitmqctl stop_app之后使用 sudo rabbitmqctl join_cluster <clusternode> [--ram|disc] # 组成集群命令 sudo rabbitmqctl cluster_status # 查看集群状态 sudo rabbitmqctl change_cluster_node_type disc|ram # 修改集群节点的存储形式 sudo rabbitmqctl forget_cluster_node [--offline] # 忘记节点(摘除节点) sudo rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2...] # 修改节点名称 |
消息生产与消费
ConnectionFactory:获取连接工厂
Connection:获取一个连接
Channel:数据通信信道,可发送和接收消息
Queue:具体的消息存储队列
Producer & Consumer:生产和消费者
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
public class Producer { public static void main(String[] args) throws Exception { // 1、创建一个 ConnectionFactory并进行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.xxx.xxx"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); // 2、通过连接工厂创建连接 Connection connection = connectionFactory.newConnection(); // 3、通过 connection 创建一个 Channel Channel channel = connection.createChannel(); // 4、通过 channel 发送数据 for(int i=0; i<5; i++) { String msg = "Hello RabbitMQ"; channel.basicPublish("", "test001", null, msg.getBytes()); } // 5、关闭相关连接 channel.close(); connection.close(); } # 消费者 public class Consumer { public static void main(String[] args) throws Exception { ...1,2,3步代码与 Producer 相同,此处省略.... // 4、声明一个队列 String queueName = "test001"; channel.queueDeclare(queueName, true, false, false, null); // 5、创建消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); // 6、设置 Channel // channel.queueBind(queueName,"test_consumer_exchange","topic"); channel.basicConsume(queueName, true, queueingConsumer); while(true) { // 7、获取消息 Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("消费值:"+msg); // Envelope envelope = delivery.getEnvelope(); } } } |
注:未指定 exchange 时会使用默认AMQP default,但要求队列名称与routing key 相同(示例中为 test001)
Exchange 交换机
接收消息,并根据路由键转发消息所绑定的队列
Name:交换机名称
Type:交换机类型 direct、topic、fanout、headers
- Direct:RouteKey 需匹配
12345678910111213141516# ProducerString exchangeName = "test_direct_exchange";String routingKey = "test.direct";String msg = "Hello World RabbitMQ 4 Direct Exchnage Message...";channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());# ConsumerString exchangeName ="test_direct_exchange";String exchangeType = "direct";String queueName = "test_direct_queue";String routingKey = "test.direct";channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, exchangeName, routingKey);QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicConsume(queueName, true, consumer); - Topic:模糊匹配RouteKey,#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配
1234567891011121314151617181920212223242526272829# ProducerString exchangeName = "test_topic_exchange";String routingKey1 = "user.save";String routingKey2 = "user.update";String routingKey3 = "user.delete.abc";//发送String msg = "Hello World RabbitMQ 4 Topic Exchnage Message...";channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());// 5、关闭相关连接channel.close();connection.close();# ConsumerString exchangeName = "test_topic_exchange";String exchangeType = "topic";String queueName = "test_topic_queue";String routingKey = "user.#";channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, exchangeName, routingKey);QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicConsume(queueName, true, consumer);while(true) {Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:"+msg);} - Fanout:不要处理 RouteKey,只要将简单的队列绑定到交换机上,发送到交换机的消息都会转发到与交换机绑定的队列上;不使用路由规则,因此最快
12345678910111213141516171819202122232425# ConsumerString exchangeName = "test_fanout_exchange";String exchangeType = "fanout";String queueName = "test_fanout_queue";String routingKey = ""; // 不设置路由键channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, exchangeName, routingKey);QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicConsume(queueName, true, consumer);while(true) {Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:"+msg);}# ProducerString exchangeName = "test_fanout_exchange";String routingKey = "xxxxx"; // 设置与不设置均不产生影响//发送String msg = "Hello World RabbitMQ 4 FANOUT Exchnage Message...";channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());// 5、关闭相关连接channel.close();connection.close();
Durability:是否需要持久化,true 为 持久化
Auto delete:当最后一个绑定到 Exchange 上的队列删除后,自动删除该Exchange
Internal:当前 Exchange 是否用于 RabbitMQ 内部使用,默认为 False
Arguments:扩展参数,用于扩展 AMQP 协议自定义使用
主要概念
Binding-绑定:Exchange 和 Exchange、Queue 之间的连接关系
Queue-消息队列:实际存储消息,Durability设置是否持久化
Message-消息:服务器和应用程序之间传送的数据,由 Properties 和 Payload(Body )组成
- 常用属性:delivery mode, headers(自定义属性)
- 其它属性:content_type, content_encoding, priority, correlation_id, reply_to, expiration, message_id, timestamp, type, user_id, app_id, cluster_id
Virtual Host-虚拟主机:逻辑隔离,最上层的消息路由,一个 VirtualHost 中不能有相同名称的 Exchange 或 Queue
属性设置及获取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
# Producer Map<String, Object> headers = new HashMap<>(); headers.put("my1", "111"); headers.put("my2", "222"); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("10000") .headers(headers) .build(); for(int i=0; i<5; i++) { String msg = "Hello RabbitMQ"; channel.basicPublish("", "test001", properties, msg.getBytes()); } # Consumer Map<String, Object> headers =delivery.getProperties().getHeaders(); System.err.println("Header get my1 value:" + headers.get("my1")); |
消息如何保障100%的投递成功?
生产端的可靠性投递
- 保障消息的成功发出
- 保障 MQ 节点的成功接收
- 发送端收到 MQ 节点(Broker)确认应答
- 完善的消息进行补偿机制
BAT/TMD 互联网大厂的解决方案
幂等性
- 借鉴数据库的乐观锁机制讲解
- 比如我们执行一条更新库存的 SQL 语句:
123UPDATE T_REPS SET COUNT = COUNT - 1,VERSTION = VERSION + 1WHERE VERSION = 1
幂等性是指执行100次1000次…结果都相同
在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
业界主流幂等性操作:
- 唯一 ID + 指纹码 机制,利用数据库主键去重
SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一 ID + 指纹码
实现简单,但高并发下有数据库写入的性能瓶颈;可通过 ID 进行分库分表进行算法路由 - 利用 Redis 的原子性实现
如果进行数据落库,如何做到数据库和缓存的原子性?
若不进行数据落库,如果设置定时同步的策略?
Confirm 确认消息实现
- 第一步:在 channel 上开启确认模式:channel.confirmSelect()
- 第二步:在 channel 上添加监听:addConfirmListener
Return 消息机制
Return Listener 用于处理一些不可路由的消息(channel.addReturnListener),channel.basicPublish中的 mandatory 参数应设为 true,默认值 false 会删除找不到路由的消息
消费端自定义监听
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
# 生产中不使用 queueingConsumer 而是会去自定义,Consumer 类的修改为如下 channel.basicConsume(queueName, true, new MyConsumer(channel)); # 新增MyConsumer的实现 public class MyConsumer extends DefaultConsumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ... ... } } |
消费端限流
RabbitMQ 提供了一种 qos(服务质量保证)功能
void BaiscQos(uint prefetchSize, ushort prefetchCount, bool global);
- prefetchSize:0为不限制大小
- prefetchCount:同时给消费推送多少条消息,通常设为1,在autoAck=false 的情况下才生效,即非自动签收
- global:是否将设置应用于 channel 级别,false 为 consumer 级别
死信队列
DLX, Dead-Letter-Exchange,当消息在一个队列中变成死信后,它能被重新 publish 到另一个 Exchange,这个 Exchange就是 DLX
消息变成死信的几种情况:
- 消息被拒绝(basic.reject / basic.nack),并且requeue=false
- 消息 TTL 过期
- 队列达到最大长度
正常声明交换机、队列、绑定,只需在队列上添加参数 arguments.put(“x-dead-letter-exchange”, “dlx.exchange”);
6种模式汇总(演示代码 Golang)
https://www.rabbitmq.com/getstarted.html
Simple简单模式
最简单常用的模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
package RabbitMQ import ( "fmt" "github.com/streadway/amqp" "log" ) // 格式:amqp://账号:密码@服务器地址:端口号/vhost const MQURL = "amqp://demo:demopass@127.0.0.1:5672/demo" type RabbitMQ struct { conn *amqp.Connection channel *amqp.Channel // 队列名称 QueueName string // 交换机 Exchange string Key string // 连接信息 Mqurl string } // 创建 RabbitMQ 结构体实例 func NewRabbitMQ(queueName string, exhange string, key string) *RabbitMQ { rabbitmq := &RabbitMQ{QueueName:queueName, Exchange:exhange, Key:key, Mqurl:MQURL} var err error // 创建RabbitMQ连接 rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnError(err, "创建连接错误") rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnError(err, "获取channel失败") return rabbitmq } // 断开连接 func (r *RabbitMQ) Destroy() { r.channel.Close() r.conn.Close() } // 错误处理函数 func (r *RabbitMQ) failOnError(err error, message string) { if err != nil { log.Fatalf("%s:%s", message, err) panic(fmt.Sprintf("%s:%s", message, err)) } } // 简单模式 Step 1、创建简单模式RabbitMQ 实例 func NewRabbitMQSimple(queueName string) *RabbitMQ { return NewRabbitMQ(queueName, "", "") } // 简单模式 Step 2、简单模式生产代码 func (r *RabbitMQ) PublishSimple(message string) { // 1、申请队列,如果不存在自动创建,存在则跳过 _, err := r.channel.QueueDeclare( r.QueueName, // 是否持久化 false, // 是否为自动删除 false, // 是否具有排他性 false, // 是否阻塞 false, // 额外属性 nil) if err != nil { fmt.Println(err) } // 2、发送消息到队列中 r.channel.Publish( r.Exchange, r.QueueName, // 若为true,根据Exchange类型和routingKey规则,无法找到满足条件的队列则将消息返还发送者 false, // 若为true,队列未绑定消费者时将消息返还发送者 false, amqp.Publishing{ ContentType:"text/plain", Body:[]byte(message), }) } func (r *RabbitMQ) ConsumeSimple() { // 1、申请队列,如果不存在自动创建,存在则跳过 _, err := r.channel.QueueDeclare( r.QueueName, // 是否持久化 false, // 是否为自动删除 false, // 是否具有排他性 false, // 是否阻塞 false, // 额外属性 nil) if err != nil { fmt.Println(err) } // 接收消息 msgs, err := r.channel.Consume( r.QueueName, // 用于区分消息费 "", // 是否自动应答 true, // 是否具有排他性 false, // 若为true,不能将消息发给同一个连接中的消费者 false, // 消费是否阻塞 false, // 额外属性 nil) if err != nil { fmt.Println(err) } forever := make(chan bool) // 启用协程处理消息 go func() { for d := range msgs { // 实现处理逻辑函数 log.Printf("Received a message: %s", d.Body) } }() log.Print("[*] Waiting for messages, press Ctrl+C to exit") <- forever } |
生产者调用:
1 2 3 4 5 |
func main() { rabbitmq := RabbitMQ.NewRabbitMQSimple("demoSimple") rabbitmq.PublishSimple("Hello world!") fmt.Println("发送成功") } |
消费者调用:
1 2 3 4 |
func main() { rabbitmq := RabbitMQ.NewRabbitMQSimple("demoSimple") rabbitmq.ConsumeSimple() } |
Work 工作模式
一条消息只能被一个消费者获取
生产者调用:
1 2 3 4 5 6 7 8 |
func main() { rabbitmq := RabbitMQ.NewRabbitMQSimple("demoSimple") for i := 0; i <= 100; i++ { rabbitmq.PublishSimple("Hello world!") time.Sleep(1 * time.Second) fmt.Println("发送成功") } } |
消费者调用(起多个服务):
1 2 3 4 |
func main() { rabbitmq := RabbitMQ.NewRabbitMQSimple("demoSimple") rabbitmq.ConsumeSimple() } |
Publish/Subscribe 订阅模式
消息被路由投递给多个队列,一个消息被多个消费者获取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
// 订阅模式创建RabbitMQ实例 func NewRabbitMQPubSub(exhangeName string) *RabbitMQ { rabbitmq := NewRabbitMQ("", exhangeName, "") var err error // 获取 connection rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnError(err, "创建连接错误") // 获取channel rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnError(err, "获取channel失败") return rabbitmq } // 订阅模式生产 func (r *RabbitMQ) PublishPub(message string) { // 尝试创建交换机 err := r.channel.ExchangeDeclare( r.Exchange, "fanout", true, false, // true表示这个exchange不能被client用来推送消息,仅用于exchange之间的绑定 false, false, nil) r.failOnError(err, "Failed to declare an exchange") // 发送消息 err = r.channel.Publish( r.Exchange, "", false, false, amqp.Publishing{ ContentType:"text/plain", Body:[]byte(message), }) } // 订阅模式消费端代码 func (r *RabbitMQ) ReceiveSub() { // 尝试创建交换机 err := r.channel.ExchangeDeclare( r.Exchange, // 交换机类型 "fanout", true, false, false, false, nil) r.failOnError(err, "Failed to declare an exchange") // 尝试创建队列,这里队列名称不要写 q, err := r.channel.QueueDeclare( "", // 随机生产队列名称 false, false, true, false, nil) r.failOnError(err, "Failed to declare a queue") // 绑定队列到exchange中 err = r.channel.QueueBind( q.Name, // pub/sub模式下,key应为空 "", r.Exchange, false, nil) // 消费消息 messages, err := r.channel.Consume( q.Name, "", true, false, false, false, nil) forever := make(chan bool) go func() { for d := range messages { log.Printf("Received a message: %s", d.Body) } }() log.Print("[*] Waiting for messages, press Ctrl+C to exit") <- forever } |
生产者调用:
1 2 3 4 5 6 7 8 |
func main() { rabbitmq := RabbitMQ.NewRabbitMQPubSub("newProduct") for i := 0; i< 100; i++ { rabbitmq.PublishPub("订阅模式生产第" + strconv.Itoa(i) + "条数据") fmt.Println("订阅模式生产第" + strconv.Itoa(i) + "条数据") time.Sleep(1 * time.Second) } } |
消费者调用(起多个服务):
1 2 3 4 |
func main() { rabbitmq := RabbitMQ.NewRabbitMQPubSub("newProduct") rabbitmq.ReceiveSub() } |
Routing路由模式
一个消息被多个消费者获取。并且消息的目标队列可被生产者指定。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
// 路由模式创建RabbitMQ实例 func NewRabbitMQRouting(exhangeName string, routingKey string) *RabbitMQ { rabbitmq := NewRabbitMQ("", exhangeName, routingKey) var err error // 获取 connection rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnError(err, "创建连接错误") // 获取channel rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnError(err, "获取channel失败") return rabbitmq } // 路由模式生产 func (r *RabbitMQ) PublishRouting(message string) { // 尝试创建交换机 err := r.channel.ExchangeDeclare( r.Exchange, // 修改为direct "direct", true, false, // true表示这个exchange不能被client用来推送消息,仅用于exchange之间的绑定 false, false, nil) r.failOnError(err, "Failed to declare an exchange") // 发送消息 err = r.channel.Publish( r.Exchange, r.Key, false, false, amqp.Publishing{ ContentType:"text/plain", Body:[]byte(message), }) } // 路由模式消费端代码 func (r *RabbitMQ) ReceiveRouting() { // 尝试创建交换机 err := r.channel.ExchangeDeclare( r.Exchange, // 交换机类型 "direct", true, false, false, false, nil) r.failOnError(err, "Failed to declare an exchange") // 尝试创建队列,这里队列名称不要写 q, err := r.channel.QueueDeclare( "", // 随机生产队列名称 false, false, true, false, nil) r.failOnError(err, "Failed to declare a queue") // 绑定队列到exchange中 err = r.channel.QueueBind( q.Name, // 需要绑定key r.Key, r.Exchange, false, nil) // 消费消息 messages, err := r.channel.Consume( q.Name, "", true, false, false, false, nil) forever := make(chan bool) go func() { for d := range messages { log.Printf("Received a message: %s", d.Body) } }() log.Print("[*] Waiting for messages, press Ctrl+C to exit") <- forever } |
生产者调用:
1 2 3 4 5 6 7 8 9 10 |
func main() { demoOne := RabbitMQ.NewRabbitMQRouting("demoEx", "demo_one") demoTwo := RabbitMQ.NewRabbitMQRouting("demoEx", "demo_two") for i:=0; i<=100; i++ { demoOne.PublishRouting("Hello demo one!" + strconv.Itoa(i)) demoTwo.PublishRouting("Hello demo two!" + strconv.Itoa(i)) time.Sleep(1 * time.Second) fmt.Println(i) } } |
消费者调用(起多个服务):
1 2 3 4 5 |
// receiveRoutingOne.go func main() { demoOne := RabbitMQ.NewRabbitMQRouting("demoEx", "demo_one") demoOne.ReceiveRouting() } |
1 2 3 4 5 |
// receiveRoutingTwo.go func main() { demoTwo := RabbitMQ.NewRabbitMQRouting("demoEx", "demo_two") demoTwo.ReceiveRouting() } |
Topic话题模式
一个消息被多个消费者获取。消息的目标队列可用BindingKey以通配符(#:一个或多个词,*:一个词)的方式指定。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
// 话题模式创建RabbitMQ实例 func NewRabbitMQTopic(exhangeName string, routingKey string) *RabbitMQ { rabbitmq := NewRabbitMQ("", exhangeName, routingKey) var err error // 获取 connection rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnError(err, "创建连接错误") // 获取channel rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnError(err, "获取channel失败") return rabbitmq } // 话题模式生产 func (r *RabbitMQ) PublishTopic(message string) { // 尝试创建交换机 err := r.channel.ExchangeDeclare( r.Exchange, // 修改为topic "topic", true, false, // true表示这个exchange不能被client用来推送消息,仅用于exchange之间的绑定 false, false, nil) r.failOnError(err, "Failed to declare an exchange") // 发送消息 err = r.channel.Publish( r.Exchange, r.Key, false, false, amqp.Publishing{ ContentType:"text/plain", Body:[]byte(message), }) } // 话题模式消费端代码 func (r *RabbitMQ) ReceiveTopic() { // 尝试创建交换机 err := r.channel.ExchangeDeclare( r.Exchange, // 交换机类型 "topic", true, false, false, false, nil) r.failOnError(err, "Failed to declare an exchange") // 尝试创建队列,这里队列名称不要写 q, err := r.channel.QueueDeclare( "", // 随机生产队列名称 false, false, true, false, nil) r.failOnError(err, "Failed to declare a queue") // 绑定队列到exchange中 err = r.channel.QueueBind( q.Name, // 需要绑定key r.Key, r.Exchange, false, nil) // 消费消息 messages, err := r.channel.Consume( q.Name, "", true, false, false, false, nil) forever := make(chan bool) go func() { for d := range messages { log.Printf("Received a message: %s", d.Body) } }() log.Print("[*] Waiting for messages, press Ctrl+C to exit") <- forever } |
生产者调用:
1 2 3 4 5 6 7 8 9 10 |
func main() { demoOne := RabbitMQ.NewRabbitMQTopic("exDemoTopic", "demo.topic.one") demoTwo := RabbitMQ.NewRabbitMQTopic("exDemoTopic", "demo.topic.two") for i := 0; i <= 10; i++ { demoOne.PublishTopic("Hello demo topic one!" + strconv.Itoa(i)) demoTwo.PublishTopic("Hello demo topic two!" + strconv.Itoa(i)) time.Sleep(1 * time.Second) fmt.Println(i) } } |
消费者调用(起多个服务):
1 2 3 4 5 |
// receiveDemoTopicAll.go func main() { demoOne := RabbitMQ.NewRabbitMQTopic("exDemoTopic", "#") demoOne.ReceiveTopic() } |
1 2 3 4 5 |
// receiveDemoTopicTwo.go func main() { demoTwo := RabbitMQ.NewRabbitMQTopic("exDemoTopic", "demo.*.two") demoTwo.ReceiveTopic() } |
RPC模式
常见问题
1、ERROR: epmd error for host 10: badarg (unknown POSIX error)
1 2 3 4 |
# sudo vi /etc/rabbitmq/rabbitmq-env.conf NODENAME=rabbit@localhost sudo vi /etc/hostname # 修改主机名并重启 |
2、{error,{not_a_dets_file,”/var/lib/rabbitmq/mnesia/rabbit@manamana/recovery.dets”}}}
删除/var/lib/rabbitmq/mnesia/rabbit@manamana/recovery.dets文件再重新启动即可