Python内置库:multiprocessing(多进程操作)

时间:2020-10-08 18:08:25   收藏:0   阅读:21

Python的多进程因为可以充分利用CPU多核的特点,所以通常用于计算密集型的场景或者需要大量数据操作的场景,而对于多线程,在某些语言中因为可以充分利用CPU,所以可能多线程的场景使用得多一点,但是在Python中,多线程只能在CPU的单核中运行,不能充分利用CPU多核的特点,所以Python多线程通常用于IO密集型的场景或者少量数据的并发操作场景。总而言之,Python的多线程只是并发执行,而不是真正的并行执行,而且只能在CPU单核上进行,所以如果需要进行大量的数据操作或者比较耗时的并行操作,那么就可以考虑使用多进程了。
本文只是根据官方文档简单记了一下multiprocessing模块中进程的基本操作,包括创建进程、进程启动方式、进程间通信、进程间同步、进程池,如果需要其他更多操作,可以参考此模块的官方中文文档

创建进程

实例化Process类创建一个进程对象,然后调用它的start方法即可生成一个新的进程(子进程)。Process进程对象的使用其实和多线程模块threading中的Thread线程对象非常相似,可以参考着来使用。

"""
简单示例:创建一个子进程
"""
import os
from multiprocessing import Process


def func(s):
    # 输出传入的参数,当前子进程的进程ID,当前进程的父进程ID
    print(s, os.getpid(), os.getppid())


# 注意:此处的if __name__ == ‘__main__‘语句不能少
if __name__ == ‘__main__‘:
    # 打印当前进程的进程ID
    print(os.getpid())
    print(‘main process start...‘)
    # 创建进程对象
    p = Process(target=func, args=(‘hello‘, ))
    # 生成一个进程,并开始运行新的进程
    p.start()
    # 等待子进程运行完毕
    p.join()
    print(‘main process end!‘)

打印输出

13888
main process start...
hello 12484 13888
main process end!

Process类
Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)group不用特别指定,使用默认就行;target表示需要调用的对象;name表示新进程的名称;argskwargs表示传给target对象的元组参数和字典参数;daemon是一个关键字参数,使用时必须指定参数名,表示是否为守护进程,如果不指定则默认继承自调用者进程。
注:需要注意的是如果重写了Process__init__方法,那么在做任何操作之前需要先调用Process.__init__()方法。
常用的方法和属性:

进程启动方式

multiprocessing模块中进程的启动方式有三种spawn、fork和forkserver,在不同的系统平台上它们的使用和默认设置也会有所不同:

进程间通信

使用多进程时,一般使用消息机制(Pipe()管道和Queue()队列)实现进程间的通信,而且应该尽可能地避免同步操作,例如锁。(如果这两种方式不能满足你的要求,可以参考下官方文档中关于multiprocessing.connection的描述,它提供了如监听器对象Listener和客户端对象Client等通信方式,感兴趣的话也可以去看下)
Pipe类
Pipe([duplex]):返回一对连接对象(conn1,conn2),它们代表了管道的两端。参数duplex默认True,表示双向的(双工通信),表示管道每一端都可以进行发送和接收数据;如果设置False,则表示单向的(单工通信),此时conn1只能接受数据,conn2只能发送数据。

"""
简单示例:使用管道Pipe进行进程间通信
"""
from multiprocessing import Process, Pipe


def func(conn):
    print(‘send a list object ot other side...‘)
    # 从管道对象的一端发送数据对象
    conn.send([‘33‘, 44, None])
    conn.close()


if __name__ == ‘__main__‘:
    # 默认创建一个双工管道对象,返回的两个对象代表管道的两端,
    # 双工表示两端的对象都可以发送和接收数据,但是需要注意,
    # 需要避免多个进程或线程从一端同时读或写数据
    parent_conn, child_conn = Pipe()
    p = Process(target=func, args=(child_conn, ))
    p.start()
    # 从管道的另一端接收数据对象
    print(parent_conn.recv())
    p.join()

Connection类
multiprocessing.connection.ConnectionConnection对象允许收发可以序列化的对象或字符串,Connection对象通常使用Pipe来创建。
常用的方法:

Queue类
Queue队列采用的是FIFO(先进先出)的通信方式。(另外还有SimpleQueueJoinableQueue,感兴趣的可以参考下官方文档)
当一个对象被放入队列中时,这个对象首先会被一个后台线程序列化,然后会将序列化的数据通过一个底层管道传递到队列中,从队列中将数据取出来时也会进行反序列化的操作。
注意一点,在一个空队列中放入对象后,它的empty()方法会在一个极小的延迟后才会返回False。
注:如果一个子进程将一些对象放入队列中,那么这个进程在所有缓冲区的对象被刷新进管道之前,是不会终止的,所以,通常在终止这类进程时,应该保证队列中的数据都已被使用了。(见示例中的注释)

"""
简单示例:使用队列Queue进行进程间通信
"""
from multiprocessing import Process, Queue


def func(q):
    print(‘put a list object to queue...‘)
    # 向Queue对象中添加一个对象
    q.put([‘33‘, 44, None])
    # q.put(‘X‘ * 1000000)


if __name__ == ‘__main__‘:
    # 创建一个队列
    q = Queue()
    p = Process(target=func, args=(q, ))
    p.start()
    # 从Queue对象中获取一个对象
    print(q.get())
    # 这里需要注意,当向队列中放入的数据较大时,比如将[‘33‘, 44, None]替换为‘X‘ * 1000000时,
    # 就会在join()处卡死,为了避免这种情况,
    # 通常的做法是先使用get()将数据取出来,再使用join()方法
    p.join()

