• RabbitMQ


    MQ概念

    MQ全称为Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
    在这里插入图片描述

    MQ的优势劣势

    MQ的优势:
    1.应用解耦
    在这里插入图片描述
    2.异步提速
    在这里插入图片描述

    3.削峰填谷

    MQ的劣势:
    1.系统可用性降低
    2.系统复杂性提高
    3.一致性问题

    在这里插入图片描述

    使用MQ要满足的条件:

    1.生产者不需要从消费者处得到反馈,引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
    2.容许短暂的不一致性
    3.确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。

    常见的MQ产品:RabbitMQ、RocketMQ、Kafka等,也可以直接使用Redis充当消息队列。

    RabbitMQ简介

    RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)(高级消息队列协议),是一个网络协议,是应用层协议的一个开发标准,为面向消息的中间件设计。
    在这里插入图片描述
    RabbitMQ基础架构图如下图:
    在这里插入图片描述
    Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker

    Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等

    Connection:publisher/consumer和broker之间的TCP连接

    Channel:如果每一次访问RabbitMQ都创建一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也很低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP methods包括了channel id帮助客户端和message broker识别channel,所有channel之间是完全隔离的。channel作为轻量级Connection极大减少了操作系统建立TCP connection的开销。

    Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct,topic and fanout

    Queue:消息最终被送到这里等待consumer取走

    Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。

    rabbitmq的运行流程:
    在这里插入图片描述

    RabbitMQ工作模式:

    RabbitMQ的6种工作模式:简单模式、work queues、Publish/Subscribe发布与订阅模式、Routing路由模式、Topics主题模式、RPC远程调用模式。

    简单模式:
    在这里插入图片描述

    Work queues工作队列模式:
    在这里插入图片描述

    Pub/Sub订阅模式:

    在这里插入图片描述
    路由模式:
    在这里插入图片描述
    路由模式:

    1、每个消费者监听自己的队列,并且设置routingkey。
    2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。
    
    • 1
    • 2

    Topics通配符模式:

    在这里插入图片描述

    RabbitMQ快速入门

    docker安装rabbitmq

    docker pull rabbitmq:management
    
    docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_DEFAULT_VHOST=my_vhost  -p 15672:15672 -p 5672:5672  rabbitmq:management
    
    docker exec -it rabbit /bin/bash
    
    rabbitmq-plugins enable rabbitmq_management
    #安装web管理插件
    
    rabbitmqctl set_vm_memory_high_watermark <fraction>
    rabbitmqctl set_vm_memory_high_watermark absolute 50MB
    #调整分给rabbitmq的内存
    
    vm_memory_high_watermark.relative = 0.4
    vm_memory_high_watermark_paging_ratio = 0.7(设置小于1的值)
    #通过设置上述属性来修改超过最大内存多少百分比时进行换页
    
    rabbitmqctl set_disk_free_limit  <disk_limit>
    rabbitmqctl set_disk_free_limit memory_limit  <fraction>
    #disk_limit:固定单位 KB MB GB
    #fraction :是相对阈值,建议范围在:1.0~2.0之间。(相对于内存)
    #修改磁盘超过多少时进行预警,这也会阻塞生产者
    
    disk_free_limit.relative = 3.0
    disk_free_limit.absolute = 50mb
    #修改设置文件中的属性也能起到同样效果
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    rabbitmq相关命令:

    #创建账号
    rabbitmqctl add_user admin 123
    #设置用户角色
    rabbitmqctl set_user_tags admin administrator
    #设置用户权限
    set_permissions [-p <vhostpath>]<user> <conf> <write> <read>下·
    rabbitmqctl set_permissions -p "/" admin ".*""_*"*"
    #用户admin具有/vhost1这个virtual host 中所有资源的配置、写、读权限当前用户和角色
    rabbitmqctl list_users
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    生产者:

    func Publish() {
    	conn, err := amqp.Dial("amqp://admin:admin123@localhost:5672/my_vhost")
    	//建立连接
    	if err != nil {
    		panic(err)
    	}
    	defer conn.Close()
    	ch, err := conn.Channel()
    	//建立一个channel(其实就是一个TCP连接)
    	if err != nil {
    		panic(err)
    	}
    	defer ch.Close()
    	q, err := ch.QueueDeclare(
    		"hello",
    		false,
    		false,
    		false,
    		false,
    		nil,
    	)
    	//参数分别为:
    	//1.队列的名称
    	//2.是否持久化,当mq重启之后,还在
    	//3.是否自动删除,当连接此队列的所有客户端断开时,是否删除队列
    	//4.是否是内置交互器,(只能通过交换器将消息路由到此交互器,不能通过客户端发送消息
    	//5.是否阻塞处理
    	//6.配置的参数信息
    	if err != nil {
    		panic(err)
    	}
    	body := "Hello World!"
    	msg := amqp.Publishing{
    		ContentType:  "text/plain",
    		DeliveryMode: 1, //1代表消息持久化,2是不持久化
    		Body:         []byte(body),
    	}
    	//创建一个消息
    	err = ch.Publish(
    		"",
    		q.Name,
    		false,
    		false,
    		msg)
    	//构建一个生产者,将消息放入队列,参数为:
    	//1.交换机名称
    	//2.RouterKey,用于绑定的路由键
    	//3.是否为无法路由的消息进行返回处理
    	//4.是否对路由到无消费者队列的消息进行返回处理RabbitMQ3.0废弃
    	//5.消息体
    	if err != nil {
    		panic(err)
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    消费者:

    func Consumer() {
    	conn, err := amqp.Dial("amqp://admin:admin123@localhost:5672/my_vhost")
    	//建立连接
    	if err != nil {
    		panic(err)
    	}
    	defer conn.Close()
    	ch, err := conn.Channel()
    	//建立一个channel(其实就是一个TCP连接)
    	if err != nil {
    		panic(err)
    	}
    	defer ch.Close()
    	q, err := ch.QueueDeclare(
    		"hello",
    		false,
    		false,
    		false,
    		false,
    		nil,
    	)
    	//参数分别为:
    	//1.队列的名称
    	//2.是否持久化,当mq重启之后,还在
    	//3.是否自动删除,当连接此队列的所有客户端断开时,是否删除队列
    	//4.是否独占连接,只能有一个消费者独占这个队列,当connection关闭时,是否删除队列
    	//5.是否阻塞处理
    	//6.配置的参数信息
    	if err != nil {
    		panic(err)
    	}
    
    	msgChan, err := ch.Consume(
    		q.Name,
    		"",
    		true,
    		false,
    		false,
    		false,
    		nil)
    	//推模式消费:RabbitMQ将消息推给消费者
    	//构建一个消费者,消费队列中的消息,参数为:
    	//1.队列名称
    	//2.消费者名称
    	//3.是否确认消费
    	//4.排他
    	//5.noLocal:设置为true表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
    	//6.等待服务器的确认
    	//7.配置的参数信息
    
    	ch.Get(
    		q.Name,
    		true)
    	//拉模式消费:消费者主动拉取RabbitMQ中的消息
    	//第一个参数为队列名称,第二个参数为是否需要等待服务器的确认
    	if err != nil {
    		panic(err)
    	}
    	for msg := range msgChan {
    		fmt.Println("收到消息:", string(msg.Body))
    		ch.Ack(msg.DeliveryTag, false)
    	}
    	//1.消息标记tag
    	//2.false代表只应答接收到的那个传递的消息 true为应答所有消息包括传递过来的消息
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    发布订阅模式:

    func Sub_Publish() {
    	conn, err := amqp.Dial("amqp://username:password@host:5672//vhost")
    	//建立连接
    	if err != nil {
    		panic(err)
    	}
    	defer conn.Close()
    	ch, err := conn.Channel()
    	//建立一个channel(其实就是一个TCP连接)
    	if err != nil {
    		panic(err)
    	}
    	defer ch.Close()
    
    	// 创建交换机
    	if err := ch.ExchangeDeclare(
    		"exchange",          // name
    		amqp.ExchangeFanout, // kind
    		false,               // durable
    		true,                // autoDelete
    		false,               // internal 是否rabbitmq内部使用
    		true,                // noWait
    		nil,                 // args
    	); err != nil {
    		panic(err)
    	}
    
    	q1, err := ch.QueueDeclare(
    		"queue1",
    		false,
    		false,
    		false,
    		false,
    		nil,
    	)
    
    	q2, err := ch.QueueDeclare(
    		"queue2",
    		false,
    		false,
    		false,
    		false,
    		nil,
    	)
    
    	if err := ch.QueueBind(q1.Name, "", "exchange", true, nil); err != nil {
    		panic(err)
    	}
    	if err := ch.QueueBind(q2.Name, "", "exchange", true, nil); err != nil {
    		panic(err)
    	}
    	//参数分别为:
    	//1.队列的名称
    	//2.router key
    	//3.交换机的名称
    	//4.等待服务器确认
    	//5.配置的参数信息
    	if err != nil {
    		panic(err)
    	}
    	body := "Hello World!"
    	msg := amqp.Publishing{
    		ContentType: "text/plain",
    		Body:        []byte(body),
    	}
    	err = ch.Publish(
    		"exchange",
    		"",
    		false,
    		false,
    		msg)
    	//构建一个生产者,将消息放入队列,参数为:
    	//1.交换机名称
    	//2.RouterKey,即队列的名称
    	//3.是否为无法路由的消息进行返回处理
    	//4.是否对路由到无消费者队列的消息进行返回处理RabbitMQ3.0废弃
    	//5.消息体
    	if err != nil {
    		panic(err)
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82

    路由模式:

    func Route_Publish() {
    	conn, err := amqp.Dial("amqp://username:password@host:5672//vhost")
    	//建立连接
    	if err != nil {
    		panic(err)
    	}
    	defer conn.Close()
    	ch, err := conn.Channel()
    	//建立一个channel(其实就是一个TCP连接)
    	if err != nil {
    		panic(err)
    	}
    	defer ch.Close()
    
    	// 创建交换机
    	if err := ch.ExchangeDeclare(
    		"exchange",          // name
    		amqp.ExchangeTopic, // kind
    		false,               // durable
    		true,                // autoDelete
    		false,               // internal 是否rabbitmq内部使用
    		true,                // noWait
    		nil,                 // args
    	); err != nil {
    		panic(err)
    	}
    
    	q1, err := ch.QueueDeclare(
    		"queue1",
    		false,
    		false,
    		false,
    		false,
    		nil,
    	)
    
    	q2, err := ch.QueueDeclare(
    		"queue2",
    		false,
    		false,
    		false,
    		false,
    		nil,
    	)
    
    	if err := ch.QueueBind(q1.Name, "info", "exchange", true, nil); err != nil {
    		panic(err)
    	}
    	if err := ch.QueueBind(q2.Name, "error", "exchange", true, nil); err != nil {
    		panic(err)
    	}
    	//参数分别为:
    	//1.队列的名称
    	//2.用于绑定的路由键
    	//3.交换机的名称
    	//4.等待服务器确认
    	//5.配置的参数信息
    	if err != nil {
    		panic(err)
    	}
    	body := "Hello World!"
    	msg := amqp.Publishing{
    		ContentType: "text/plain",
    		Body:        []byte(body),
    	}
    	err = ch.Publish(
    		"exchange",
    		"info",
    		false,
    		false,
    		msg)
    	//构建一个生产者,将消息放入队列,参数为:
    	//1.交换机名称
    	//2.RouterKey,用于绑定队列
    	//3.是否为无法路由的消息进行返回处理
    	//4.是否对路由到无消费者队列的消息进行返回处理RabbitMQ3.0废弃
    	//5.消息体
    	if err != nil {
    		panic(err)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81

    通配符模式:

    func Match_Publish() {
    	conn, err := amqp.Dial("amqp://username:password@host:5672//vhost")
    	//建立连接
    	if err != nil {
    		panic(err)
    	}
    	defer conn.Close()
    	ch, err := conn.Channel()
    	//建立一个channel(其实就是一个TCP连接)
    	if err != nil {
    		panic(err)
    	}
    	defer ch.Close()
    
    	// 创建交换机
    	if err := ch.ExchangeDeclare(
    		"exchange",          // name
    		amqp.ExchangeTopic,  // kind
    		false,               // durable
    		true,                // autoDelete
    		false,               // internal 是否rabbitmq内部使用
    		true,                // noWait
    		nil,                 // args
    	); err != nil {
    		panic(err)
    	}
    
    	q1, err := ch.QueueDeclare(
    		"queue1",
    		false,
    		false,
    		false,
    		false,
    		nil,
    	)
    
    	q2, err := ch.QueueDeclare(
    		"queue2",
    		false,
    		false,
    		false,
    		false,
    		nil,
    	)
    
    	if err := ch.QueueBind(q1.Name, "#.error", "exchange", true, nil); err != nil {
    		panic(err)
    	}
    	if err := ch.QueueBind(q2.Name, "#.*", "exchange", true, nil); err != nil {
    		panic(err)
    	}
    	//#匹配多个单词,*只能匹配一个单词
    	//参数分别为:
    	//1.队列的名称
    	//2.用于绑定的路由键
    	//3.交换机的名称
    	//4.等待服务器确认
    	//5.配置的参数信息
    	if err != nil {
    		panic(err)
    	}
    	body := "Hello World!"
    	msg := amqp.Publishing{
    		ContentType: "text/plain",
    		Body:        []byte(body),
    	}
    	err = ch.Publish(
    		"exchange",
    		"info.error",
    		false,
    		false,
    		msg)
    	//构建一个生产者,将消息放入队列,参数为:
    	//1.交换机名称
    	//2.RouterKey,用于绑定队列
    	//3.是否为无法路由的消息进行返回处理
    	//4.是否对路由到无消费者队列的消息进行返回处理RabbitMQ3.0废弃
    	//5.消息体
    	if err != nil {
    		panic(err)
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83

    队列的不公平分发:

    Prefetch count 1 
    1是不公平分发,0是轮询分发
    
    • 1
    • 2

    RabbitMQ的高级特性

    消息的可靠性:

    RabbitMQ提供了两种方式来控制消息投递的可靠性模式:
    1.confirm确认模式
    2.return 退回模式

    RabbitMQ整个消息投递的路径为:
    producer–>rabbitMQ broker–>exchange–>queue–>consumer

    利用两个Callback来控制消息的可靠性传递:
    1.消息从producer到exchange会返回一个confirmCallback
    2.消息从exchange–>queue投递失败则会返回一个returnCallback

    Consumer ACK:

    三种确认方式:
    1.自动确认:acknowledge=“none”,当消息一旦被Consumer接收到,则自动确认收到
    2.手动确认:acknowledge=“manual”,业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息
    3.根据异常情况确认:acknowledge=“auto”

    消费端限流:
    1.确保ack机制为手动确认
    2.listener-container配置属性 perfetch=每次消费数目,直到手动确认消费完毕,才能拉取下一组数据

      channel.Qos(
          //每次队列只消费一个消息 这个消息处理不完服务器不会发送第二个消息过来
          //当前消费者一次能接受的最大消息数量
          1,
          //服务器传递的最大容量
          0,
          //如果为true 对channel可用 false则只对当前队列可用
          false,
       )
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在这里插入图片描述

    TTL:

    TTL:全称Time To Live(存活时间/过期时间)
    当消息到达存活时间后,还没有被消费,会被自动清除。
    RabbitMQ可以对消息设置过期时间,也可以对整个队列设置过期时间

    	msg := amqp.Publishing{
    		ContentType:  "text/plain",
    		DeliveryMode: 1,      //1代表消息持久化,2是不持久化
    		Expiration:   "5000", //5秒的过期时间
    		Body:         []byte(body),
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    死信队列:

    在这里插入图片描述
    在这里插入图片描述

    死信队列,英文缩写:DLX。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

    消息成为死信的三种情况:
    1.队列消息长度到达限制
    2.消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列
    3.原队列存在消息过期设置,消息到达超时时间未被消费

    延迟队列:

    延迟队列:消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
    需求:
    1.下单后,30分钟未支付,取消订单,回滚库存
    2.新用户注册成功7天后,发送短信问候
    实现方式:
    1.定时器
    2.延迟队列
    在这里插入图片描述
    RabbitMQ未提供延迟队列功能,但是可以使用:TTL+死信队列组合实现延迟队列效果
    在这里插入图片描述

    延迟队列代码实现:

    	conn, err := amqp.Dial("amqp://username:password@host:5672//vhost")
    	//建立连接
    	if err != nil {
    		panic(err)
    	}
    	defer conn.Close()
    	ch, err := conn.Channel()
    	//建立一个channel(其实就是一个TCP连接)
    	if err != nil {
    		panic(err)
    	}
    	defer ch.Close()
    
    	// 创建交换机
    	if err := ch.ExchangeDeclare(
    		"DeadExchange",      // name
    		amqp.ExchangeFanout, // kind
    		false,               // durable
    		true,                // autoDelete
    		false,               // internal 是否rabbitmq内部使用
    		true,                // noWait
    		nil); err != nil {
    		panic(err)
    	}
    
    	q, err := ch.QueueDeclare(
    		"queue2",
    		false,
    		false,
    		false,
    		false,
    		amqp.Table{
    			"x-message-ttl":             5000,             // 消息过期时间,毫秒
    			"x-dead-letter-exchange":    "DeadExchange",   // 指定死信交换机
    			"x-dead-letter-routing-key": "DeadRoutingKey", // 指定死信routing-key
    		}, // args,
    	)
    
    	if err := ch.QueueBind(q.Name, "DeadRoutingKey", "exchange", true, nil); err != nil {
    		panic(err)
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    死信队列实现延迟队列存在的问题:

    因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很
    长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行
    
    • 1
    • 2

    解决:通过安装插件。
    安装延时队列插件

    // producter.go
    package main
    
    import (
        "fmt"
        "github.com/streadway/amqp"
        "learn_gin/go/rabbitmq/delayletter/constant"
        "learn_gin/go/rabbitmq/util"
        "strconv"
        "time"
    )
    
    func main() {
        // # ========== 1.创建连接 ==========
        mq := util.NewRabbitMQ()
        defer mq.Close()
        mqCh := mq.Channel
    
        // # ========== 2.设置队列(队列、交换机、绑定) ==========
        // 声明队列
        var err error
        _, err = mqCh.QueueDeclare(constant.Queue1, true, false, false, false, amqp.Table{
            // "x-message-ttl": 60000, // 消息过期时间(队列级别),毫秒
        })
        util.FailOnError(err, "创建队列失败")
    
        // 声明交换机
        //err = mqCh.ExchangeDeclare(Exchange1, amqp.ExchangeDirect, true, false, false, false, nil)
        err = mqCh.ExchangeDeclare(constant.Exchange1, "x-delayed-message", true, false, false, false, amqp.Table{
            "x-delayed-type": "direct", 
        })
        util.FailOnError(err, "创建交换机失败")
    
        // 队列绑定(将队列、routing-key、交换机三者绑定到一起)
        err = mqCh.QueueBind(constant.Queue1, constant.RoutingKey1, constant.Exchange1, false, nil)
        util.FailOnError(err, "队列、交换机、routing-key 绑定失败")
    
        // # ========== 4.发布消息 ==========
        message := "msg" + strconv.Itoa(int(time.Now().Unix()))
        fmt.Println(message)
        // 发布消息
        err = mqCh.Publish(constant.Exchange1, constant.RoutingKey1, false, false, amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(message),
            //Expiration: "10000", // 消息过期时间(消息级别),毫秒
            Headers: map[string]interface{}{
                "x-delay": "5000", // 消息从交换机过期时间,毫秒(x-dead-message插件提供)
            },
        })
        util.FailOnError(err, "消息发布失败")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    消息补偿:

    在这里插入图片描述

    消息幂等性保障:

    那我们回到RabbitMQ中,RabbitMQ中的幂等性又是什么意思呢?我们先来看看在RabbitMQ中,哪些情况可能导致非幂等?

    1 consumer接收到消息处理完成后,在给Broker返回ack途中网络中断,Broker未收到确认信息,根据
    RabbitMQ的重试补偿机制,则会把这条消息再重发给其他的消费者或等网络重连后再发送给该消费者,
    造成了消息的重复消费。
    2 或者在开启生产者confirm模式下,生产者已经把消息发送到Broker,但在Broker回传ack确认时网络
    中断,生产者也会重新发送刚才的消息,造成Broker收到了重复的消息,最终将两条重复的消息发送到
    消费端,造成了消息的重复消费。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    全局唯一ID和redis来解决幂等性问题

    生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后,使用setnx命令,将messageId作为key放到redis中:setnx(messageId,1),若返回1,说明之前没有消费过,正常消费;若返回0,说明这条消息之前已消费过,抛弃。
    在这里插入图片描述

    集群搭建

    启动第一个rabbitmq

    #拉取镜像
    docker pull rabbitmq:management
    
    #启动三个容器
    docker run -d --hostname rabbitmq01 --name rabbitmqCluster01 -p 15672:15672 -p 5672:5672 -p 15883:1883 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' rabbitmq:management
    
    docker run -d --hostname rabbitmq02 --name rabbitmqCluster02 -p 15673:15672 -p 5673:5672 -p 15884:1883 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' --link rabbitmqCluster01:rabbitmq01 rabbitmq:management
    
    docker run -d --hostname rabbitmq03 --name rabbitmqCluster03 -p 15674:15672 -p 5674:5672 -p 15885:1883 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' --link rabbitmqCluster01:rabbitmq01 --link rabbitmqCluster02:rabbitmq02  rabbitmq:management
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    第一个容器:

    #进入第一个容器
    docker exec -it rabbitmqCluster01 bash
    
    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmqctl start_app
    exit
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    第二个容器:

    #进入第二个容器
    docker exec -it rabbitmqCluster02 bash
    rabbitmqctl stop_app
    rabbitmqctl reset
    
    #加入集群
    rabbitmqctl join_cluster --ram rabbit@rabbitmq01
    rabbitmqctl start_app
    exit
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    第三个容器:

    #进入第三个容器
    docker exec -it rabbitmqCluster03 bash
    rabbitmqctl stop_app
    rabbitmqctl reset
    
    #加入集群
    rabbitmqctl join_cluster --ram rabbit@rabbitmq01
    rabbitmqctl start_app
    exit
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    面试题

    为什么消息中间件不直接使用http协议呢?

    1.因为http请求报文头和响应报文头是比较复杂的,包含了cookie、数据的加密解密、状态码、晌应码等
    附加的功能,但是对于个消息而言,我们并不需要这么复杂,也没有这个必要性,它其实就是负责数据
    传递,存储,分发就够,要追求的是高性能。尽量简洁,快速。
    2.大部分情况下http大部分都是短链接,在实际的交互过程中,一个请求到响应很有可能会中断,中断以
    后就不会进行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可
    能是一个长期的获取消息的过程,出现问题和故障要对数据或消息就行持久化等,目的是为了保证消息
    和数据的高可靠和稳健的运行。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    什么是rabbitmq?

    采用AMQP高级消息队列协议的一种消息队列技术,最大的特点就是消费并不需要确保提供方存在,实现了服务之间的
    高度解耦。
    
    • 1
    • 2

    为什么要使用rabbitmq?

    1、在分布式系统下具备异步,削峰,负载均衡等一系列高级功能;
    2、拥有持久化的机制,进程消息,队列中的信息也可以保存下来。
    3、实现消费者和生产者之间的解耦。
    4、对于高并发场景下,利用消息队列可以使得同步访问变为串行访问达到一定量的限流,利于数据库的操作。
    5、可以使用消息队列达到异步下单的效果,排队中,后台进行逻辑下单。
    
    • 1
    • 2
    • 3
    • 4
    • 5

    使用rabbitmq的场景?

    1、服务间异步通信
    2、顺序消费
    3、定时任务
    4、请求削峰
    
    • 1
    • 2
    • 3
    • 4

    如何确保消息正确地发送至RabbitMQ?如何确保消息接收方消费了消息?

    1.发送方确认模式(将信道设置为confirm模式,消息投递到目的队列或写入磁盘后发送给生产者确认)
    2.接收方确认机制(消费者接收每一条消息后都必须进行确认,只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。)
    3.接收方消息确认机制
    如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下
    一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)如果消费者接收到消息却没有确认消息,连接也
    未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    如何避免消息重复投递或重复消费?

    在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重的依据(消息投递失败并重
    传),避免重复的消息进入队列;
    在消息消费时,要求消息体中必须要有一个 bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID 等)作为去
    重的依据,避免同一条消息被重复消费。
    
    • 1
    • 2
    • 3
    • 4

    消息基于什么传输?

    由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ使用信道的方式来传
    输数据。信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制
    
    • 1
    • 2

    消息怎么路由?

    消息提供方->路由->一至多个队列
    消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定
    通过队列路由键,可以把队列绑定到交换器上
    消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配
    常用的交换器主要分为一下三种
    1.fanout:如果交换器收到消息,将会广播到所有绑定的队列上
    2.direct:如果路由键完全匹配,消息就被投递到相应的队列
    3.topic:可以使来自不同源头的消息能够到达同一个队列。 使用topic交换器时,可以使用通配符
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    如何确保消息不丢失?

    1.生产者和消费者的确认机制
    2.消息持久化,当然前提是队列、交换机和消息必须设置持久化
    RabbitMQ确保持久性消息能从服务器重启中恢复的方式是,将它们写入磁盘上的一个持久化日志文件,当发布一条
    持久性消息到持久交换器上时,Rabbit会在消息提交到日志文件后才发送响应。
    一旦消费者从持久队列中消费了一条持久化消息,RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集。如
    果持久化消息在被消费之前RabbitMQ重启,那么Rabbit会自动重建交换器和队列(以及绑定),并重新发布持久化
    日志文件中的消息到合适的队列。
    3.对于不可达消息,采用两种处理方式:Return消息机制和备份交换机。
    Return消息机制提供了回调函数ReturnCallback,当消息从交换机路由到Queue失败才会回调这个方法,监听到路由
    不可达的消息。
    备份交换机是一个普通的交换机,当发送的消息没有匹配的queue时,就会自动转移到备份交换机对应的queue。
    4.镜像队列:当MQ发生故障时,会导致服务不可用。引入RabbitMQ的镜像队列机制,将queue镜像到集群中其他的
    节点之上。如果集群中的一个节点失效了,能自动地切换到镜像中的另一个节点以保证服务的可用性。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    mq的缺点

    系统可用性降低
    系统引入的外部依赖越多,越容易挂掉,本来你就是A系统调用BCD三个系统的接口就好了,人 ABCD四个系统好
    好的,没啥问题,你偏加个MQ进来,万一MQ挂了咋整?MQ挂了,整套系统崩溃了。
    系统复杂度提高
    如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂性增大。
    一致性问题
    A系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是BCD三个系统那里,BD两个系
    统写库成功了,结果C系统写库失败了,咋整?你这数据就不一致了。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    解耦、异步、削峰是什么?

    解耦:A系统发送数据到BCD三个系统,通过接口调用发送。如果E系统也要这个数据呢?那如果C系统现在不需要
    了呢?A系统负责人几乎崩溃A 系统跟其它各种乱七八糟的系统严重耦合,A系统产生一条比较关键的数据,很多系
    统都需要A系统将这个数据发送过来。如果使用MQ,A系统产生一条数据,发送到MQ里面去,哪个系统需要数据自
    己去MQ里面消费。如果新系统需要数据,直接从MQ里消费即可;如果某个系统不需要这条数据了,就取消对MQ
    消息的消费即可。这样下来,A系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家
    是否调用成功、失败超时等情况。
    就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其 实这个
    调用是不需要直接同步调用接口的,如果用MQ给它异步化解耦。
    
    异步:A系统接收一个请求,需要在自己本地写库,还需要在BCD三个系统写库,自己本地写库要3ms,BCD三个系
    统分别写库要 300ms、450ms、200ms。最终请求总延时是3+300+450+200=953ms,接近1s,用户感觉搞个什么
    东西,慢死了慢死了。用户通过浏览器发起请求。
    如果使用MQ,那么A系统连续发送3条消息到MQ队列中,假如耗时5ms,A系统从接受一个请求到返回响应给用
    户,总时长是3+5=8ms。
    
    削峰:减少高峰时期对服务器压力。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    mq的消息积压问题

    对于一个原本正常的消息系统来说消息积压,只会出现两种情况:要么生产者消息数量增加导致的积压;要么就是消
    费者消费变慢导致的消息积压。
    
    1.生产端:一般当生产端发生积压,就要查看你的业务逻辑是否有异常的耗时步骤导致的。是否需要改并行化操作
    等。
    2.Broker端:当Broker端发生积压首先查看,消息队列内存使用情况。如果每个队列的消息数量相对均匀的话,我们
    可以认为是流量激增,可以通过增加一部分Broker节点的方式解决。如果队列的消息数量差异很大的话,可以查看路
    由转发规则是否合理。
    3.消费端:当消费速度小于生产速度很快就会出现积压,导致消息延迟,以至于丢失。当消费速度小于生产速度的时
    候,仅增加消费者是没有用处的,因为多个消费者在同一个队列上实际是单线程资源竞争关系。,我们需要同时增加
    Broker上的队列数量才能解决这一问题。消费者不把消费的数据插入db,而是把数据插入redis中来提高消费端的速
    度。消费的信息超过两分钟直接给用户返回一个重新请求相关信息。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
  • 相关阅读:
    CY3 CY5 CY7 FITC荧光素标记果糖/鼠李糖/水苏四糖
    《STL容器篇》-string模拟实现
    通过Docker Compose安装MQTT
    Effective C++条款18:让接口容易被正确使用,不容易被误用
    用Python自动获取PDF图纸的图纸大小,并依此分类整理
    异步FIFO设计的仿真与综合技术(1)
    系统架构设计高级技能 · 安全架构设计理论与实践
    漏洞复现--用友 畅捷通T+ .net反序列化RCE
    开源软件 FFmpeg 生成模型使用图片数据集
    深潜Kotlin协程(十八):冷热数据流
  • 原文地址:https://blog.csdn.net/qq_43716830/article/details/126803760