码农知识堂 - 1000bd
  •   Python
  •   PHP
  •   JS/TS
  •   JAVA
  •   C/C++
  •   C#
  •   GO
  •   Kotlin
  •   Swift
  • python--使用pika库操作rabbitmq实现需求


    Author: wencoo
    Blog:https://wencoo.blog.csdn.net/
    Date: 22/04/2024
    Email: jianwen056@aliyun.com
    Wechat:wencoo824
    QQ:1419440391
    Details:
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    文章目录

    • 目录
      • 正文 或 背景
      • pika链接mq
      • pika指定消费数量
      • pika自动消费实现
      • pika获取队列任务数量
      • pika主动获取任务消费
      • 根据需求实现的完整示例代码
      • 参考
    • 技术交流
    • 音视频领域其他技术文章的链接
      • opengl相关文章
      • ffmpeg相关文章
        • ffmpeg原理相关文章
        • ffmpeg源码分析相关文章
        • ffmpeg指令相关文章
        • ffmpeg报错相关文章
      • libass相关文章
      • c/c++相关文章
      • linux相关文章
      • 其他文章
    • 后面都是一些废话,不用看,刷分的
      • 推广一个AI学习网站
      • 中国软件行业倡议书
      • 作者有话说
      • 关于内卷

    在这里插入图片描述

    目录

    正文 或 背景

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。

    有这样业务场景,算法部的同事有一个算法需要集成,有国内和国外两条链路,使用rabbitmq对应的试两个队列,但是他的算法只能开启一个实例,不能开两个进程,否则计算资源不足会崩溃(此处我想吐槽,做算法这帮人,工资又高,结果工程能力太差啦,所谓的算法也不过把github的开源库拿来改改参数,怎么好意思叫算法,搞不懂现在的中国互联网环境了),基于以上原因,需求则是需要在只开一个实力的情况下,消费两个队列里的任务。

    由于算法模型都是python写的,所以要使用python来处理mq,python处理mq的驱动库是pika,下面来学习一下pika一些使用操作。

    pika链接mq

    import pika
    import time
    
    
    credentials = pika.PlainCredentials('admin', 'Hasx9527')
    connection = pika.BlockingConnection(pika.ConnectionParameters(host = '180.184.35.67',port = 5672,virtual_host = '/',credentials = credentials))
    channel = connection.channel()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    pika指定消费数量

    # 可选:指定消费者处理队列消息的数量
    channel.basic_qos(prefetch_count=1)  # 实现消费者负载均衡,表示同时只能处理1条消息,待处理完成后在接收下一条消息
    
    • 1
    • 2

    pika自动消费实现

    def callback(ch, method, properties, body):
        ch.basic_ack(delivery_tag = method.delivery_tag)
        print(body.decode())
        time.sleep(10)
    
    # 可选:指定消费者处理队列消息的数量
    channel.basic_qos(prefetch_count=1)  # 实现消费者负载均衡,表示同时只能处理1条消息,待处理完成后在接收下一条消息
    
    # 告诉rabbitmq,用callback来接收消息
    channel.basic_consume('test.python.pika.1',callback)
    # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
    channel.start_consuming()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    pika获取队列任务数量

    queue_info = channel.queue_declare(queue,passive=True)
    message_count = queue_info.method.message_count
    print(f"队列任务数量为:{message_count}")
    
    • 1
    • 2
    • 3

    pika主动获取任务消费

    method_frame, header_frame, body = channel.basic_get(queue)
    if method_frame:
        # print(method_frame, header_frame, body)  # 收到的全部数据  就要body
        print(body)          # 是b'xxx'的格式
        channel.basic_ack(method_frame.delivery_tag)  #用自动应答 这个应该不要
    else:
        print('消费单条 没有收到消息')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    根据需求实现的完整示例代码

    import pika
    import time
    
    
    credentials = pika.PlainCredentials('admin', 'Hasx9527')
    connection = pika.BlockingConnection(pika.ConnectionParameters(host = '180.184.35.67',port = 5672,virtual_host = '/',credentials = credentials))
    channel = connection.channel()
    
    queue1 = 'test.python.pika.1'
    queue2 = 'test.python.pika.2'
    def queueConsumer(queueName):
    	while True:
    		# 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
    		queue = queueName
    		channel.queue_declare(queue , durable = True)
    		# 可选:指定消费者处理队列消息的数量
    		channel.basic_qos(prefetch_count=1)  # 实现消费者负载均衡,表示同时只能处理1条消息,待处理完成后在接收下一条消息
    		method_frame, header_frame, body = channel.basic_get(queue)
    		if method_frame:
    			print(body)          # 是b'xxx'的格式
    			time.sleep(10)
    			channel.basic_ack(method_frame.delivery_tag)  #用自动应答 这个应该不要
    		else:
    			time.sleep(1)
    			break
    
    def queueConsumer2(queueName,queueName2):
    	while True:
    		# 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
    		queue = queueName2
    		channel.queue_declare(queue , durable = True)
    		queue_info = channel.queue_declare(queueName,durable = True,passive=True)
    		message_count = queue_info.method.message_count
    		if message_count != 0:
    			break
    
    		# 可选:指定消费者处理队列消息的数量
    		channel.basic_qos(prefetch_count=1)  # 实现消费者负载均衡,表示同时只能处理1条消息,待处理完成后在接收下一条消息
    		method_frame, header_frame, body = channel.basic_get(queue)
    		if method_frame:
    			print(body)          # 是b'xxx'的格式
    			time.sleep(10)
    			channel.basic_ack(method_frame.delivery_tag)  #用自动应答 这个应该不要
    		else:
    			time.sleep(1)
    			break
    
    while True:
    	print("开始运行队列1================\n")
    	queueConsumer(queue1)
    
    	print("开始运行队列2================\n")
    	queueConsumer2(queue1,queue2)
    	
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    参考

    • python pika 消费mq basic_get方法
    • python 操作RabbitMq详解
    • python对RabbitMQ的简单使用
    • Python三方库:Pika(RabbitMQ基础使用)

    由于笔者的水平有限, 加之编写的同时还要参与开发工作,文中难免会出现一些错误或者不准确的地方,恳请读者批评指正。如果读者有任何宝贵意见,可以加我微信 wencoo824。QQ:1419440391。

    技术交流

    欢迎加微信,搜索"wencoo824",进行技术交流,备注”博客音视频技术交流“

    音视频领域其他技术文章的链接

    opengl相关文章

    • opengl日记7-ubuntu20.04开发环境opengl拓展glfw和glad环境搭建
    • opengl日记8-opengl创建三角形
    • opengl日记9-opengl使用纹理示例
    • opengl日记10-opengl使用多个纹理示例
    • opengl日记11-opengl的transformtions变换示例
    • opengl日记12-opengl坐标系统
    • opengl日记19-opengl文字渲染-教程示例
    • opengl日记23-opengl文字渲染-渐变色-教程示例
    • opengl日记25-opengl文字渲染-渲染中文渐变色动画-直线线性运动-教程示例
    • opengl日记26-opengl文字渲染-渲染中文渐变色动画-贝塞尔运动-教程示例
    • opengl日记27-opengl报错ERROR::SHADER::PROGRAM::LINKING_FAILED
    • opengl日记28-opengl之c语言版本的glm库cglm编译使用教程

    ffmpeg相关文章

    • ffmpeg学习日记1-ffmpeg的基本介绍(相关概念理解,资料收集)

    • ffmpeg学习日记2-新建工程打印ffmpeg版本

    • ffmpeg学习日记3-视频格式和视频编码的关系

    • ffmpeg学习日记4-使用ffmpeg获取视频文件属性值

    • ffmpeg学习日记5-使用ffmpeg进行h264解码

    • ffmpeg学习日记8-YUV的几个知识点

    • ffmpeg学习日记11-使用ffmpeg将视频格式转换为视频编码h264格式

    • ffmpeg学习日记17-获取MP4视频流的帧率

    • ffmpeg学习日记19-判断AVPacket中的一帧数据是否为关键帧

    • ffmpeg学习日记21-缓存AVPacket数据

    • ffmpeg学习日记22-内存读取avio_alloc_context函数的内存释放问题

    • ffmpeg学习日记29-使用vscode调试ffmpeg源码

    • ffmpeg学习日记101-视频-MP4提取YUV数据,每一帧保存为pgm图片

    • ffmpeg学习日记121-视频-各种图片转yuv

    • ffmpeg学习日记122-视频-获取视频的解码器,yuv格式名称,理解编码格式,封装格式,yuv格式的关系

    • ffmpeg学习日记122-视频-获取视频的解码器,yuv格式名称,理解编码格式,封装格式,yuv格式的关系

    • ffmpeg日记1011-过滤器-语法高阶,逻辑,函数使用

    ffmpeg原理相关文章

    • ffmpeg日记4001-原理介绍-视频切割原理

    ffmpeg源码分析相关文章

    • ffmpeg学习日记501-源码-parse_loglevel()函数
    • ffmpeg学习日记502-源码-ffmpeg_parse_options()函数分析
    • ffmpeg学习日记503-源码-transcode()函数分析
    • ffmpeg学习日记504-源码-readme汉化
    • ffmpeg学习日记506-源码-av_image_copy()函数分析及功能
    • ffmpeg学习日记508-源码-ffmpeg --help 汉化
    • ffmpeg学习日记509-源码-从ffmpeg 源码提取编码的流程分析
    • ffmpeg学习日记512-源码-ubuntu20.04下源码编译
    • ffmpeg学习日记513-源码-configure_filtergraph()函数分析及功能

    ffmpeg指令相关文章

    • ffmpeg学习日记601-指令-视频裁剪,添加bgm合成mp4
    • ffmpeg学习日记602-指令-转换视频的分辨率
    • ffmpeg学习日记603-指令-获取视频分辨率
    • ffmpeg学习日记604-指令-将视频格式转为H264格式
    • ffmpeg学习日记605-指令-获取视频的总帧数
    • ffmpeg学习日记606-指令-将视频转为全I帧
    • ffmpeg学习日记607-指令-将mp4视频转yuv
    • ffmpeg学习日记612-指令-转换视频格式
    • ffmpeg学习日记612-指令-转换视频格式
    • ffmpeg学习日记614-指令-获取文件时长
    • ffmpeg学习日记619-指令-透明通道视频相关指令

    ffmpeg报错相关文章

    • ffmpeg学习日记701-报错-co located POCs unavailable
    • ffmpeg学习日记702-报错-包含‘PRId64‘的报错

    libass相关文章

    • libass分析1-源码分析-起源-源码编译
    • libass分析2-源码分析-示例程序test.c的源码分析
    • libass分析3-源码分析-libass中的宏定义分析
    • libass分析5-源码分析-ASS_Track结构体分析,ass文件数据是如何存放的
    • libass分析6-源码分析-ASS_Renderer结构体分析,ass文件数据是如何存放的
    • libass分析8-源码分析-libass处理event中{}的逻辑

    c/c++相关文章

    • c/c++专栏

    linux相关文章

    • linux专栏

    其他文章

    • docker-创建rabbitmq容器指令

    后面都是一些废话,不用看,刷分的

    推广一个AI学习网站

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。

    中国软件行业倡议书

    精简软件开发,电脑性能越来越好,打出的程序安装包越来越大,磁盘,内存越吃越多,这不是好现象,手机同理,大家觉得呢,欢迎发表看法,各抒己见。

    手机app随意读取用户通讯录,就是流氓行为,即使有时候弹窗提示是否授权,选择了否,但是他其实还是悄悄读取你的通讯录,并且随便给你的通讯录好友发推广信息,这一点是非常反感的,并且也触犯了用户的权益,这不仅是流氓行为,更是违法行为,某软件就不说了。

    作者有话说

    个人简介:多年工作工程经验,擅长linux下软件开发,qt,ffmpeg音视频二次开发。

    欢迎各位叨扰作者,如果有什么项目合作,创业合伙需要研发,网站推广,猎头服务,内推等等,尽管来联系,对于能挣钱的事,作者可是很感兴趣的哦。

    关于内卷

    劝大家一句,不要内卷,内卷只能害了别人,害了自己。

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 相关阅读:
    Linux C进程间通信(IPC)
    微信小程序(四)--- 自定义组件详解(properties,数据监听器,纯数据字段,插槽,父子间通信,behaviors)
    DoIP通信中的TLS加密
    Vinco Ventures任命Ted Farnsworth为联合首席执行官
    ChatGPT从⼊⻔到精通
    霓虹【算法赛】蓝桥杯第7场强者挑战赛
    《010.SpringBoot+vue之学生选课管理系统02》
    5-1模块化编程以及5-2调试工具
    使用Java实现一个简单的贪吃蛇小游戏
    做销售,如何实现快速初筛客户?
  • 原文地址:https://blog.csdn.net/bootleader/article/details/138084886
  • 最新文章
  • 攻防演习之三天拿下官网站群
    数据安全治理学习——前期安全规划和安全管理体系建设
    企业安全 | 企业内一次钓鱼演练准备过程
    内网渗透测试 | Kerberos协议及其部分攻击手法
    0day的产生 | 不懂代码的"代码审计"
    安装scrcpy-client模块av模块异常,环境问题解决方案
    leetcode hot100【LeetCode 279. 完全平方数】java实现
    OpenWrt下安装Mosquitto
    AnatoMask论文汇总
    【AI日记】24.11.01 LangChain、openai api和github copilot
  • 热门文章
  • 十款代码表白小特效 一个比一个浪漫 赶紧收藏起来吧!!!
    奉劝各位学弟学妹们,该打造你的技术影响力了!
    五年了,我在 CSDN 的两个一百万。
    Java俄罗斯方块,老程序员花了一个周末,连接中学年代!
    面试官都震惊,你这网络基础可以啊!
    你真的会用百度吗?我不信 — 那些不为人知的搜索引擎语法
    心情不好的时候,用 Python 画棵樱花树送给自己吧
    通宵一晚做出来的一款类似CS的第一人称射击游戏Demo!原来做游戏也不是很难,连憨憨学妹都学会了!
    13 万字 C 语言从入门到精通保姆级教程2021 年版
    10行代码集2000张美女图,Python爬虫120例,再上征途
Copyright © 2022 侵权请联系2656653265@qq.com    京ICP备2022015340号-1
正则表达式工具 cron表达式工具 密码生成工具

京公网安备 11010502049817号