Queue([maxsize]):返回一个使用Pipe管道和少量锁和信号量实现的共享队列实例,当一个进程将一个对象放入队列时,一个写入线程将会启动并将对象从缓冲区写入管道中。
注:multiprocessing.Queue实现了标准库queue.Queue中除了task_done()join()的所有方法。
常用的方法和属性:

进程间同步

通常来说同步原语在多进程环境中并不像在多线程环境中那么必要,但是也可以参考下。注意,也可以使用Manager()对象创建同步原语。
multiprocessing.Barrier(parties[, action[, timeout]]):类似threading.Barrier的栅栏对象。
multiprocessing.Semaphore([value]):信号量对象,类似于threading.Semaphore
multiprocessing.BoundedSemaphore([value]):类似threading.BoundedSemaphore的有界信号量对象。
multiprocessing.Condition([lock]):是threading.Condition的别名,参数lock应该是multiprocessing中的Lock或者RLock对象。
multiprocessing.Event:类似threading.Event的事件对象。
multiprocessing.Lock:原始锁,除非特别说明,否则用法与threading.Lock是一致的。

"""
简单示例:使用锁保证进程间的同步操作
"""
from multiprocessing import Process, Lock


def func(lc, num):
    # 使用锁保证以下代码同一时间只有一个进程在执行
    lc.acquire()
    print(‘process num: ‘, num)
    lc.release()


if __name__ == ‘__main__‘:
    lock = Lock()
    for i in range(5):
        Process(target=func, args=(lock, i)).start()

打印输出

process num:  0
process num:  1
process num:  3
process num:  2
process num:  4

进程间共享数据

在多进程的并发编程中应当尽量避免使用共享状态,但是如果必须要使用的话,multiprocessing模块提供了两种方式来使用:共享内存和服务进程管理器(Manager()管理器对象会开启一个服务进程,允许不同机器上的进程通过网络共享数据,本文就不写了,感兴趣的可以去官方文档了解下(有对应的中文文档))。
共享内存
可以在共享内存中创建可被子进程继承的共享ctypes对象,特点是快捷方便。
multiprocessing.Value(typecode_or_type, *args, lock=True):返回一个在共享内存上创建的ctypes对象,可以通过它的value属性来访问它的值。

"""
简单示例:使用共享内存的方式,共享值Value对象和数据Array对象
"""
from multiprocessing import Process, Value, Array


def func(n, a):
    n.value = 3.333
    for i in range(len(a)):
        a[i] = -a[i]


if __name__ == ‘__main__‘:
    num = Value(‘d‘, 0.0)  # 第一个参数d表示数据类型“double”双精度浮点类型
    arr = Array(‘i‘, range(6))  # 第一个参数i表示数据类型“integer”整型
    p = Process(target=func, args=(num, arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])

打印输出

3.333
[0, -1, -2, -3, -4, -5]

进程池

创建一个Pool进程池对象,并执行提交给它的任务,进程池对象允许其中的进程以不同的方式运行,但是需要注意,Pool对象的方法只能是创建它的进程才能调用。
Pool类
multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]):创建一个进程池对象,支持带有超时和回调的异步结果,以及一个并行的map实现。

AsyncResult类
multiprocessing.pool.AsyncResultapply_async()map_async()这两个方法返回的结果对象对应的类。
常用的方法:

"""
这是官方文档上给出的示例,我就直接贴在这儿了
"""
from multiprocessing import Pool
import time


def f(x):
    return x * x


if __name__ == ‘__main__‘:
    with Pool(processes=4) as pool:  # start 4 worker processes
        result = pool.apply_async(f, (10,))  # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))  # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))  # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))  # prints "0"
        print(next(it))  # prints "1"
        print(it.next(timeout=1))  # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))  # raises multiprocessing.TimeoutError

编程指导

这是官方文档中对于multiprocessing模块给出的一些编程建议,我放在这里了,可以参考下。
对于所有启动方法

from multiprocessing import Process, Queue

def f(q):
    q.put(‘X‘ * 1000000)

if __name__ == ‘__main__‘:
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()
"""示例1"""
from multiprocessing import Process, Lock


def f():
    ... do something using "lock" ...


if __name__ == ‘__main__‘:
    lock = Lock()
    for i in range(10):
        Process(target=f).start()
"""示例2"""
from multiprocessing import Process, Lock


def f(l):
    ... do something using "l" ...


if __name__ == ‘__main__‘:
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

spawn和forkserver启动方式
spawn和forkserver的以下一些特点,相对于另外一种fork启动方式,会有一些区别和限制。

示例1:以下代码会引发RuntimeError

from multiprocessing import Process


def foo():
    print(‘hello‘)


p = Process(target=foo)
p.start()

示例2:对于以上代码,应该使用if __name__ == ‘__main__‘来保护程序入口点。

from multiprocessing import Process, freeze_support, set_start_method


def foo():
    print(‘hello‘)


# 这个入口点可以允许子进程安全导入此模块并使用此模块中的foo函数
if __name__ == ‘__main__‘:
    freeze_support()  # 如果正常运行程序而不是需要打包“冻结”,则可以忽略此句。
    set_start_method(‘spawn‘)
    p = Process(target=foo)
    p.start()
评论(0
© 2014 mamicode.com 版权所有 京ICP备13008772号-2  联系我们:gaon5@hotmail.com
迷上了代码!