• flask整合rabbitMQ插件的方式


    二、Python-flask-rabbitMQ-插件方式整合

    引言

    当今互联网应用的高并发场景下,如何保证可靠的消息传递和高效的处理成为了一项重要的挑战。在这种情况下,RabbitMQ作为一种可靠的消息队列系统,被广泛应用于各个领域。

    本文将介绍如何使用RabbitMQ插件整合Flask框架,实现并发性生产者和消费者的兼容。通过这种方式,我们可以利用RabbitMQ的优势来确保消息的可靠传递,并提高应用程序的处理能力。

    首先,我们需要在Flask应用程序中引入RabbitMQ插件。可以使用Pika库来连接和操作RabbitMQ。通过创建一个连接池管理连接对象,我们可以避免频繁地创建和销毁连接,提高效率。

    同时,为了处理高并发的生产者,我们可以使用批量发送消息的方式来减少通道创建和消息发布的开销。通过设置缓冲区来收集一定数量或一定时间间隔内的消息,然后批量发送,可以更有效地利用资源。

    此外,对于高并发的消费者,我们可以考虑使用异步的方式来处理消息。通过将消息发送任务交给后台线程或异步任务队列处理,可以避免请求的堵塞,提高应用程序的并发能力。

    通过以上的优化方案,我们可以在Flask应用程序中充分利用RabbitMQ的功能,并且兼容高并发的生产者和消费者。这将帮助我们构建更可靠、高效的消息队列系统,应对日益增长的并发访问需求。

    总之,使用RabbitMQ插件整合Flask框架,并采用优化方案来兼容并发性生产者和消费者,是构建可靠、高效消息传递系统的关键一步。通过这种方式,我们能够更好地应对高并发场景下的挑战,提升应用程序的性能和稳定性。

    具体步骤

    1 安装依赖:

    使用pip安装pika库:

    pip install pika
    
    • 1

    2 编写实体类:

    from queue import Queue
    from threading import Lock
    
    import pika
    
    
    # 定义交换机类型的枚举值
    class ExchangeType:
        DEFAULT = 'default'
        DIRECT = "direct"
        FANOUT = "fanout"
        TOPIC = 'topic'
    
    
    class RabbitMQ:
        def __init__(self, host='localhost', port=5672, username='guest', password='guest', pool_size=10,heartbeat=0):
            self.credentials = pika.PlainCredentials(username, password)
            self.parameters = pika.ConnectionParameters(host=host, port=port, credentials=self.credentials,
                                                        heartbeat=heartbeat)
            self.connection_pool = Queue(pool_size)  # 连接池,存储连接和信道
            self.lock = Lock()  # 互斥锁,用于对连接池的访问进行同步
    
            for _ in range(pool_size):
                connection = self._create_connection()
                channel = connection.channel()
                self.connection_pool.put((connection, channel))
    
        def _create_connection(self):
            return pika.BlockingConnection(self.parameters)
    
        def get_channel(self):
            with self.lock:
                connection, channel = self.connection_pool.get()  # 从连接池获取连接和信道
            return connection, channel
    
        def release_channel(self, connection, channel):
            with self.lock:
                self.connection_pool.put((connection, channel))  # 将连接和信道放回连接池
    
        def send_message(self, exchange, routing_key, message, exchange_type=ExchangeType.DIRECT):
            connection, channel = self.get_channel()
            try:
                # channel.exchange_declare(exchange=exchange, exchange_type=exchange_type)  # 声明交换机并指定类型
                channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message)  # 发布消息
            finally:
                self.release_channel(connection, channel)
    
        def receive_messages(self, queue, callback):
            connection, channel = self.get_channel()
            try:
                channel.queue_declare(queue=queue, durable=True)  # 声明队列并标记为持久化
                # channel.queue_purge(queue=queue)  # 清空队列,以防之前的非持久化消息残留
                channel.basic_qos(prefetch_count=5)  # 每次从 RabbitMQ 获取 10 条消息
                channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=False)  # 消费消息并设置回调函数
                channel.start_consuming()  # 开始消费消息
            finally:
                self.release_channel(connection, channel)
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    3 编写消费者和生产者:

    这里解释一下:在实际开发过程中我们发现需要使用heartbeat,在生产者和消费者中heartbeat是有细微差别的

    在生产者的角色中heartbeat代表如果在heartbeat时间内没有发送(生产)消息到队列里,这个通道将会被关闭,和mq断开,所以这个处理的方法是heartbeat=0(禁止心跳),这样,不论什么时候发送发送信息到队列都可以,不会因为没有发送信息断开通道

    在消费者的角色中heartbeat代表每隔heartbeat来消费一次,没有也没关系,但是要来,如果我们我们向上面一样禁止heartbeat,那样消费者永远不会来MQ队列消费数据了,所以这里的heartbeat需要设置一个值,来MQ拿数据,即使MQ没有数据也没有关系

        # 初始化 RabbitMQ 实例 RABBITMQ_PASSWORD
        rabbitmq = RabbitMQ(host=app.config['RABBITMQ_HOST'], port=app.config['RABBITMQ_PORT'],
                            username=app.config['RABBITMQ_USERNAME'], password=app.config['RABBITMQ_PASSWORD'])
    
        # 在应用上下文中注册 RabbitMQ 实例
        app.config['RABBITMQ'] = rabbitmq
    
        # 初始化 RabbitMQ 实例 RABBITMQ_PASSWORD
        rabbitmq_consume = RabbitMQ(host=app.config['RABBITMQ_HOST'], port=app.config['RABBITMQ_PORT'],
                                    username=app.config['RABBITMQ_USERNAME'], password=app.config['RABBITMQ_PASSWORD'],
                                    pool_size=1, heartbeat=15)
    
        # 在应用上下文中注册 RabbitMQ 实例
        app.config['RABBITMQ_CONSUME'] = rabbitmq_consume
        # consume_mq(app)
        #
        thread = threading.Thread(target=consume_mq, args=(app,))
    
        # 启动线程
        thread.start()
        
    def consume_mq(app):
        def ack_message(channel, delivery_tag):
            infoLog.info(f'ack_message thread id: {threading.get_ident()}')
            if channel.is_open:
                infoLog.info("处理完成回调")
                channel.basic_ack(delivery_tag)
            else:
                errorLog.error("通道已经关闭")
                # Channel is already closed, so we can't ACK this message;
                # logs and/or do something that makes sense for your app in this case.
    
        def do_work(channel, delivery_tag, body):
            try:
                print(f"消息队列内容 {body.decode()}")
                # 处理rabbitMQ内容
                to_transcribe(body.decode())
            except Exception as e:
                traceback.print_exc()
                errorLog.error(str(e))
                errorLog.exception("An error occurred:")
            finally:
                cb = functools.partial(ack_message, channel, delivery_tag)
                channel.connection.add_callback_threadsafe(cb)
    
        # 启动消费者程序,开始接收和处理消息
        def callback(channel, method, properties, body):
            try:
                infoLog.info("处理消息消息队列内容")
                delivery_tag = method.delivery_tag
                t = threading.Thread(target=do_work, args=(channel, delivery_tag, body))
                t.start()
            except Exception as e:
                errorLog.error(str(e))
                errorLog.exception("An error occurred:")
            # channel.basic_ack(delivery_tag=method.delivery_tag)
    
        def on_message(channel, method_frame, header_frame, body):
            infoLog.info(f'on_message thread id: {threading.get_ident()}')
    
        # 启动消费者程序,开始接收和处理消息
        with app.app_context():
            rabbitmq = current_app.config['RABBITMQ_CONSUME']
            rabbitmq.receive_messages('qwen_queue', callback)
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    4 初始化消费者和生产者:

    def create_app():
        app = Flask(__name__)
        connect_mq_v1(app)
    
    • 1
    • 2
    • 3

    5 其他地方使用生产者

    
    class MessageHandler:
        """处理存放音频,将所有的任务都放在MQ里面"""
    
        def __init__(self, dir_name,uuid_str, back_url, file_url, request_type, file_name, *args, **kwargs):
    
            # 文件夹名称
            self.dir_name = dir_name
            # 文件名称
            self.file_name = file_name
            # 文件上传类型
            self.request_type = request_type
            # 文件存储位置
            self.file_url = file_url
            # 客户端回调地址
            self.back_url = back_url
            # 唯一标识
            self.uuid_str = uuid_str
    
        def send(self):
            """
            :param content_type:队列类型
            :param rpc:MQ对象
            :return:
            """
            try:
                # 发送消息队列
                # rpc.send_expire(body=json.dumps(self.to_json()), exchange='audio_queue', key='audio_queue')
                rabbitmq = current_app.config['RABBITMQ']
                rabbitmq.send_message('audio_queue', 'audio_queue', json.dumps(self.to_json()))
                print("发送消息到mq成功,用于存放音频信息")
            except Exception as e:
                print(f"发送消息到mq服务失败,请检查, {e}")
    
        def to_json(self):
            _dict = self.__dict__
            return _dict
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    # 将请求体和uuid放到rabbitMQ中
    MessageHandler(**dates).send()
    
    • 1
    • 2
  • 相关阅读:
    wordcloud—根据文本生成词云—Python第三方库
    Matlab论文插图绘制模板第125期—特征渲染的三维气泡图
    Redis高可用方案之主从复制
    深度学习100例——卷积神经网络(CNN)实现服装图像分类
    Pthread并行编程-以斐波那契数列计算为例
    浅谈性能测试稳定性 Constant Throughput Timer(常数吞吐量定时器)
    “一键合并剪辑,轻松添加片头——全新的视频编辑工具让你成为视频制作达人“
    Java IDE MyEclipse 使用教程:创建一个新的 REST Web 服务
    Qt 集成OSG
    Web前端入门(十四)元素显示模式
  • 原文地址:https://blog.csdn.net/huiguo_/article/details/133920287