• Python多线程(一)


    前言

    最近常常需要处理大量的crash数据,对这些数据进行分析,在此之前需要将存量的数据导入自己的数据库,开始一天一天的去导,发现太慢了,后来尝试通过python多线程并行导入多天数据,以此记录对于Python多线程的使用。

    进程与线程

    在介绍Python的多线程之前,我们需要先明确一下线程和进程的概念,其实线程和进程是操作系统的基本概念,都是实现并发的方式,其二者的区别可以用一句话概括:进程是资源分配的最小单位,而线程是调度的最小单位。 线程是进程的一部分,一个进程含有一个或多个线程。

    threading的使用

    Python提供了threading库来实现多线程,其使用多线程的方式有两种,一种是直接调用如下:

    1. import threading
    2. import time
    3. def say(index):
    4.     print("thread%s is running" % index)
    5.     time.sleep(1)
    6.     print("thread%s is over" % index)
    7. if __name__ == "__main__":
    8.     threading.Thread(target=say, args=(1,)).start()

    需要注意的是函数入参的传入是通过元组实现的,如果只有一个参数,","是不能省略的。

    除了以上方法,还可以通过继承threading.Thread来实现,代码如下。

    1. import threading
    2. import time
    3. class MyThread(threading.Thread):
    4.     def __init__(selfindex):
    5.         threading.Thread.__init__(self)  # 必须的步骤
    6.         self.index = index
    7.     def run(self):
    8.         print("thread%s is running" % self.index)
    9.         time.sleep(1)
    10.         print("thread%s is over" % self.index)
    11. if __name__ == "__main__":
    12.     myThread = MyThread(1)
    13.     myThread.start()

    在threading中提供了很多方法,主要可以分为两个部分,一部分是关于线程信息的函数,一部分是线程对象的函数。

    线程信息的函数

    函数说明
    threading.active_count()活跃线程Thread的数量
    threading.current_thread()返回当前线程的thread对象
    threading.enumerate()返回当前存活线程的列表
    threading.main_thread()返回当前主线程的Thread对象

    线程对象Thread的函数和属性

    函数说明
    Thread.name线程名,可相同
    Thread.ident线程标识符,非零整数
    Thread.Daemon是否为守护线程
    Thread.is_alive()是否存活
    Thread.start()开启线程,多次调用会报错
    Thread.join(timeout=None)等待线程结束
    Thread(group=None, target=None, name=None, args=(), kwargs={}, *, deamon=None)构造函数
    Thread.run()用来重载

    线程池

    线程可以提高程序的并行性,提高程序执行的效率,虽然python的多线程只是一种假象的多线程,但是在一些io密集的程序中还是可以提高执行效率,其中的细节会在后面详细解释。在多线程中线程的调度也会造成一定的开销,线程数量越多,调度开销越大,所以我们需要控制线程的数量,使用join可以在主线程等待子线程执行结束,从而控制线程的数量。其代码如下

    1. import threading
    2. import time
    3. def say(index):
    4.     print("thread%s is running" % index)
    5.     time.sleep(1)
    6.     print("thread%s is over" % index)
    7. if __name__ == "__main__":
    8.     for i in range(142):
    9.         thread1 = threading.Thread(target=say, args=(i,))
    10.         thread2 = threading.Thread(target=say, args=(i + 1,))
    11.         thread1.start()
    12.         thread2.start()
    13.         thread1.join()
    14.         thread2.join()

    结果如下

    1. thread1 is running
    2. thread2 is running
    3. thread1 is over
    4. thread2 is over
    5. thread3 is running
    6. thread4 is running
    7. thread3 is over
    8. thread4 is over

    如果不使用join其结果如下:

    1. thread1 is running
    2. thread2 is running
    3. thread3 is running
    4. thread4 is running
    5. thread1 is over
    6. thread2 is over
    7. thread4 is over
    8. thread3 is over

    这时候是同时启动了四个线程

    使用join来控制线程数量虽然可以达到目的,但是这样的实现确实很不优雅,而且线程的创建和销毁也是很大的开销,所以针对一些执行频率高且执行时间短的情况,可以使用线程池,线程池顾名思义就是一个包含固定数量线程的池子,线程池里面的线程是可以重复利用的,执行完任务后不会立刻销毁而且返回线程池中等待,如果有任务则立即执行下一个任务。

    python中的concurrent.futures模块提供了ThreadPoolExector类来创建线程池,其提供了以下方法:

    函数说明
    submit(fn, *args, **kwargs)将 fn 函数提交给线程池。args 代表传给 fn 函数的参数,kwargs 代表以关键字参数的形式为 fn 函数传入参数。
    map(func, *iterables, timeout=None, chunksize=1)该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。超时抛出TimeoutError错误。返回每个函数的结果,注意不是返回future。
    shutdown(wait=True)关闭线程池。关闭之后线程池不再接受新任务,但会将之前提交的任务完成。

    有一些函数的执行是有返回值的,将任务通过submit提交给线程池后,会返回一个Future对象,Future有以下几个方法:

    函数说明
    cancel()取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True。
    cancelled()如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True。
    running()如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True。
    done()如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True。
    result(timeout=None)获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。超时抛出TimeoutError,取消抛出CancelledError。
    exception(timeout=None)获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 None。
    add_done_callback(fn)为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数,参数是future。

    之前的问题可以用线程池,代码如下

    1. import time
    2. from concurrent.futures import ThreadPoolExecutor
    3. def say(index):
    4.     print("thread%s is running" % index)
    5.     time.sleep(1)
    6.     print("thread%s is over" % index)
    7. if __name__ == "__main__":
    8.     params = tuple()
    9.     for i in range(111):
    10.         params = params + (i,)
    11.     pool = ThreadPoolExecutor(max_workers=2)
    12.     pool.map(say, params)

    线程安全与锁

    正如之前所提到的,线程之间是共享资源的,所以当多个线程同时访问或处理同一资源时会产生一定的问题,会造成资源损坏或者与预期不一致。例如以下程序最后执行结果是157296且每次结果都不一样。

    1. import threading
    2. import time
    3. lock = threading.Lock()
    4. def task():
    5.     global a
    6.     for i in range(100000):
    7.         a = a + 1
    8.         if i == 50:
    9.             time.sleep(1)
    10. if __name__ == "__main__":
    11.     global a
    12.     a = 0
    13.     thread1 = threading.Thread(target=task)
    14.     thread2 = threading.Thread(target=task)
    15.     thread1.start()
    16.     thread2.start()
    17.     thread1.join()
    18.     thread2.join()
    19.     print(a)

    这时候就需要用到锁,是使用之前将资源锁定,锁定期间不允许其他线程访问,使用完之后再释放锁。在python的threading模块中有Lock和RLock两个类,它们都有两个方法,Lock.acquire(blocking=True, timeout=-1) 获取锁。Lock.release() 释放锁。其二者的区别在于RLock是可重入锁,一个线程可以多次获取,主要是为了避免死锁。一个简单的例子,以下代码会死锁

    1. Lock.acquire()
    2. Lock.acquire()
    3. Lock.release()
    4. Lock.release()

    用RLock则不会死锁

    1. RLock.acquire()
    2. RLock.acquire()
    3. RLock.release()
    4. RLock.release()

    死锁(Deadlock)是指两个或两个以上的线程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

    以上代码加锁后就可以得到想要的结果了,其代码如下

    1. import threading
    2. import time
    3. lock = threading.Lock()
    4. def task():
    5.     global a
    6.     for i in range(100000):
    7.         lock.acquire()
    8.         a = a + 1
    9.         lock.release()
    10.         if i == 50:
    11.             time.sleep(1)
    12. if __name__ == "__main__":
    13.     global a
    14.     a = 0
    15.     thread1 = threading.Thread(target=task)
    16.     thread2 = threading.Thread(target=task)
    17.     thread1.start()
    18.     thread2.start()
    19.     thread1.join()
    20.     thread2.join()
    21.     print(a)

    假的多线程

    关于python多线程的简单使用已经讲完了,现在回到之前文中提到的,python的多线程是假的多线程,为什么这么说呢,因为Python中有一个GIL,GIL的全称是Global Interpreter Lock(全局解释器锁),并且由于GIL锁存在,python里一个进程永远只能同时执行一个线程(拿到GIL的线程才能执行),这就是为什么在多核CPU上,python的多线程效率并不高。对于计算密集型的Python多线程并不会提高执行效率,甚至可能因为线程切换开销过大导致性能还不如单线程。但是对于IO密集型的任务,Python多线程还是可以提高效率。

    最后

  • 相关阅读:
    Docker 安装 Mongo
    angular项目指定端口,实现局域网内ip访问
    2023.9.11 关于传输层协议 UDP和TCP 详解
    【Javascript】json
    Apollo微服务配置中心详解
    会议AISTATS(Artificial Intelligence and Statistics) Latex模板参考文献引用问题
    Postman接口&压力测试 ---- Tests使用(断言)
    2022北京眼睛健康用品展,护眼产品展,中国眼博会11月举办
    学习路之api --接口文档和常见的状态码
    【语音之家】AI产业沙龙—VoxSRC-22说话人识别挑战赛快商通方案介绍
  • 原文地址:https://blog.csdn.net/QStack/article/details/127680955