| Scenario | Suited workload | API | Communication |
|---|---|---|---|
| Multithreading | I/O-bound | `threading.Thread`; `concurrent.futures.ThreadPoolExecutor`; `joblib` with `backend="threading"` | Shared memory: threads can share variables, but access must be protected with a lock |
| Multiprocessing | CPU-bound | `multiprocessing.Process`; `multiprocessing.Pool` (e.g. `apply_async`); `concurrent.futures.ProcessPoolExecutor`; `joblib` with `backend="multiprocessing"` | Message passing via `multiprocessing.Queue`, which handles its own locking |
In Python, thread pools can be used through two standard-library modules: `concurrent.futures` and `threading`. `concurrent.futures`, added in Python 3.2, provides a high-level asynchronous execution model: `ThreadPoolExecutor` suits I/O-bound tasks, while its sibling `ProcessPoolExecutor` suits CPU-bound tasks. The lower-level `threading` module is appropriate when you need direct control over individual threads; note that because of the GIL, threads help with I/O-bound work, not CPU-bound work.
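As a minimal sketch of the `ThreadPoolExecutor` API mentioned above (the task function `slow_double` is an illustrative stand-in for real I/O-bound work, not from the text):

```python
# Minimal ThreadPoolExecutor sketch: map an I/O-style task over a pool.
import time
from concurrent.futures import ThreadPoolExecutor

def slow_double(x):
    time.sleep(0.01)   # pretend this is I/O (network, disk, ...)
    return x * 2

with ThreadPoolExecutor(max_workers=4) as pool:
    # Executor.map preserves input order even though tasks run concurrently
    results = list(pool.map(slow_double, range(5)))

print(results)  # [0, 2, 4, 6, 8]
```

For CPU-bound work, swapping in `ProcessPoolExecutor` (same interface) is the usual way around the GIL.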
> The GIL is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecodes at once. — Python Wiki

Why is the GIL needed? Because the CPython interpreter itself is not thread-safe: its memory management (reference counting in particular) assumes only one thread touches object state at a time.
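A consequence worth seeing in code: splitting a CPU-bound loop across threads still executes one bytecode stream at a time under the GIL, so you get correct results but no parallel speedup. A small sketch, with an illustrative `count_up` function (timing deliberately not asserted, since it varies by machine):

```python
# Sketch: CPU-bound work split across threads under the GIL.
# Results are correct, but the threads do not run bytecode in parallel.
import threading

def count_up(n, out, i):
    # sum 1..n in pure Python (CPU-bound, holds the GIL while running)
    total = 0
    for k in range(1, n + 1):
        total += k
    out[i] = total

def run_threads(n, workers=2):
    out = [0] * workers
    threads = [threading.Thread(target=count_up, args=(n, out, i))
               for i in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sum(out)

# two threads each compute 1+2+...+100000 = 5000050000
print(run_threads(100000))  # 10000100000
```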
# create a process (no arguments)
process = multiprocessing.Process(target=task)
# create a process, passing arguments to the target function
process = multiprocessing.Process(target=task, args=(arg1, arg2))
# run the new process
process.start()
# SuperFastPython.com
# example of running a function in another process
from time import sleep
from multiprocessing import Process

# a custom function that blocks for a moment
def task():
    # block for a moment
    sleep(1)
    # display a message
    print('This is from another process')

# entry point
if __name__ == '__main__':
    # create a process
    process = Process(target=task)
    # run the process
    process.start()
    # wait for the process to finish
    print('Waiting for the process...')
    process.join()
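The example above only prints from the child; to get a value back to the parent, a `multiprocessing.Queue` is the common channel. A minimal sketch, where the worker function `square` is a hypothetical example:

```python
# Sketch: returning a result from a worker process via multiprocessing.Queue.
from multiprocessing import Process, Queue

def square(n, q):
    # put the result on the queue so the parent can read it
    q.put(n * n)

def run():
    q = Queue()
    p = Process(target=square, args=(7, q))
    p.start()
    result = q.get()   # blocks until the child puts a value
    p.join()
    return result

if __name__ == '__main__':
    print(run())  # 49
```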
import threading
import time
from queue import Queue

# placeholder worker functions so the snippet runs; adapt to your task
def write_to_queue(q):
    for i in range(5):
        q.put(i)
    q.put(None)  # sentinel: tell the reader to stop

def read_from_queue(q):
    while True:
        item = q.get()
        if item is None:
            break
        print('got {}'.format(item))

if __name__ == '__main__':
    print('main thread is {}'.format(threading.current_thread().name))
    start_time = time.time()
    ### multithread queue
    queue = Queue()
    tw = threading.Thread(target=write_to_queue, args=(queue,))
    tr = threading.Thread(target=read_from_queue, args=(queue,))
    tr.daemon = True  # setDaemon() is deprecated since Python 3.10
    tw.start()
    tr.start()
    tw.join()
    end_time = time.time()
    print('total time is {}'.format(str(end_time - start_time)))
import threading
import time

def increase(var, lock):
    global total_increase_times
    for i in range(1000000):
        with lock:  # 'with' releases the lock even if an exception is raised
            var[0] += 1
        total_increase_times += 1

def decrease(var, lock):
    global total_decrease_times
    for i in range(1000000):
        with lock:
            var[0] -= 1
        total_decrease_times += 1

if __name__ == '__main__':
    print('main thread is {}'.format(threading.current_thread().name))
    start_time = time.time()
    lock = threading.Lock()
    var = [5]
    total_increase_times = 0
    total_decrease_times = 0
    t1 = threading.Thread(target=increase, args=(var, lock))
    t2 = threading.Thread(target=decrease, args=(var, lock))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(var)
    print('total increase times: {}'.format(str(total_increase_times)))
    print('total decrease times: {}'.format(str(total_decrease_times)))
    end_time = time.time()
    print('total time is {}'.format(str(end_time - start_time)))
Scenario: the results must come out in order, but execution efficiency also matters. Examples include large-scale data comparison in engine testing, or correctness verification against a data dictionary. Single-threaded serial execution preserves order but is painfully slow; naive multithreading is fast but cannot guarantee order. A synchronized queue shared among multiple threads is a good fit for this class of problems.

Abstracting the scenario: the input file is the single producer, the synchronized queue is the container, and each worker thread is a consumer, which gives a simple producer-consumer model (one producer, many consumers).

Wait: in the model above we can guarantee that items enter the synchronized queue in order, but not that the consumers produce output in order. To fix this, introduce a buffer for finished results, and attach an index idx to each item as it enters the queue to mark its position. Output is delegated to the producer, which tracks the last emitted idx. When a finished item's idx is greater than the producer's counter, the item is only stored in the buffer; when it equals the counter, the producer emits it, then repeatedly drains consecutive idx values from the buffer, emitting each and advancing the counter. The flow is shown in the figure below.
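The reordering step alone can be sketched without any threads; here `emitted` is an illustrative stand-in for the real output sink:

```python
# Sketch of the reordering buffer described above (no threads involved).
def make_reorderer():
    state = {'next_idx': 0, 'buf': {}}
    emitted = []

    def output(idx, data):
        if idx > state['next_idx']:
            state['buf'][idx] = data      # arrived too early: park it
        elif idx == state['next_idx']:
            emitted.append(data)          # exactly the next one: emit it
            state['next_idx'] += 1
            # drain any consecutive results that were parked earlier
            while state['next_idx'] in state['buf']:
                emitted.append(state['buf'].pop(state['next_idx']))
                state['next_idx'] += 1

    return output, emitted

out, emitted = make_reorderer()
for idx, data in [(2, 'c'), (0, 'a'), (1, 'b'), (3, 'd')]:
    out(idx, data)
print(emitted)  # ['a', 'b', 'c', 'd']
```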

Python's queue and threading modules make it straightforward to implement such a synchronized multithread queue; the implementation is below. Note that the processing step depends on the scenario; the handler in this code simply prints the data.
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from time import sleep
from threading import Thread, Lock
from queue import Queue

class Consumer(Thread):
    def __init__(self, producer):
        Thread.__init__(self)
        self.p = producer

    def run(self):
        while True:
            idx, data = self.p.get()
            # Here to add processing code
            sleep(1)
            # Here output
            self.p.output(idx, data)
            self.p.task_done()

class Producer(Queue):
    def __init__(self):
        Queue.__init__(self)
        self.lock = Lock()
        self.idx = 0
        self.buf = {}

    def output(self, idx, data):
        self.lock.acquire()
        if idx > self.idx:  # only store the result
            self.buf[idx] = data
        elif idx == self.idx:
            self._output(data)  # output one result
            self.idx += 1
            while self.idx in self.buf:  # output serial results until a gap
                data = self.buf.pop(self.idx)
                self._output(data)
                self.idx += 1
        self.lock.release()

    def _output(self, data):
        print(data)

if __name__ == '__main__':
    p = Producer()
    for i in range(4):
        c = Consumer(p)
        c.daemon = True
        c.start()
    num = 0
    for line in open('data', 'r'):
        p.put((num, line.strip()))
        num += 1
    p.join()
The conclusion: on Python 3.11, asyncio is about 50% faster than gevent, and adding uvloop roughly doubles that. Pure asyncio reaches about 68% of redis's throughput, and with uvloop about 88%. Granted, the benchmark program is simple, with no complex data processing; it mainly measures network I/O performance.
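For flavor, a minimal sketch of the asyncio style those numbers refer to (not the benchmark itself; `fetch` is an illustrative stand-in for network I/O):

```python
# Minimal asyncio sketch: several I/O-style waits run concurrently
# on one event loop, so total time is roughly the longest single wait.
import asyncio

async def fetch(i):
    await asyncio.sleep(0.1)   # stand-in for a network round-trip
    return i * 2

async def main():
    # the three waits overlap: ~0.1s total rather than ~0.3s
    return await asyncio.gather(fetch(1), fetch(2), fetch(3))

results = asyncio.run(main())
print(results)  # [2, 4, 6]
```

Swapping in uvloop is a one-liner (`uvloop.install()` before `asyncio.run()`); the application code is unchanged.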