在操作系统中,进程是相互隔离的,如果进程之间需要交流数据,可以通过“文件、数据库、socket套接字通信”等方式来交流数据,另外还有“共享内存”的方式,这种方式的优势是效率更高,同时需要注意“进程同步问题,避免多进程对共享内存的操作造成脏数据,可采用类似线程同步的锁机制来解决”。
在 Python multiprocessing 官方文档中提到了“shared ctypes objects、 sharedctypes module、manager、shared_memory module”几种共享内存的方式,这里介绍 shared ctypes objects 与 sharedctypes 模块。
查看 multiprocessing.Value
的源码:
def Value(typecode_or_type: Any, *args: Any, lock: bool = ...) -> sharedctypes._Value:
...
发现 multiprocessing.Value 只是一个函数,返回一个sharedctypes._Value对象。转跳到 sharedctypes 模块,typecode_or_type参数表示C 语言数据类型或者对应的类型码,类型码与类型的对应关系如下:
typecode_to_type = {
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'q': ctypes.c_longlong, 'Q': ctypes.c_ulonglong,
'f': ctypes.c_float, 'd': ctypes.c_double
}
参数args用于收集初始化Value的值,对应的实参必须与指定的数据类型一致。布尔型lock指明是否使用锁机制管理进程同步。sharedctypes._Value又是什么呢?查找 sharedctypes模块的源码,并未找到_Value
的定义,但是存在Value
的定义,从Python命名规范来看,两者存在紧密的联系,sharedctypes.Value的定义如下:
def Value(typecode_or_type, *args, lock=True, ctx=None):
'''
Return a synchronization wrapper for a Value
'''
obj = RawValue(typecode_or_type, *args)
if lock is False:
return obj
if lock in (True, None):
ctx = ctx or get_context()
lock = ctx.RLock()
if not hasattr(lock, 'acquire'):
raise AttributeError("%r has no method 'acquire'" % lock)
return synchronized(obj, lock, ctx=ctx)
参数中多了一个ctx,该参数表示接受一个上下文对象。三个 if 语句都是用于lock对象的管理,建议使用默认参数值True,以保证进程操作的安全性。输入的参数 typecode_or_type, args 会调用 RawValue 函数创建一个C语言对象,该对象与 lock、ctx又被封装成 Synchronized实例。先看RawValue的定义:
def _new_value(type_):
size = ctypes.sizeof(type_)
wrapper = heap.BufferWrapper(size)
return rebuild_ctype(type_, wrapper, None)
def RawValue(typecode_or_type, *args):
'''
Returns a ctypes object allocated from shared memory
'''
type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
obj = _new_value(type_)
ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
obj.__init__(*args)
return obj
C语言的语法不懂,但大概的逻辑应该就是根据指定的类型与初始化值,创建一个C语言对应的数据结构。synchronized 的定义如下:
def synchronized(obj, lock=None, ctx=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
ctx = ctx or get_context()
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock, ctx)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock, ctx)
return SynchronizedArray(obj, lock, ctx)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = {name: make_property(name) for name in names}
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock, ctx)
synchronized函数本质就是对obj对象进行封装,如果对象是单一数值结构,则使用Synchronized类封装,如果是字符数组则调用SynchronizedString封装,如果是整数数组,则调用SynchronizedArray封装。对于 multiprocessing.Value 使用Synchronized类封装。
class SynchronizedBase(object):
def __init__(self, obj, lock=None, ctx=None):
self._obj = obj
if lock:
self._lock = lock
else:
ctx = ctx or get_context(force=True)
self._lock = ctx.RLock()
self.acquire = self._lock.acquire
self.release = self._lock.release
def __enter__(self):
return self._lock.__enter__()
def __exit__(self, *args):
return self._lock.__exit__(*args)
def __reduce__(self):
assert_spawning(self)
return synchronized, (self._obj, self._lock)
def get_obj(self):
return self._obj
def get_lock(self):
return self._lock
def __repr__(self):
return '<%s wrapper for %s>' % (type(self).__name__, self._obj)
class Synchronized(SynchronizedBase):
value = make_property('value')
Synchronized 继承 SynchronizedBase, 并在其基础上增加一个 value 属性,该属性用于查看或者修改 C语言数据结构中对应的数值, 注意Synchronized是类,而 synchronized 是函数。
# Value 实例
import multiprocessing as mp
def f(x):
with x.get_lock():
x.value += 1
return
if __name__ == '__main__':
from multiprocessing import Value
cnt = Value("i", 0, lock=True)
child_process1 = mp.Process(target=f, args=(cnt,))
child_process1.start()
child_process1.join()
print(cnt, cnt.value) # 返回 , 1
返回值1反映了父进程中能够查看到子进程中对共享对象cnt的修改。
multiprocessing.Array 与 multiprocessing.Value 的逻辑非常类似,后者只能存储一个值,而前置可以存储多个值。multiprocessing.Array 的定义如下:
def Array(typecode_or_type: Any, size_or_initializer: Union[int, Sequence[Any]], *, lock: bool = ...) -> sharedctypes._Array:
...
可以发现 multiprocessing.Array 本质也是一个函数,参数 size_or_initializer 可以接受一个整数或者Python Sequence, 指定整数会创建一个包含整数N的数组,而序列则创建一个与序列长度、内容相同的C语言数组。返回对象 sharedctypes._Array 在 sharedctypes模块中并未找到定义,但是存在 Arrary 函数定义,如下:
def Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None):
'''
Return a synchronization wrapper for a RawArray
'''
obj = RawArray(typecode_or_type, size_or_initializer)
if lock is False:
return obj
if lock in (True, None):
ctx = ctx or get_context()
lock = ctx.RLock()
if not hasattr(lock, 'acquire'):
raise AttributeError("%r has no method 'acquire'" % lock)
return synchronized(obj, lock, ctx=ctx)
def RawArray(typecode_or_type, size_or_initializer):
'''
Returns a ctypes array allocated from shared memory
'''
type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
if isinstance(size_or_initializer, int):
type_ = type_ * size_or_initializer
obj = _new_value(type_)
ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
return obj
else:
type_ = type_ * len(size_or_initializer)
result = _new_value(type_)
result.__init__(*size_or_initializer)
return result
创建的Array类型不同, 会使用不同的类对其进行封装, SynchronizedArray类封装 C语言数值数组, SynchronizedString封装C语言字符串数组。
class SynchronizedArray(SynchronizedBase):
def __len__(self):
return len(self._obj)
def __getitem__(self, i):
with self: # 自动加锁
return self._obj[i]
def __setitem__(self, i, value):
with self: # 自动加锁
self._obj[i] = value
def __getslice__(self, start, stop):
with self:
return self._obj[start:stop]
def __setslice__(self, start, stop, values):
with self:
self._obj[start:stop] = values
class SynchronizedString(SynchronizedArray):
value = make_property('value')
raw = make_property('raw')
可以看到可以通过切片,或者索引的方式取到数组中的数值。
# 整数数组实例
def f(arr):
arr[0] += 1
arr[1] += 2
return
if __name__ == "__main__":
from multiprocessing import Array
array = Array("i", [0, 0], lock=True)
child_process1 = mp.Process(target=f, args=(array,))
child_process1.start()
child_process1.join()
print(array, array[:])
# 返回 > [1, 2]
返回值[1, 2]表明父进程中能够看到子进程对array的修改,
# 字符串数组实例
def f(arr):
arr[0] = b"9"
arr[1] = b"9"
return
if __name__ == "__main__":
from multiprocessing import Array
name = Array("c", b"xiao")
p = mp.Process(target=f, args=(name,))
p.start()
p.join()
print(name, name.value, name.raw)
# > b'99ao' b'99ao'
multiprocessing.Value 与 multiprocessing.Array 函数分别会创建一个默认支持进程同步的共享内存的C语言数值、数组数据结构,前者只存储一个数值,而后者可以存储多个数值。创建的C语言对象类型取决于传入参数 typecode_or_type参数值,注意无论 typecode_or_type参数值是什么,都是使用sharedctypes.Syncronized类对Value函数创建的C语言对象进行封装,而multiprocessing.Array 函数中参数 typecode_or_type如果指定整数或者浮点类型,则用sharedctypes.SyncronizedArray类对C语言对象进行封装,如果指定字节或者字符串,则使用sharedctypes.SyncronizedString类对C语言对象进行封装。