This article focuses on the following topics:
Creating processes with Process
Passing data between processes: Queue/Pipe/Value/Array/Manager
Process pools with Pool
Creating processes with Process
A process is created as follows:
import time
from multiprocessing import Process

def foo(a, b):
    time.sleep(1)
    print(a + b)  # never printed: the main process has already exited

if __name__ == '__main__':
    # create a process, specifying the target function and its arguments
    p = Process(target=foo, args=(1, 2))
    # mark it as a daemon (daemon processes are terminated when the main process exits)
    p.daemon = True
    # start the process
    p.start()
    # wait at most 0.5 seconds for the child, then let the main process continue
    p.join(0.5)
    print('main process finished')  # printed
As the example shows, Process is used much like Thread, and a process is marked as a daemon via the daemon attribute. Because join is given a 0.5-second timeout and the child is a daemon, the main program exits before the daemon finishes, so a+b is never printed.
If another, non-daemon process were added, the main program would have to wait for that process to finish, and the daemon would keep running until then. Likewise, a process can be defined by subclassing Process, as sketched below.
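A minimal sketch of the subclassing approach (MyProcess is an illustrative name; the key requirement is to override run()):

from multiprocessing import Process

class MyProcess(Process):
    def __init__(self, a, b):
        super().__init__()
        self.a = a
        self.b = b

    def run(self):
        # run() is the body of the child process; it is invoked by start()
        print(self.a + self.b)

if __name__ == '__main__':
    p = MyProcess(1, 2)
    p.start()
    p.join()  # prints 3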
Passing data between processes: Queue/Pipe/Value/Array/Manager
Shared access can also be coordinated with the Lock mechanism, but the mechanisms listed above are the more common choices, being both more efficient and safer. A minimal Lock sketch follows.
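This sketch (the counter increment is illustrative) shows why a Lock matters: counter.value += 1 is a read-modify-write operation and is not atomic across processes.

from multiprocessing import Process, Value, Lock

def add(counter, lock):
    for _ in range(1000):
        with lock:  # serialize access to the shared counter
            counter.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)
    lock = Lock()
    workers = [Process(target=add, args=(counter, lock)) for _ in range(4)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
    print(counter.value)  # 4000; without the lock, updates could be lost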
Queue
Usage:
from multiprocessing import Process, Queue

def f(_q):
    _q.put([42, None, 'hi'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints: [42, None, 'hi']
    p.join()
Constructor:
multiprocessing.Queue([maxsize])
maxsize caps the length of the queue.
Methods:
qsize()
Return the approximate size of the queue. Because other processes and threads may be consuming the queue concurrently, the value is not reliable.
empty()
Return True if the queue is empty, False otherwise.
full()
Return True if the queue is full, False otherwise.
put(obj[, block[, timeout]])
Put obj into the queue. The optional block defaults to True and timeout to None, i.e. block until a free slot is available.
get([block[, timeout]])
Remove and return an item from the queue.
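A short sketch of these methods, assuming a queue capped at maxsize=2 (note that qsize() may raise NotImplementedError on some platforms, such as macOS):

import queue
from multiprocessing import Queue

q = Queue(maxsize=2)
q.put(1)
q.put(2)
print(q.full())  # True
try:
    q.put(3, block=True, timeout=0.1)  # queue is full, so this times out
except queue.Full:
    print('queue is full')
print(q.get())  # 1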
Pipe
Usage:
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # prints "[42, None, 'hello']"
    p.join()
Constructor:
conn1, conn2 = multiprocessing.Pipe([duplex])
If duplex is True (the default), the pipe is bidirectional; if False, it is one-way: conn1 can only receive and conn2 can only send, as sketched below.
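A small sketch of a one-way pipe (producer is an illustrative name):

from multiprocessing import Process, Pipe

def producer(conn):
    conn.send('one-way message')
    conn.close()

if __name__ == '__main__':
    recv_conn, send_conn = Pipe(duplex=False)  # recv_conn receives, send_conn sends
    p = Process(target=producer, args=(send_conn,))
    p.start()
    print(recv_conn.recv())  # prints "one-way message"
    p.join()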
Value and Array
from multiprocessing import Process, Value, Array

def f(n, a):
    # modify the shared variables from the child process
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)        # 'd': double-precision float
    arr = Array('i', range(10))  # 'i': signed int
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)  # 3.1415927
    print(arr[:])     # [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
Manager
When more types are needed, Manager is the usual choice. It supports list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. Usage:
from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))
        p = Process(target=f, args=(d, l))
        p.start()
        p.join()
        print(d)  # {1: '1', '2': 2, 0.25: None}
        print(l)  # [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Process pools with Pool
Typical usage:
from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:
        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))  # runs in *only* one process
        print(res.get(timeout=1))  # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ())  # runs in *only* one process
        print(res.get(timeout=1))  # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")
Constructor:
multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
If processes is not given, it defaults to os.cpu_count(). The other parameters are sketched below.
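A brief sketch of initializer and maxtasksperchild (the init helper is illustrative): initializer(*initargs) runs once in every worker as it starts, and maxtasksperchild recycles a worker after it has completed that many tasks.

from multiprocessing import Pool
import os

def init():
    # runs once in each worker process when the worker starts
    print('worker started:', os.getpid())

def f(x):
    return x * x

if __name__ == '__main__':
    # each worker is replaced after completing 2 tasks
    with Pool(processes=2, initializer=init, maxtasksperchild=2) as pool:
        print(pool.map(f, range(8)))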
Methods:
apply(func[, args[, kwds]])
Blocks until the result is ready. Note: func runs in only one of the pool's worker processes.
apply_async(func[, args[, kwds[, callback[, error_callback]]]])
Usually the better choice when parallelism is needed. callback is invoked once the result is ready, and error_callback on failure. Note: a callback takes a single argument and should be non-blocking.
map(func, iterable[, chunksize])
A parallel version of the built-in map(). It chops the iterable into chunks and submits them to different worker processes. For iterables with a large memory footprint, prefer imap or imap_unordered.
map_async(func, iterable[, chunksize[, callback[, error_callback]]])
The non-blocking version of map().
imap(func, iterable[, chunksize])
A lazier version of map(). Note: for very long iterables, setting a large chunksize (default 1) is much faster.
imap_unordered(func, iterable[, chunksize])
Like imap(), but results are yielded in arbitrary order (with a single worker process the order is preserved).
For example:
from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:  # start 4 worker processes
        result = pool.apply_async(f, (10,))  # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))  # prints "100" unless your computer is *very* slow
        print(pool.map(f, range(10)))  # prints "[0, 1, 4,..., 81]"
        it = pool.imap(f, range(10))
        print(next(it))  # prints "0"
        print(next(it))  # prints "1"
        print(it.next(timeout=1))  # prints "4" unless your computer is *very* slow
        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))  # raises multiprocessing.TimeoutError