• python 基于http方式与基于redis方式传输摄像头图片数据的实现和对比


    0. 需求

    在不同进程或者不同语言间传递摄像头图片数据,比如从java实现的代码中获取摄像头画面数据,将其传递给python实现的算法代码中进行处理。这里,提供基于http方式和基于redis方式这两种方式进行实现,并比较两者传输速度。

    作为样例,代码均采用python实现,运行环境为ubuntu 18.04。

    1. 基于http方式传递图片数据
    1.1 发送图片数据
    • 思路:创建两个线程,一个线程利用Opencv通过rtsp地址获得摄像头画面,一个线程将摄像头图片数据转为字节流,并通过http方式发送。

    • 实现

    #coding=gb2312
    # 文件名:http_send.py
    import requests
    import base64
    import cv2
    import time
    import threading
    from queue import LifoQueue
    
    
    class rtspRead: # rtsp地址读取
        def __init__(self, rtsp, port):
            self.rtsp = rtsp # 摄像头的rtsp地址
            self.addr = "http://127.0.0.1:{}/image_post".format(port) # 本地http传输地址
            self.frameQueue = LifoQueue() # 视频帧的队列
            self.frameLock = threading.Lock() # 视频帧队列的锁
            
            self.threadFlag = True # 
            
        
        def start(self): # 开始
            t1 = threading.Thread(target=self.sendFrame, args=(), daemon=True)
            t2 = threading.Thread(target=self.readFrame, args=(), daemon=True)
            t1.start()
            t2.start()
            t1.join()
            t2.join()
            
        def sendFrame(self): # 通过http发送图片
            
            num = 0 # 计算100次图片发送到接受的平均时间,以及平均帧数
    
            while self.threadFlag:
                time.sleep(0.01)
            
                is_get_frame = False # 没有从队列中获得图片
                self.frameLock.acquire()
                if self.frameQueue.qsize():
                    frame = self.frameQueue.get()
                    is_get_frame = True # 从队列中获得图片
                self.frameLock.release()
                
                if is_get_frame:
                	# frame 是ndarray对象,这里是把原始ndarray转成jpg的字节流,转成其它格式直接替换jpg即可
                    img_str = cv2.imencode('.jpg', frame)[1].tobytes()
                    #使用b64encode对bytes-like类型对象进行编码(加密)并返回bytes对象
                    img_data = base64.b64encode(img_str)
                    data = {'img': img_data}
                    
                    resp = requests.post(self.addr, data=data) # 发送图片数据,并获得http_receive.py的返回信息
                    print("结果:", resp.text)
                
            
        def readFrame(self): # 通过rtsp读取图片
            self.cap = cv2.VideoCapture(self.rtsp)
            
            if self.cap.isOpened():
                time.sleep(0.01)
            
                print("成功获得句柄!")
                while self.threadFlag:
                    ret, frame = self.cap.read()
                    if ret:
                          self.frameLock.acquire()
                          while self.frameQueue.qsize() > 3: # 尽量确保队列中为最新的图片帧
                              self.frameQueue.get()
                          self.frameQueue.put(frame)
                          self.frameLock.release()             
                
            else:
                print("句柄获得失败!")
                self.threadFlag = False
                
            self.cap.release()
    
    
    if __name__ == '__main__':
        rtsp_read = rtspRead("rtsp://xx:xx@xx", 9322)    
        rtsp_read.start()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    1.2 接收图片数据并可视化
    • 思路:通过flask框架接收http请求,并将接收到的图片数据的字节流转为np格式,并进一步转为opencv格式。另起一个线程,接收opencv格式的图片数据,并做显示。

    • 实现

    #coding=gb2312
    # 文件名:http_receive.py
    from flask import Flask, request
    import base64
    import numpy as np
    import cv2
    import threading
    from queue import LifoQueue
    import time
    
    class RtspPlay():
        def __init__(self):
            self.frame = None # 视频帧
            self.threadFlag = True # 
            
        def start(self): # 开始
            t1 = threading.Thread(target=self.play, args=(), daemon=True)
            t1.start()
            #t1.join()
                  
        def play(self):
            starttime = time.time()
            
            while self.threadFlag:
                time.sleep(0.01)
                
                print("进入展示线程")
                
                if time.time() - starttime > 260:
                    self.threadFlag = False
            
                if self.frame is not None:
                    print("展示frame")
                    cv2.imshow('http_pic', self.frame)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
                        
            cv2.destroyAllWindows()
                        
        def setFrame(self, img):
            print("更新frame")
            self.frame = img.copy()
                
                
    rtspPlay = RtspPlay()       
    rtspPlay.start()
    
    
    app = Flask(__name__)
    @app.route('/image_post', methods=['POST'])
    def img_post():
        if request.method == 'POST':
            # 获取图片数据
            img_base64 = request.form.get('img')
            # 把图片的二进制进行转化
            img_data = base64.b64decode(img_base64) #将拿到的base64的图片转换回来
            img_array = np.fromstring(img_data,np.uint8) # 转换np序列
            img = cv2.imdecode(img_array,cv2.COLOR_BGR2RGB)  # 转换Opencv格式
            mat = cv2.resize(img, (600, 600))
            print(mat.shape)
            
            rtspPlay.setFrame(mat)
            time.sleep(0.01)
      
        return 'receive img sucess'
    
    if __name__ == "__main__":
        app.run(host="127.0.0.1", port=9322)
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    1.3 测试
    # 先开一个terminal窗口,启用接收进程
    python http_receive.py
    
    # 再开一个terminal窗口,启用发送进程
    python http_send.py
    
    • 1
    • 2
    • 3
    • 4
    • 5
    2. 基于redis方式传递图片数据
    2.1 发送图片数据
    • 思路:创建两个线程,一个线程利用Opencv通过rtsp地址获得摄像头画面,一个线程将摄像头图片数据转为字节流,并通过redis方式发送。这里的redis方式具体指的是,redis是一个内存数据库,通过键值对存储数据,通过订阅/发布机制传递消息,所以将图片字节流数据存入redis中,并将存入消息发布出去,实现发送效果。

    • 实现

    #coding=gb2312
    # 文件名:redis_send.py
    import redis
    import cv2
    import time
    import base64
    import threading
    from queue import LifoQueue
            
    class redisSendPic: # redis发布者
        def __init__(self, cameraip):
            print("redis init ...")
            self.r = redis.Redis(host='127.0.0.1', port=6379,db=0) # 建立连接
            self.topic = 'img0' # 订阅的主题
            self.cameraip = cameraip
            
        def send(self, img):
            #print("发送图片")
            img_str = cv2.imencode('.jpg', img)[1].tobytes()
            data = base64.b64encode(img_str)
            self.r.set(self.cameraip, data)
            self.r.publish(self.topic, self.cameraip)
            
        def getResult(self,): 
            result = self.r.get('result')        
            return result
            
        def delete(self):
            self.r.delete('result')
            self.r.delete(self.cameraip)
                
    class rtspRead: # rtsp地址读取
        def __init__(self, rtsp, cameraip):
            self.rtsp = rtsp
            self.cameraip = cameraip
            self.frameQueue = LifoQueue() # 视频帧的队列
            self.frameLock = threading.Lock() # 视频帧队列的锁
            
            self.threadFlag = True # 
            
        
        def start(self): # 开始
            t1 = threading.Thread(target=self.sendFrame, args=(), daemon=True)
            t2 = threading.Thread(target=self.readFrame, args=(), daemon=True)
            t1.start()
            t2.start()
            t1.join()
            t2.join()
    
            
            
        def sendFrame(self): # 通过redis发送图片
            self.sendpic = redisSendPic(self.cameraip)
            self.sendpic.r.set('result', 'start')
            
            num = 0 # 计算100次图片发送到接受的平均时间,以及平均帧数
            total_time = 0
        
            while self.threadFlag:
                time.sleep(0.01)
            
                is_get_frame = False # 没有从队列中获得图片
                self.frameLock.acquire()
                if self.frameQueue.qsize():
                    frame = self.frameQueue.get()
                    is_get_frame = True # 从队列中获得图片
                self.frameLock.release()
                
                result = self.sendpic.getResult() # 从接受进程获得是否中止的信号
                if result is not None and str(result, 'utf-8') == 'stop':
                    self.threadFlag = False 
                    break
                
                if is_get_frame:
                    self.sendpic.r.delete('receive_time') # 删除接受时间
                    send_time = time.time() # 发送时间
                    self.sendpic.send(frame) # 发送图片
                    
                    receive_time = self.sendpic.r.get('receive_time') # 获得接受图片时间
                    while receive_time is None: # 等待接受图片
                        #print("等待接受")
                        time.sleep(0.01)
                        receive_time = self.sendpic.r.get('receive_time')
                        
                        result = self.sendpic.getResult() # 从接受进程获得是否中止的信号
                        if result is not None and str(result, 'utf-8') == 'stop':
                            self.threadFlag = False 
                            break
                    
                    if receive_time is not None:
                        total_time += float(str(receive_time, 'utf-8')) - send_time
                    #print("图像发送到接受时间:", float(str(receive_time, 'utf-8')) - send_time)
                    
                    num += 1
                    if num > 100:
                        print("发送收发100次,平均耗时{}s,平均速度为{}帧/秒".format(total_time/100, round(100/total_time,2)))
                        num = 0
                        total_time = 0
            
            self.sendpic.delete()
                
            
        def readFrame(self): # 通过rtsp读取图片
            self.cap = cv2.VideoCapture(self.rtsp)
            
            if self.cap.isOpened():
                time.sleep(0.01)
            
                print("成功获得句柄!")
                while self.threadFlag:
                    ret, frame = self.cap.read()
                    if ret:
                          self.frameLock.acquire()
                          while self.frameQueue.qsize() > 3: # 尽量确保队列中为最新的图片帧
                              self.frameQueue.get()
                          self.frameQueue.put(frame)
                          self.frameLock.release()             
                
            else:
                print("句柄获得失败!")
                self.threadFlag = False
                
            self.cap.release()
            
            
    if __name__ == '__main__':
        rtsp_read = rtspRead("rtsp://xx:xx@xx", 'xx')    
        rtsp_read.start()
    
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    2.2 接收图片数据并可视化
    • 思路:通过redis数据库的消息监听机制,当接收到数据入库消息,则提取图片字节流数据,并将其处理为opencv格式的图片数据,从而做到显示。

    • 实现

    #coding=utf-8
    # 文件名:redis_receive.py
    import redis
    import time
    import scipy.misc
    import cv2
    import base64
    from PIL import Image
    import io
    import time
    import scipy.misc
    import numpy as np
    
    r = redis.Redis(host='127.0.0.1',port=6379,db=0)
    ps = r.pubsub()
    charecter = "img"
    ps.subscribe(charecter + str(0))
    
    is_first = True
    
    for item in ps.listen():
        print("get message, ", item)
        #r.set("result", str("ok"))
        
        
        if is_first:
            r.set('receive_time', str(time.time())) # 获得可处理图片时间
            is_first = False 
        
        start = time.time()
        if item['type'] == 'message' and item['data'] is not None:
            img_base64 = r.get(str(item['data'], 'utf-8'))
            img_data = base64.b64decode(img_base64) #将拿到的base64的图片转换回来
            img_array = np.fromstring(img_data,np.uint8) # 转换np序列
            img = cv2.imdecode(img_array,cv2.COLOR_BGR2RGB)  # 转换Opencv格式
            r.set('receive_time', str(time.time())) # 获得可处理图片时间
            mat = cv2.resize(img, (600, 600))
            print(mat.shape)
            cv2.imshow('redis_pic', mat)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            #scipy.misc.imsave('D:/video.png', img)
            # frame = cv2.resize(img_data, (0, 0), fx=0.5, fy=0.5)
            #r.delete(charecter + str(0))
            #r.set("result", str("ok"))
            print("cost time:", time.time() - start)
            
    cv2.destroyAllWindows()
    r.set("result", str("stop"))        
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    2.3 测试
    # 先开一个terminal窗口,启用接收进程
    python redis_receive.py
    
    # 再开一个terminal窗口,启用发送进程
    python redis_send.py
    
    • 1
    • 2
    • 3
    • 4
    • 5

    结束进程直接在显示窗口上按下q键即可。

    3. 对比

    综合来看,在可视化摄像头画面的前提下,两者均可做到实时显示。其中,采用redis方式速度为14帧/秒左右,采用http方式速度为10帧/秒左右。

    若要提高速度,可取消base64的加密过程;若仅考虑传输,可取消其中的可视化部分,传输速度应该会进一步提高。

  • 相关阅读:
    【区块链实战】Solidity 智能合约如何给账户充值
    【TensorFlow1.X】系列学习笔记【入门四】
    Transformer - Attention Is All You Need - 跟李沐学AI
    ChatGLM3-6B安装
    Redis(七)【持久化文件rdb&aof】
    【Shell 系列教程】shell介绍(一)
    2011年03月16日 Go生态洞察:Go朝着更高稳定性迈进
    Leetode-891-子序列宽度之和
    基于Matlab求解高教社杯全国大学生数学建模竞赛(CUMCM2018A题)——高温作业服的优化设计(源码+数据)
    shiro550反序列化漏洞
  • 原文地址:https://blog.csdn.net/qq_30841655/article/details/132618369