• Nvidia DeepStream Python: Queue and Multithreading


    Nvidia DeepStream Details series: Queue and multithreading in DeepStream Python

    This article explains what the queue element does in a DeepStream/GStreamer pipeline.

    Environment:

    • Hardware for this example: Jetson NX
    • IDE: VSCode
    • JetPack 4.6.1 GA
    • DeepStream 6.0.1


    Among the official DeepStream Python samples, deepstream-test3 makes heavy use of the queue element. The code that builds the whole pipeline is as follows:

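    # Excerpt from the deepstream-test3 sample. It relies on these module-level imports;
    # fps_streams, GETFPS, create_source_bin, bus_call, is_aarch64,
    # tiler_src_pad_buffer_probe and the OSD_*/TILED_* constants are defined
    # elsewhere in the sample.
    import sys
    import math
    import gi
    gi.require_version('Gst', '1.0')
    from gi.repository import GObject, Gst

    def main(args):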
        # Check input arguments
        if len(args) < 2:
            sys.stderr.write("usage: %s <uri1> [uri2] ... [uriN]\n" % args[0])
            sys.exit(1)
    
        for i in range(0,len(args)-1):
            fps_streams["stream{0}".format(i)]=GETFPS(i)
        number_sources=len(args)-1
    
        # Standard GStreamer initialization
        GObject.threads_init()
        Gst.init(None)
    
        # Create gstreamer elements
        # Create Pipeline element that will form a connection of other elements
        print("Creating Pipeline \n ")
        pipeline = Gst.Pipeline()
        is_live = False
    
        if not pipeline:
            sys.stderr.write(" Unable to create Pipeline \n")
        print("Creating streamux \n ")
    
        # Create nvstreammux instance to form batches from one or more sources.
        streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
        if not streammux:
            sys.stderr.write(" Unable to create NvStreamMux \n")
    
        pipeline.add(streammux)
        for i in range(number_sources):
            print("Creating source_bin ",i," \n ")
            uri_name=args[i+1]
            if uri_name.find("rtsp://") == 0 :
                is_live = True
            source_bin=create_source_bin(i, uri_name)
            if not source_bin:
                sys.stderr.write("Unable to create source bin \n")
            pipeline.add(source_bin)
            padname="sink_%u" %i
            sinkpad= streammux.get_request_pad(padname) 
            if not sinkpad:
                sys.stderr.write("Unable to create sink pad bin \n")
            srcpad=source_bin.get_static_pad("src")
            if not srcpad:
                sys.stderr.write("Unable to create src pad bin \n")
            srcpad.link(sinkpad)
        queue1=Gst.ElementFactory.make("queue","queue1")
        queue2=Gst.ElementFactory.make("queue","queue2")
        queue3=Gst.ElementFactory.make("queue","queue3")
        queue4=Gst.ElementFactory.make("queue","queue4")
        queue5=Gst.ElementFactory.make("queue","queue5")
        pipeline.add(queue1)
        pipeline.add(queue2)
        pipeline.add(queue3)
        pipeline.add(queue4)
        pipeline.add(queue5)
        print("Creating Pgie \n ")
        pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
        if not pgie:
            sys.stderr.write(" Unable to create pgie \n")
        print("Creating tiler \n ")
        tiler=Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
        if not tiler:
            sys.stderr.write(" Unable to create tiler \n")
        print("Creating nvvidconv \n ")
        nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
        if not nvvidconv:
            sys.stderr.write(" Unable to create nvvidconv \n")
        print("Creating nvosd \n ")
        nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
        if not nvosd:
            sys.stderr.write(" Unable to create nvosd \n")
        nvosd.set_property('process-mode',OSD_PROCESS_MODE)
        nvosd.set_property('display-text',OSD_DISPLAY_TEXT)
        if(is_aarch64()):
            print("Creating transform \n ")
            transform=Gst.ElementFactory.make("nvegltransform", "nvegl-transform")
            if not transform:
                sys.stderr.write(" Unable to create transform \n")
    
        print("Creating EGLSink \n")
        sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
        if not sink:
            sys.stderr.write(" Unable to create egl sink \n")
    
        if is_live:
            print("Atleast one of the sources is live")
            streammux.set_property('live-source', 1)
    
        streammux.set_property('width', 1920)
        streammux.set_property('height', 1080)
        streammux.set_property('batch-size', number_sources)
        streammux.set_property('batched-push-timeout', 4000000)
        pgie.set_property('config-file-path', "dstest3_pgie_config.txt")
        pgie_batch_size=pgie.get_property("batch-size")
        if(pgie_batch_size != number_sources):
            print("WARNING: Overriding infer-config batch-size",pgie_batch_size," with number of sources ", number_sources," \n")
            pgie.set_property("batch-size",number_sources)
        tiler_rows=int(math.sqrt(number_sources))
        tiler_columns=int(math.ceil((1.0*number_sources)/tiler_rows))
        tiler.set_property("rows",tiler_rows)
        tiler.set_property("columns",tiler_columns)
        tiler.set_property("width", TILED_OUTPUT_WIDTH)
        tiler.set_property("height", TILED_OUTPUT_HEIGHT)
        sink.set_property("qos",0)
        sink.set_property("sync", 0)
    
        print("Adding elements to Pipeline \n")
        pipeline.add(pgie)
        pipeline.add(tiler)
        pipeline.add(nvvidconv)
        pipeline.add(nvosd)
        if is_aarch64():
            pipeline.add(transform)
        pipeline.add(sink)
    
        print("Linking elements in the Pipeline \n")
        streammux.link(queue1)
        queue1.link(pgie)
        pgie.link(queue2)
        queue2.link(tiler)
        tiler.link(queue3)
        queue3.link(nvvidconv)
        nvvidconv.link(queue4)
        queue4.link(nvosd)
        if is_aarch64():
            nvosd.link(queue5)
            queue5.link(transform)
            transform.link(sink)
        else:
            nvosd.link(queue5)
            queue5.link(sink)   
    
        # create an event loop and feed gstreamer bus messages to it
        loop = GObject.MainLoop()
        bus = pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect ("message", bus_call, loop)
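        # Note: despite the variable name, this is the src pad of pgie (nvinfer),
        # so the probe sees the batched buffers before they reach the tiler.
        tiler_src_pad=pgie.get_static_pad("src")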
        Gst.debug_bin_to_dot_file(pipeline, Gst.DebugGraphDetails.ALL, "pipeline")
    
        if not tiler_src_pad:
            sys.stderr.write(" Unable to get src pad \n")
        else:
            tiler_src_pad.add_probe(Gst.PadProbeType.BUFFER, tiler_src_pad_buffer_probe, 0)
    
        # List the sources
        print("Now playing...")
        for i, source in enumerate(args):
            if (i != 0):
                print(i, ": ", source)
    
        print("Starting pipeline \n")
        # start playback and listen to events
        pipeline.set_state(Gst.State.PLAYING)
        try:
            loop.run()
        except:
            pass
        # cleanup
        print("Exiting app\n")
        pipeline.set_state(Gst.State.NULL)
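
The queue creation and linking in the listing follow a regular pattern: one queue after each processing element. As a minimal sketch (not part of the official sample), that pattern could be factored into two small helpers. The names `interleave_with_queues` and `link_chain` are hypothetical. The helpers only require objects with a `link()` method that returns a boolean, as `Gst.Element.link` does, so the `FakeElement` stand-in below lets the sketch run without GStreamer installed.

```python
def interleave_with_queues(elements, make_queue):
    """Return a new list with one queue placed after every element except the last."""
    chain = []
    for index, element in enumerate(elements):
        chain.append(element)
        if index < len(elements) - 1:
            # make_queue receives a 1-based index so names match queue1..queueN
            chain.append(make_queue(index + 1))
    return chain


def link_chain(chain):
    """Link each element to its successor and fail loudly if a link is refused."""
    for upstream, downstream in zip(chain, chain[1:]):
        if not upstream.link(downstream):
            raise RuntimeError("failed to link %r -> %r" % (upstream, downstream))


class FakeElement:
    """Stand-in for Gst.Element (assumption: only link() is needed here)."""

    def __init__(self, name):
        self.name = name
        self.linked_to = None

    def link(self, other):
        self.linked_to = other
        return True

    def __repr__(self):
        return self.name


names = ["streammux", "pgie", "tiler", "nvvidconv", "nvosd", "sink"]
elements = [FakeElement(name) for name in names]
chain = interleave_with_queues(elements, lambda i: FakeElement("queue%d" % i))
link_chain(chain)
print([element.name for element in chain])
```

With real GStreamer objects, `make_queue` would presumably be something like `lambda i: Gst.ElementFactory.make("queue", "queue%d" % i)`, and every queue would still have to be added to the pipeline with `pipeline.add()` before linking, as the listing does.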
    

    Rendering the pipeline with Graphviz gives the following graph:

    [Figure: Graphviz rendering of the complete pipeline]
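
One detail the listing leaves implicit: `Gst.debug_bin_to_dot_file` only writes a file when the environment variable `GST_DEBUG_DUMP_DOT_DIR` is set, and GStreamer reads that variable during initialization, so it has to be set before `Gst.init()` runs. The sketch below shows one way to prepare the dump directory and build the Graphviz command. The helper names and the directory are assumptions for illustration, and the `dot` tool is assumed to be installed; the command is only built here, not executed.

```python
import os
import tempfile
from pathlib import Path


def prepare_dot_dump(directory):
    """Create the dump directory and export it; call this before Gst.init()."""
    path = Path(directory).resolve()
    path.mkdir(parents=True, exist_ok=True)
    os.environ["GST_DEBUG_DUMP_DOT_DIR"] = str(path)
    return path


def dot_render_command(dot_dir, name="pipeline", fmt="png"):
    """Build the Graphviz command that converts <name>.dot into an image."""
    source = Path(dot_dir) / (name + ".dot")
    target = source.with_suffix("." + fmt)
    return ["dot", "-T" + fmt, str(source), "-o", str(target)]


# Hypothetical location; any writable directory works.
dump_dir = prepare_dot_dump(Path(tempfile.gettempdir()) / "gst-dot")
command = dot_render_command(dump_dir)
print(" ".join(command))
```

After the pipeline has run, the printed command can be executed in a shell (or passed to `subprocess.run`) to turn `pipeline.dot` into `pipeline.png`.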

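    Zooming in on part of the graph (below) shows what the sample adds to the basic pipeline: a queue element after each of nvstreammux, nvinfer, nvmultistreamtiler, nvvideoconvert and nvdsosd. The graph lists the caps of every queue's sink and src pads as ANY, which indicates that a queue accepts any data format and passes it through unchanged. Its purpose therefore has nothing to do with transforming the data.

    [Figure: enlarged section of the pipeline graph showing the queue elements]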

    The official GStreamer documentation explains why and when to use a queue in the section "When would you want to force a thread?":

    We have seen that threads are created by elements but it is also possible to insert elements in the pipeline for the sole purpose of forcing a new thread in the pipeline.

    There are several reasons to force the use of threads. However, for performance reasons, you never want to use one thread for every element out there, since that will create some overhead. Let’s now list some situations where threads can be particularly useful:

    • Data buffering, for example when dealing with network streams or when recording data from a live stream such as a video or audio card. Short hickups elsewhere in the pipeline will not cause data loss. See also Stream buffering about network buffering with queue2.

    • Synchronizing output devices, e.g. when playing a stream containing both video and audio data. By using threads for both outputs, they will run independently and their synchronization will be better.
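
The decoupling described in the quoted passage can be illustrated with a plain-Python analogy. This is not GStreamer code: it uses the standard library's `queue.Queue` and `threading` to show how a bounded queue lets an upstream stage and a downstream stage run in different threads, with the queue absorbing short stalls on either side. All names are illustrative.

```python
import queue
import threading


def producer(out_queue, count):
    """Upstream stage: pushes frames and records which thread produced them."""
    for frame_id in range(count):
        out_queue.put((frame_id, threading.current_thread().name))
    out_queue.put(None)  # end-of-stream marker


def consumer(in_queue, results):
    """Downstream stage: runs in its own thread, pulling frames as they arrive."""
    while True:
        item = in_queue.get()
        if item is None:
            break
        frame_id, produced_in = item
        results.append((frame_id, produced_in, threading.current_thread().name))


frame_buffer = queue.Queue(maxsize=4)  # bounded, like a queue with a size limit
results = []
upstream = threading.Thread(target=producer, args=(frame_buffer, 10), name="upstream")
downstream = threading.Thread(target=consumer, args=(frame_buffer, results), name="downstream")
upstream.start()
downstream.start()
upstream.join()
downstream.join()
print(results[0], len(results))
```

Every frame is produced in one thread and consumed in another, which is the same boundary a GStreamer queue introduces between the elements on its sink side and those on its src side.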

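    In short, a queue forces a thread boundary in the pipeline: the elements downstream of the queue run in a new streaming thread, separate from the elements upstream of it. When working with DeepStream Python, I also find it reasonable to place a few queues in the pipeline as data buffers.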
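
When a queue is used as a data buffer, its capacity and overflow behaviour are controlled through the element's `max-size-buffers`, `max-size-bytes`, `max-size-time` and `leaky` properties. The following is a hedged sketch of how such a queue might be configured. The helper name `configure_as_buffer` and the chosen values are assumptions, not settings from the sample, and `FakeQueue` stands in for a real element so the code runs without GStreamer.

```python
def configure_as_buffer(queue_element, max_buffers=4, leaky_mode=0):
    """Limit a queue by buffer count only and choose its overflow behaviour."""
    # A value of 0 disables the byte and time limits, so only the count applies.
    queue_element.set_property("max-size-bytes", 0)
    queue_element.set_property("max-size-time", 0)
    queue_element.set_property("max-size-buffers", max_buffers)
    # leaky: 0 = block when full, 1 = drop incoming buffers, 2 = drop oldest buffers
    queue_element.set_property("leaky", leaky_mode)
    return queue_element


class FakeQueue:
    """Stand-in for a Gst queue element (assumption: only set_property is used)."""

    def __init__(self):
        self.properties = {}

    def set_property(self, name, value):
        self.properties[name] = value


buffered = configure_as_buffer(FakeQueue(), max_buffers=8, leaky_mode=2)
print(buffered.properties)
```

With GStreamer available, the same helper could be applied to an element created with `Gst.ElementFactory.make("queue", "queue1")`. A leaky queue trades completeness for latency, which tends to suit live sources where dropping old frames is preferable to stalling the pipeline.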

  • Original article: https://blog.csdn.net/zyctimes/article/details/125477318