• RabbitMQ-发布/订阅模式


    RabbitMQ-默认读、写方式介绍
    RabbitMQ-直连交换机(direct)使用方法

    目录

    1、发布/订阅模式介绍

    2、交换机(exchange)

    3、fanout交换机的使用方式

    3.1 声明交换机

    3.2 发送消息到交换机

    3.2 扇形交换机发送消息代码

     3.2 声明队列,用于接收消息

    3.3 binding

    4、总结


    1、发布/订阅模式介绍

    在普通的生产者、消费者模式,rabbitmq会将消息依次传递给每一个消费者,一个worker一个,平均分配,这就是Round-robin调度方式,为了实现更加复杂的调度,我们就需要使用发布/订阅的方式。

    2、交换机(exchange)

    RabbitMQ中,消息模型的核心理念就是,生产者从来不能直接将消息发送到队列,甚至生产者都不知道消息要被发送到队列中。

    相反,生产者只能将消息发送到交换机中,交换机一侧从生产者接收消息,一侧将消息发送到队列中,交换机需要知道如何处理接收到的消息,是发送给一个队列还是多个队列?这是由交换机的类型决定的。

    交换机共分为四类:  directtopicheaders and fanout. 本章节以扇形交换机为例说明rabbitmq的使用。

    3、fanout交换机的使用方式

    扇形交换机,就像你猜测的那样,他可以将他接收到的全部消息广播到所有队列里。

    3.1 声明交换机

    首先声明一个扇形交换机,type参数设置为『fanout』

    1. err = ch.ExchangeDeclare(
    2. "logs", // name
    3. "fanout", // type
    4. true, // durable
    5. false, // auto-deleted
    6. false, // internal
    7. false, // no-wait
    8. nil, // arguments
    9. )

    3.2 发送消息到交换机

    交换机设定完成后,就可以往该交换机发送消息:

    1. body := "Hello World!"
    2. err = ch.Publish("logs", "", false, false, amqp.Publishing{
    3. ContentType: "text/plain",
    4. Body: []byte(body),
    5. })

    如果要在rabbitmq的页面上查看发送的消息,需要提前创建一个队列,并绑定到该交换机[logs]上,就可以查看发送的消息:

    扇形交换机的特性,就是他会将收到的消息广播给所有绑定到该交换机的队列,我们可以创建多个队列,并绑定到该交换机上,我们发送一次消息,就会看到,所有绑定到该交换机的队列中都会有一条消息,先创建三个队列,并分别绑定到logs交换机:

    之后运行脚本,发送两次消息:

     可以看到,三个队列当中都有两条消息。

    3.2 扇形交换机发送消息代码

    1. package main
    2. import (
    3. "fmt"
    4. amqp "github.com/rabbitmq/amqp091-go"
    5. )
    6. func main() {
    7. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    8. if err != nil {
    9. fmt.Println("Failed to connect to RabbitMQ")
    10. return
    11. }
    12. defer conn.Close()
    13. ch, err := conn.Channel()
    14. if err != nil {
    15. fmt.Println("Failed to open a channel")
    16. return
    17. }
    18. err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
    19. if err != nil {
    20. fmt.Println("Failed to declare an exchange")
    21. return
    22. }
    23. body := "Hello World!"
    24. err = ch.Publish("logs", "", false, false, amqp.Publishing{
    25. ContentType: "text/plain",
    26. Body: []byte(body),
    27. })
    28. if err != nil {
    29. fmt.Println("Failed to publish a message")
    30. return
    31. }
    32. }

     3.2 声明队列,用于接收消息

    1. q, err := ch.QueueDeclare(
    2. "", // name
    3. false, // durable
    4. false, // delete when unused
    5. true, // exclusive
    6. false, // no-wait
    7. nil, // arguments
    8. )

    声明队列时,没有指定队列名称,这时,系统会返回一个随机名称存储在q变量中。 

    3.3 binding

    队列声明完成后,需要将该队列绑定到交换机上,这样交换机才能把消息广播给该队列:

    绑定代码: 

    1. err = ch.QueueBind(
    2. q.Name, // queue name
    3. "", // routing key
    4. "logs", // exchange
    5. false,
    6. nil,
    7. )

    消费者侧全部代码如下:

    1. package main
    2. import (
    3. "fmt"
    4. amqp "github.com/rabbitmq/amqp091-go"
    5. )
    6. func main() {
    7. conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    8. if err != nil {
    9. fmt.Println("Failed to connect to RabbitMQ")
    10. return
    11. }
    12. defer conn.Close()
    13. ch, err := conn.Channel()
    14. if err != nil {
    15. fmt.Println("Failed to open a channel")
    16. return
    17. }
    18. err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
    19. if err != nil {
    20. fmt.Println("Failed to declare an exchange")
    21. return
    22. }
    23. q, err := ch.QueueDeclare(
    24. "", // name
    25. false, // durable
    26. false, // delete when unused
    27. true, // exclusive
    28. false, // no-wait
    29. nil, // arguments
    30. )
    31. err = ch.QueueBind(
    32. q.Name, // queue name
    33. "", // routing key
    34. "logs", // exchange
    35. false,
    36. nil,
    37. )
    38. msgs, err := ch.Consume(
    39. q.Name, // queue
    40. "", // consumer
    41. true, // auto-ack
    42. false, // exclusive
    43. false, // no-local
    44. false, // no-wait
    45. nil, // args
    46. )
    47. var forever chan struct{}
    48. go func() {
    49. for d := range msgs {
    50. fmt.Printf(" [x] %s\n", d.Body)
    51. }
    52. }()
    53. fmt.Printf(" [*] Waiting for logs. To exit press CTRL+C")
    54. <-forever
    55. }

    程序启动后,控制台上会增加一个随机命名的队列。

     运行【3.2】的生产者程序,发送消息到扇形交换机,这个时候消费者就会同步消费到消息,并进行打印:

    4、总结

    关于扇形交换机,核心的一点需要我们记住,发送到扇形交换机的消息,他会将消息广播给所有绑定到该交换机的队列上,无脑广播,所有队列会同时接受到交换机上全部的消息。

  • 相关阅读:
    【智能算法】覆盖算法
    医院检验信息系统源码 医院检验LIS系统源码 LIS源码
    【Kali】简单记录
    【C++】模板初阶
    oracle性能优化:ORACLE SQL性能优化系列 (六)[转]
    PCL入门(六):深度图提取边界
    公务员备考 (十八) 申论
    ADC测试杂谈一:配置基于matlab+quartus的测试环境
    shell 脚本部署 helm
    「网络安全」SQL注入攻击的真相
  • 原文地址:https://blog.csdn.net/liupenglove/article/details/139218852