• 三、RabbitMQ消息的可靠投递


    如何保证消息不丢失

    在使用RabbitMQ的时候,我们需要保证消息不能丢失,消息从生产者生产出来一直到消费者消费成功,这条链路是这样的:

    在这里插入图片描述
    消息的可靠投递分为了两大内容:发送端的确认(p->broker和exchange->queue)和消费端的确认(queue->c)。

    发送端的确认

    Rabbit提供了两种方式来保证发送端的消息可靠性投递:confirm 确认模式
    和return 退回模式。

    confirm 确认模式:消息从 producer 到达 exchange 则会给 producer 发送一个应答,我们需要开启confirm模式,才能接收到这条应答。开启方式是将Channel.Confirm(noWait bool)参数设置为false,表示同意发送者将当前channel信道设置为confirm模式。

    return 退回模式:消息从 exchange–>queue 投递失败,会将消息退回给producer。

    消费端的确认

    消息从Queue发送到消费端之后,消费端会发送一个确认消息:Consumer Ack,有两种确认方式:自动确认和手动确认。

    在编码中,关于消息的确认方式,我们需要在消费者端调用Consumer函数时,设置第三个参数:autoAck是false还是true(false表示手动,true表示自动)。

    自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。
    但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用ch.Ack(false),手动签收,如果出现异常,则调用d.Reject(true)让其自动重新发送消息。

    Go 实现

    安装操作库

    安装API库

    Go可以使用streadway/amqp库来操作rabbit,使用以下命令来安装:

    go get github.com/streadway/amqp
    
    • 1

    封装rabbitmq

    接下来我们对streadway/amqp库的内容进行一个二次封装,封装为一个rabbitmq.go文件:

    package rabbitmq
    
    import (
    	"encoding/json"
    	"github.com/streadway/amqp"
    	"log"
    )
    
    // RabbitMQ RabbitMQ结构
    type RabbitMQ struct {
    	channel  *amqp.Channel
    	Name     string
    	exchange string
    }
    
    // Connect 连接服务器
    func Connect(s string) *RabbitMQ {
    	//连接rabbitmq
    	conn, e := amqp.Dial(s)
    	failOnError(e, "连接Rabbitmq服务器失败!")
    	ch, e := conn.Channel()
    	failOnError(e, "无法打开频道!")
    	mq := new(RabbitMQ)
    	mq.channel = ch
    	return mq
    }
    
    // New 初始化消息队列
    //第一个参数:rabbitmq服务器的链接,第二个参数:队列名字
    func New(s string, name string) *RabbitMQ {
    	//连接rabbitmq
    	conn, e := amqp.Dial(s)
    	failOnError(e, "连接Rabbitmq服务器失败!")
    	ch, e := conn.Channel()
    	failOnError(e, "无法打开频道!")
    	q, e := ch.QueueDeclare(
    		name,  //队列名
    		false, //是否开启持久化
    		true,  //不使用时删除
    		false, //排他
    		false, //不等待
    		nil,   //参数
    	)
    	failOnError(e, "初始化消息队列失败!")
    
    	mq := new(RabbitMQ)
    	mq.channel = ch
    	mq.Name = q.Name
    	return mq
    }
    
    // QueueDeclare 声明queue
    func (q *RabbitMQ) QueueDeclare(queue string) {
    	_, e := q.channel.QueueDeclare(queue, false, true, false, false, nil)
    	failOnError(e, "声明queue失败!")
    }
    
    // QueueDelete 删除queue
    func (q *RabbitMQ) QueueDelete(queue string) {
    	_, e := q.channel.QueueDelete(queue, false, true, false)
    	failOnError(e, "删除queue失败!")
    }
    
    // Qos 配置queue参数
    func (q *RabbitMQ) Qos() {
    	e := q.channel.Qos(1, 0, false)
    	failOnError(e, "无法设置QoS")
    }
    
    // NewExchange 初始化交换机
    //第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
    func NewExchange(s string, name string, typename string) {
    	//连接rabbitmq
    	conn, e := amqp.Dial(s)
    	failOnError(e, "连接Rabbitmq服务器失败!")
    	ch, e := conn.Channel()
    	failOnError(e, "无法打开频道!")
    	e = ch.ExchangeDeclare(
    		name,     // name
    		typename, // type
    		true,     // durable
    		false,    // auto-deleted
    		false,    // internal
    		false,    // no-wait
    		nil,      // arguments
    	)
    	failOnError(e, "初始化交换机失败!")
    
    }
    
    // ExchangeDelete 删除交换机
    func (q *RabbitMQ) ExchangeDelete(exchange string) {
    	e := q.channel.ExchangeDelete(exchange, false, true)
    	failOnError(e, "删除交换机失败!")
    }
    
    // Bind 绑定消息队列到exchange
    func (q *RabbitMQ) Bind(exchange string, key string) {
    	e := q.channel.QueueBind(
    		q.Name,
    		key,
    		exchange,
    		false,
    		nil,
    	)
    	failOnError(e, "绑定队列失败!")
    	q.exchange = exchange
    }
    
    // Send 向消息队列发送消息
    //Send方法可以往某个消息队列发送消息
    func (q *RabbitMQ) Send(queue string, body interface{}) {
    	str, e := json.Marshal(body)
    	failOnError(e, "消息序列化失败!")
    	e = q.channel.Publish(
    		"",    //交换
    		queue, //路由键
    		false, //必填
    		false, //立即
    		amqp.Publishing{
    			ReplyTo: q.Name,
    			Body:    []byte(str),
    		})
    	msg := "向队列:" + q.Name + "发送消息失败!"
    	failOnError(e, msg)
    }
    
    // Publish 向exchange发送消息
    //Publish方法可以往某个exchange发送消息
    func (q *RabbitMQ) Publish(exchange string, body interface{}, key string) {
    	str, e := json.Marshal(body)
    	failOnError(e, "消息序列化失败!")
    	e = q.channel.Publish(
    		exchange,
    		key,
    		false,
    		false,
    		amqp.Publishing{ReplyTo: q.Name,
    			Body: []byte(str)},
    	)
    	failOnError(e, "向交换机发送消息失败!")
    }
    
    // Consume 接收某个消息队列的消息
    func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
    	c, e := q.channel.Consume(
    		q.Name, //指定从哪个队列中接收消息
    		"",
    		true,
    		false,
    		false,
    		false,
    		nil,
    	)
    	failOnError(e, "接收消息失败!")
    	return c
    }
    
    // Close 关闭队列连接
    func (q *RabbitMQ) Close() {
    	q.channel.Close()
    }
    
    //错误处理函数
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170

    发送端的确认

    首先初始化消息队列的时候,我们要开启confirm模式,才能接收到这条应答。开启方式是将Channel.Confirm(noWait bool)参数设置为false,表示同意发送者将当前channel信道设置为confirm模式。

    func New(s string, name string) *RabbitMQ {
    	conn, e := amqp.Dial(s)
    	failOnError(e, "连接Rabbitmq服务器失败!")
    	ch, e := conn.Channel()
    	failOnError(e, "无法打开频道!")
    	q, e := ch.QueueDeclare(
    		name,  //队列名
    		false, //是否开启持久化
    		true,  //不使用时删除
    		false, //排他
    		false, //不等待
    		nil,   //参数
    	)
    	failOnError(e, "初始化消息队列失败!")
    
    	mq := new(RabbitMQ)
    	mq.channel = ch
    	mq.Name = q.Name
    	// 设置为confirm模式
    	mq.channel.Confirm(false)
    	return mq
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    然后在封装库中创建一个函数handleConfirm()用于接收来自Borker的回复:

    func (q *RabbitMQ) ConfirmFromBroker(ch chan amqp.Confirmation) chan amqp.Confirmation {
    	return q.channel.NotifyPublish(ch)
    }
    
    • 1
    • 2
    • 3

    生产者

    生产者端在向Broker发送消息的时候,我们使用一个无缓冲的通道来接收来自Broker的回复,然后创建一个协程监听这个无缓冲通道。

    func main() {
    	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
    	// 指定为topic类型
    	rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange1", "fanout")
    	confirm := producer.ConfirmFromBroker(make(chan amqp.Confirmation))
    	go handleConfirm(confirm)
    	var i int
    	for {
    		time.Sleep(time.Second)
    		producer.Publish("exchange1", "fanout message: "+strconv.Itoa(i), "")
    		i++
    	}
    }
    
    func handleConfirm(confirm <-chan amqp.Confirmation) {
    	for {
    		select {
    		case message := <-confirm:
    			fmt.Println("接收到来自Broker的回复:", message)
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    运行结果:

    接收到来自Broker的回复: {1 true}
    接收到来自Broker的回复: {2 true}
    接收到来自Broker的回复: {3 true}
    接收到来自Broker的回复: {4 true}
    接收到来自Broker的回复: {5 true}
    
    • 1
    • 2
    • 3
    • 4
    • 5

    消费端的确认

    首先将Consume函数的第三个参数autoAck参数标记为false:

    // Consume 接收某个消息队列的消息
    func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
    	c, e := q.channel.Consume(
    		q.Name,
    		"",
    		false, // 不自动确认消息
    		false,
    		false,
    		false,
    		nil,
    	)
    	failOnError(e, "接收消息失败!")
    	return c
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在消费者端我们采用公平派遣模式,即队列发送消息给消费者的时候,不再采用轮询机制,而是一个消费者消费完消息之后,会调用Ack(false)函数向队列发送一个回复,队列每次会将消息优先发送给消费完消息的消费者(回复过)。

    消费端限流:
    实现公平派遣模式我们需要设置消费者端一次只能消费一条消息,之前我们已经进行了封装,直接在消费者端调用即可:

    // Qos 配置queue参数
    func (q *RabbitMQ) Qos() {
    	e := q.channel.Qos(1, 0, false)
    	failOnError(e, "无法设置QoS")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    生产者

    func main() {
    	producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
    	// 指定为direct类型
    	rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange", "direct")
    	i := 0
    	for {
    		time.Sleep(time.Second)
    		producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key1")
    		i = i + 1
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    消费者1

    消费者2在消费第三条消息的时候,假设发生了错误,我们调用d.Reject(true)函数让队列重新发送消息。

    func main() {
    	//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
    	consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
    	// 指定一次只消费一条消息,直到消费完才重新接收
    	consumer1.Qos()
    	// 队列绑定到exchange
    	consumer1.Bind("exchange", "key1")
    	//接收消息
    	msgs := consumer1.Consume()
    	go func() {
    		var i int
    		for d := range msgs {
    			time.Sleep(time.Second * 1)
    			log.Printf("Consumer1 received a message: %s", d.Body)
    			// 假设消费第三条消息的时候出现了错误,我们就调用d.Reject(true),队列会重新发送消息给消费者
    			if i == 2 {
    				d.Reject(true)
    			} else {
    				// 消息消费成功之后就回复
    				d.Ack(false)
    			}
    			i++
    		}
    	}()
    	select {}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    消费者2

    func main() {
    	//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
    	consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
    	// 指定一次只消费一条消息,直到消费完才重新接收
    	consumer2.Qos()
    	// 队列绑定到exchange
    	consumer2.Bind("exchange", "key1")
    	//接收消息
    	msgs := consumer2.Consume()
    	go func() {
    		for d := range msgs {
    			time.Sleep(time.Second * 5)
    			log.Printf("Consumer2 received a message: %s", d.Body)
    			// 消息消费成功之后就回复
    			d.Ack(false)
    		}
    	}()
    	select {}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    运行结果:

    # 消费者1
    2022/11/06 19:55:08 Consumer1 received a message: "routing message: 0"
    2022/11/06 19:55:10 Consumer1 received a message: "routing message: 2"
    2022/11/06 19:55:11 Consumer1 received a message: "routing message: 3"
    2022/11/06 19:55:12 Consumer1 received a message: "routing message: 3"
    2022/11/06 19:55:13 Consumer1 received a message: "routing message: 4"
    2022/11/06 19:55:14 Consumer1 received a message: "routing message: 6"
    
    # 消费者2
    2022/11/06 19:55:13 Consumer2 received a message: "routing message: 1"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  • 相关阅读:
    Array题型之双指针(TwoPointers) [leetcode][数据结构]
    大数据NoSQL数据库HBase集群部署
    Python输出当前路径
    C++ 移动构造函数详解
    Java代码审计sql注入基础
    React中组件通信02——消息订阅与发布、取消订阅以及卸载组件时取消订阅
    【深基13.例1】查找
    Python之Python的版本选择和IDE工具选择问题
    webgoat-(A1)SQL Injection
    写一个flutter程序2
  • 原文地址:https://blog.csdn.net/qq_49723651/article/details/127717187