• python多线程返回值问题重写Thread类的run方法


    python多线程使用

    一、案例
    
    
    def compute_ceph_func(self, i, ceph_ip)
        """
        i:  hostinfo
        ceph_ip: ceph_host ip 
        """
        
    # 第一种 有返回值
    def compute_ceph_port_check(self, region, ceph_ip):
    	import concurrent.futures
    	tmp_datas = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=300) as executor:
            # 线程列表
            to_do = []
        for i in os_services:
            future = executor.submit(self.compute_ceph_func, i, ceph_ip)
            to_do.append(future)
        for future in concurrent.futures.as_completed(to_do):
            data = future.result()  # 获取每个线程的执行结果
            tmp_datas.extend(data)
    
    
     # 另外一种 无返回值
    tmp_datas = []
    threads = []
    for i in os_services:
        t = threading.Thread(target=self.compute_ceph_func, args=(i, ceph_ip))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    tmp_datas = [thread.join() for thread in threads]
    logging.info(tmp_datas)
    
    
    # 另外一种 
    
    from concurrent.futures import ThreadPoolExecutor
    from threading import Thread
    
    
    def func(x):
        return x
    
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(max_workers=500)
        li = []
        for i in range(1, 500):
            li.append(pool.submit(func, i))
    
        for l in li:
            print(l.result())
    
        # 关闭线程池
        pool.shutdown()
    
    
    import os
    from multiprocessing.pool import ThreadPool
    
    
    def func(x):
        print(f"Process Id:{os.getpid()} res:{x+1}", )
        return x + 1
    
    
    if __name__ == '__main__':
        pool = ThreadPool(processes=10)
    
        li = []
        for i in range(1, 500):
            li.append(pool.apply_async(func, args=(i,)))
    
        for p in li:
            print(p.get())
    
        pool.close()
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    二、说明
    1、针对第一种是有返回值的 ,可以通过future.result() 去拿到每个线程返回值
    2、无返回值问题

    对于第二种方法无返回值问题:
    可以重新写join方法,并且在run方法中给对象设置了一个属性,_return这个属性的值就是线程的执行结果,最后在join方法中return出来。

    我们可以详细看下

    # 每个线程无返回值问题
    tmp_datas = []
    threads = []
    for i in os_services:
        t = threading.Thread(target=self.compute_ceph_func, args=(i, ceph_ip))
        threads.append(t)
        t.start() # start
    for t in threads:
        t.join()
    tmp_datas = [thread.join() for thread in threads]
    
    
    # 1、首先看start()方法
    
    
        def start(self):
            """Start the thread's activity.
    
            It must be called at most once per thread object. It arranges for the
            object's run() method to be invoked in a separate thread of control.
    
            This method will raise a RuntimeError if called more than once on the
            same thread object.
    
            """
            if not self._initialized:
                raise RuntimeError("thread.__init__() not called")
    
            if self._started.is_set():
                raise RuntimeError("threads can only be started once")
    
            with _active_limbo_lock:
                _limbo[self] = self
            try:
                _start_new_thread(self._bootstrap, ())
            except Exception:
                with _active_limbo_lock:
                    del _limbo[self]
                raise
            self._started.wait()
    # 其实不难看出 start方法并没有返回值,并且从下面的__Init__ 中可以看出并没有存储下来
    
    class Thread:
        """A class that represents a thread of control.
    
        This class can be safely subclassed in a limited fashion. There are two ways
        to specify the activity: by passing a callable object to the constructor, or
        by overriding the run() method in a subclass.
    
        """
    
        _initialized = False
    # 并且从下面的__Init__ 中可以看出并没有存储下来
        def __init__(self, group=None, target=None, name=None,
                     args=(), kwargs=None, *, daemon=None):
            """This constructor should always be called with keyword arguments. Arguments are:
    
            *group* should be None; reserved for future extension when a ThreadGroup
            class is implemented.
    
            *target* is the callable object to be invoked by the run()
            method. Defaults to None, meaning nothing is called.
    
            *name* is the thread name. By default, a unique name is constructed of
            the form "Thread-N" where N is a small decimal number.
    
            *args* is the argument tuple for the target invocation. Defaults to ().
    
            *kwargs* is a dictionary of keyword arguments for the target
            invocation. Defaults to {}.
    
            If a subclass overrides the constructor, it must make sure to invoke
            the base class constructor (Thread.__init__()) before doing anything
            else to the thread.
    
            """
            assert group is None, "group argument must be None for now"
            if kwargs is None:
                kwargs = {}
            self._target = target
            self._name = str(name or _newname())
            self._args = args
            self._kwargs = kwargs
            if daemon is not None:
                self._daemonic = daemon
            else:
                self._daemonic = current_thread().daemon
            self._ident = None
            if _HAVE_THREAD_NATIVE_ID:
                self._native_id = None
            self._tstate_lock = None
            self._started = Event()
            self._is_stopped = False
            self._initialized = True
            # Copy of sys.stderr used by self._invoke_excepthook()
            self._stderr = _sys.stderr
            self._invoke_excepthook = _make_invoke_excepthook()
            # For debugging and _after_fork()
            _dangling.add(self)
    
    
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    3、我们可以重写这个Thread类
    重写了__init__、run方法和join方法,主要是start()涉及的方法太多了

    在这里插入图片描述
    在这里插入图片描述

    而run()却相对简单

    在这里插入图片描述

    4、重写后的run()
    
    class ThreadReturnValueHanler(Thread):
         """
         """
         def run(self):
             if self._target is not None:
                 self._return = self._target(*self._args, **self._kwargs )
                 
         def join(self):
             super().join()
             return self._return
         
    # 当然直接使用import concurrent.futures 更为方便
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
  • 相关阅读:
    基于长短期神经网络铜期货价格预测,基于LSTM的铜期货价格预测,LSTM的详细原理
    创新永不止步——nVisual网络可视化平台针对Excel导入的创新历程
    《精通嵌入式Linux编程》——解锁嵌入式Linux开发的无限可能
    【SCI征稿】3个月左右录用!计算机信息技术等领域均可,如机器学习、遥感技术、人工智能、物联网、人工神经网络、数据挖掘、图像处理
    基于python的django框架选题推荐生鲜超市供应平台
    我用PYQT5做的第一个实用的上位机项目(二)
    业务安全情报第22期 | 不法分子为何盗刷企业短信?
    【002_音频开发_基础篇_Linux音频架构简介】
    定位相关论文阅读:神经惯性定位(二)Neural Inertial Localization
    vue3在router跳转路由时,params失效问题
  • 原文地址:https://blog.csdn.net/qq_28513801/article/details/127889796