• Python 多进程编程《*》: 共享内存 shared ctypes objects & sharedctypes 模块


    前言

    在操作系统中,进程是相互隔离的,如果进程之间需要交流数据,可以通过“文件、数据库、socket套接字通信”等方式来交流数据,另外还有“共享内存”的方式,这种方式的优势是效率更高,同时需要注意“进程同步问题,避免多进程对共享内存的操作造成脏数据,可采用类似线程同步的锁机制来解决”。

    在 Python multiprocessing 官方文档中提到了“shared ctypes objects、 sharedctypes module、manager、shared_memory module”几种共享内存的方式,这里介绍 shared ctypes objects 与 sharedctypes 模块。
     

    shared ctypes objects :Value

    查看 multiprocessing.Value 的源码:

    	def Value(typecode_or_type: Any, *args: Any, lock: bool = ...) -> sharedctypes._Value: 
    		...
    
    • 1
    • 2

    发现 multiprocessing.Value 只是一个函数,返回一个sharedctypes._Value对象。转跳到 sharedctypes 模块,typecode_or_type参数表示C 语言数据类型或者对应的类型码,类型码与类型的对应关系如下:

    	typecode_to_type = {
    	    'c': ctypes.c_char,     'u': ctypes.c_wchar,
    	    'b': ctypes.c_byte,     'B': ctypes.c_ubyte,
    	    'h': ctypes.c_short,    'H': ctypes.c_ushort,
    	    'i': ctypes.c_int,      'I': ctypes.c_uint,
    	    'l': ctypes.c_long,     'L': ctypes.c_ulong,
    	    'q': ctypes.c_longlong, 'Q': ctypes.c_ulonglong,
    	    'f': ctypes.c_float,    'd': ctypes.c_double
    	    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    参数args用于收集初始化Value的值,对应的实参必须与指定的数据类型一致。布尔型lock指明是否使用锁机制管理进程同步。sharedctypes._Value又是什么呢?查找 sharedctypes模块的源码,并未找到_Value的定义,但是存在Value的定义,从Python命名规范来看,两者存在紧密的联系,sharedctypes.Value的定义如下:

    	def Value(typecode_or_type, *args, lock=True, ctx=None):
    	    '''
    	    Return a synchronization wrapper for a Value
    	    '''
    	    obj = RawValue(typecode_or_type, *args)
    	    if lock is False:
    	        return obj
    	    if lock in (True, None):
    	        ctx = ctx or get_context()
    	        lock = ctx.RLock()
    	    if not hasattr(lock, 'acquire'):
    	        raise AttributeError("%r has no method 'acquire'" % lock)
    	    return synchronized(obj, lock, ctx=ctx)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    参数中多了一个ctx,该参数表示接受一个上下文对象。三个 if 语句都是用于lock对象的管理,建议使用默认参数值True,以保证进程操作的安全性。输入的参数 typecode_or_type, args 会调用 RawValue 函数创建一个C语言对象,该对象与 lock、ctx又被封装成 Synchronized实例。先看RawValue的定义:

    	def _new_value(type_):
    	    size = ctypes.sizeof(type_)
    	    wrapper = heap.BufferWrapper(size)
    	    return rebuild_ctype(type_, wrapper, None)
    	
    	def RawValue(typecode_or_type, *args):
    	    '''
    	    Returns a ctypes object allocated from shared memory
    	    '''
    	    type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
    	    obj = _new_value(type_)
    	    ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
    	    obj.__init__(*args)
    	    return obj
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    C语言的语法不懂,但大概的逻辑应该就是根据指定的类型与初始化值,创建一个C语言对应的数据结构。synchronized 的定义如下:

    def synchronized(obj, lock=None, ctx=None):
        assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
        ctx = ctx or get_context()
    
        if isinstance(obj, ctypes._SimpleCData):
            return Synchronized(obj, lock, ctx)
        elif isinstance(obj, ctypes.Array):
            if obj._type_ is ctypes.c_char:
                return SynchronizedString(obj, lock, ctx)
            return SynchronizedArray(obj, lock, ctx)
        else:
            cls = type(obj)
            try:
                scls = class_cache[cls]
            except KeyError:
                names = [field[0] for field in cls._fields_]
                d = {name: make_property(name) for name in names}
                classname = 'Synchronized' + cls.__name__
                scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
            return scls(obj, lock, ctx)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    synchronized函数本质就是对obj对象进行封装,如果对象是单一数值结构,则使用Synchronized类封装,如果是字符数组则调用SynchronizedString封装,如果是整数数组,则调用SynchronizedArray封装。对于 multiprocessing.Value 使用Synchronized类封装。

    	class SynchronizedBase(object):
    	
    	    def __init__(self, obj, lock=None, ctx=None):
    	        self._obj = obj
    	        if lock:
    	            self._lock = lock
    	        else:
    	            ctx = ctx or get_context(force=True)
    	            self._lock = ctx.RLock()
    	        self.acquire = self._lock.acquire
    	        self.release = self._lock.release
    	
    	    def __enter__(self):
    	        return self._lock.__enter__()
    	
    	    def __exit__(self, *args):
    	        return self._lock.__exit__(*args)
    	
    	    def __reduce__(self):
    	        assert_spawning(self)
    	        return synchronized, (self._obj, self._lock)
    	
    	    def get_obj(self):
    	        return self._obj
    	
    	    def get_lock(self):
    	        return self._lock
    	
    	    def __repr__(self):
    	        return '<%s wrapper for %s>' % (type(self).__name__, self._obj)
    	
    	
    	class Synchronized(SynchronizedBase):
    	    value = make_property('value')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    Synchronized 继承 SynchronizedBase, 并在其基础上增加一个 value 属性,该属性用于查看或者修改 C语言数据结构中对应的数值, 注意Synchronized是类,而 synchronized 是函数

    	# Value 实例
    	import multiprocessing as mp
    	
    	
    	def f(x):
    	    with x.get_lock():
    	        x.value += 1
    	
    	    return
    	
    	
    	if __name__ == '__main__':
    	    from multiprocessing import Value
    	
    	    cnt = Value("i", 0, lock=True)
    	
    	    child_process1 = mp.Process(target=f, args=(cnt,))
    	    child_process1.start()
    	    child_process1.join()
    	
    	    print(cnt, cnt.value)  # 返回 ,  1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    返回值1反映了父进程中能够查看到子进程中对共享对象cnt的修改。 说明cnt是一个 Synchronized 实例,并且Synchronized 实例只是对 c_long(1) 对象的一个封装。
     

    shared ctypes objects :Array

    multiprocessing.Array 与 multiprocessing.Value 的逻辑非常类似,后者只能存储一个值,而前置可以存储多个值。multiprocessing.Array 的定义如下:

    def Array(typecode_or_type: Any, size_or_initializer: Union[int, Sequence[Any]], *, lock: bool = ...) -> sharedctypes._Array: 
    	...
    
    • 1
    • 2

    可以发现 multiprocessing.Array 本质也是一个函数,参数 size_or_initializer 可以接受一个整数或者Python Sequence, 指定整数会创建一个包含整数N的数组,而序列则创建一个与序列长度、内容相同的C语言数组。返回对象 sharedctypes._Array 在 sharedctypes模块中并未找到定义,但是存在 Arrary 函数定义,如下:

    	def Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None):
    	    '''
    	    Return a synchronization wrapper for a RawArray
    	    '''
    	    obj = RawArray(typecode_or_type, size_or_initializer)
    	    if lock is False:
    	        return obj
    	    if lock in (True, None):
    	        ctx = ctx or get_context()
    	        lock = ctx.RLock()
    	    if not hasattr(lock, 'acquire'):
    	        raise AttributeError("%r has no method 'acquire'" % lock)
    	    return synchronized(obj, lock, ctx=ctx)
    	
    	
    	def RawArray(typecode_or_type, size_or_initializer):
    	    '''
    	    Returns a ctypes array allocated from shared memory
    	    '''
    	    type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
    	    if isinstance(size_or_initializer, int):
    	        type_ = type_ * size_or_initializer
    	        obj = _new_value(type_)
    	        ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
    	        return obj
    	    else:
    	        type_ = type_ * len(size_or_initializer)
    	        result = _new_value(type_)
    	        result.__init__(*size_or_initializer)
    	        return result
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    创建的Array类型不同, 会使用不同的类对其进行封装, SynchronizedArray类封装 C语言数值数组, SynchronizedString封装C语言字符串数组。

    	class SynchronizedArray(SynchronizedBase):
    	
    	    def __len__(self):
    	        return len(self._obj)
    	
    	    def __getitem__(self, i):
    	        with self:  # 自动加锁
    	            return self._obj[i]
    	
    	    def __setitem__(self, i, value):
    	        with self:  # 自动加锁
    	            self._obj[i] = value
    	
    	    def __getslice__(self, start, stop):
    	        with self:
    	            return self._obj[start:stop]
    	
    	    def __setslice__(self, start, stop, values):
    	        with self:
    	            self._obj[start:stop] = values
    
    	class SynchronizedString(SynchronizedArray):
    	    value = make_property('value')
    	    raw = make_property('raw')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    可以看到可以通过切片,或者索引的方式取到数组中的数值。

    	# 整数数组实例
    	def f(arr):
    	    arr[0] += 1
    	    arr[1] += 2
    	    return
    	
    	
    	if __name__ == "__main__":
    	    from multiprocessing import Array
    	
    	    array = Array("i", [0, 0], lock=True)
    	
    	    child_process1 = mp.Process(target=f, args=(array,))
    	    child_process1.start()
    	    child_process1.join()
    	
    	    print(array, array[:])
    	    # 返回 > [1, 2]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    返回值[1, 2]表明父进程中能够看到子进程对array的修改,>表明返回的array是一个 SynchronizedArray实例,该实例封装了一个 c_long_Array_2对象。

    	# 字符串数组实例
    	def f(arr):
    	    arr[0] = b"9"
    	    arr[1] = b"9"
    	    return
    	
    	
    	if __name__ == "__main__":
    	    from multiprocessing import Array
    	
    	    name = Array("c", b"xiao")
    	
    	    p = mp.Process(target=f, args=(name,))
    	    p.start()
    	    p.join()
    	
    	    print(name, name.value, name.raw)
    	    # > b'99ao' b'99ao'
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

     

    小结

    multiprocessing.Value 与 multiprocessing.Array 函数分别会创建一个默认支持进程同步的共享内存的C语言数值、数组数据结构,前者只存储一个数值,而后者可以存储多个数值。创建的C语言对象类型取决于传入参数 typecode_or_type参数值,注意无论 typecode_or_type参数值是什么,都是使用sharedctypes.Syncronized类对Value函数创建的C语言对象进行封装,而multiprocessing.Array 函数中参数 typecode_or_type如果指定整数或者浮点类型,则用sharedctypes.SyncronizedArray类对C语言对象进行封装,如果指定字节或者字符串,则使用sharedctypes.SyncronizedString类对C语言对象进行封装。

  • 相关阅读:
    编码文字使用整数xyz 三个坐标 并使用
    LOAD_BALANCE=false 主在不会切换备
    excel计算时间差
    Python之函数-局部变量和global
    使用Kalibr工具线对相机+IMU离线标定
    [mit6.s081] 笔记 Lab7: Multithreading
    centos7下postgresql安装postgis
    [iOS]-autoreleasePool
    基于AlgoT1设备实践多源融合定位算法(GNSS+INS+VISION)
    【Linux网络编程】高级I/O
  • 原文地址:https://blog.csdn.net/weixin_44815943/article/details/126540750