400-616-5551

您所在位置: 首页> 学习课程> python培训 | Python多进程实践

python培训 | Python多进程实践

发布百知教育 来源:学习课程 2019-11-12

本文重点探究以下几个内容:


创建进程Process

进程间数据传递Queue/Pipe/Value/Array/Manager

进程池Pool

创建进程Process


创建进程方式如下:


创建进程方式如下:

import time
from multiprocessing import Process


def foo(a, b):
   time.sleep(1)
   print(a+b)  # 不打印 此时主进程已退出

# 创建进程,指定函数和参数
p = Process(target=foo, args=(1, 2))
# 设置为守护进程(守护进程会随主进程退出而退出)
p.daemon = True
# 启动进程
p.start()
# 子进程阻塞主进程,执行完毕后主进程继续
p.join(0.5)
print('进程执行完毕')  # 打印
  • 可以看出,Process的用法与Thread类似,守护进程设置方式为deamon属性。由于join为0.5且为守护进程,因此不等守护进程运行结束主程序已退出。
    若增加另一非守护进程,则必须等非守护进程结束,守护进程才能退出

  • 同样,可以通过继承Process实现进程


进程间数据传递Queue/Pipe/Value/Array/Manager


可以通过Lock锁机制实现共享锁,但比较常用的方式还是以上这些方式,效率更高,更安全。


Queue


使用方式


使用方式

from multiprocessing import Process, Queue

def f(_q):
   _q.put([42, None, 'hi'])

if __name__ == '__main__':
   q = Queue()
   p = Process(target=f, args=(q,))
   p.start()
   print(q.get())  # 打印内容: [42,None,'hi']
   p.join()

构造

  • multiprocessing.Queue([maxsize])
    限制队列长度


类方法

  • qsize()
    返回队列的大致大小,因为多进程或者多线程一直在消耗队列,因此该数据不一定正确

  • empty()
    判断队列是否为空,如果是,则返回True,否则False

  • full()
    判断队列是否已满,如果是,则返回True,否则False

  • put(obj[, block[, timeout]])
    将对象放入队列,可选参数block为True,timeout为None

  • get()
    从队列取出对象

Pipe

使用方式

from multiprocessing import Process, Pipe

def f(conn):
   conn.send([42, None, 'hello'])
   conn.close()

if __name__ == '__main__':
   parent_conn, child_conn = Pipe()
   p = Process(target=f, args=(child_conn,))
   p.start()
   print(parent_conn.recv())   # prints "[42, None, 'hello']"
   p.join()

构造

  • conn1, conn2 = multiprocessing.Pipe([duplex])
    若duplex为True(默认),则是双向的,False则是单向的。conn1只能接受,conn2只能发送。


Value和Array


from multiprocessing import Process, Value, Array


def f(n, a):
   # 引用修改共享变量
   n.value = 3.1415927
   for i in range(len(a)):
       a[i] = -a[i]


if __name__ == '__main__':
   num = Value('d', 0.0)
   arr = Array('i', range(10))

   p = Process(target=f, args=(num, arr))
   p.start()
   p.join()

   print(num.value)  # 3.1415927
   print(arr[:])  # [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Manager

更多类型支持一般使用Manager,支持的类型包括list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array,使用方式如下:


from multiprocessing import Process, Manager


def f(d, l):
   d[1] = '1'
   d['2'] = 2
   d[0.25] = None
   l.reverse()


if __name__ == '__main__':
   with Manager() as manager:
       d = manager.dict()
       l = manager.list(range(10))

       p = Process(target=f, args=(d, l))
       p.start()
       p.join()

       print(d)  # {1: '1', '2': 2, 0.25: None}
       print(l)  # [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

进程池

一般使用方式如下:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
   return x*x

if __name__ == '__main__':
   # start 4 worker processes
   with Pool(processes=4) as pool:

       # print "[0, 1, 4,..., 81]"
       print(pool.map(f, range(10)))

       # print same numbers in arbitrary order
       for i in pool.imap_unordered(f, range(10)):
           print(i)

       # evaluate "f(20)" asynchronously
       res = pool.apply_async(f, (20,))      # runs in *only* one process
       print(res.get(timeout=1))             # prints "400"

       # evaluate "os.getpid()" asynchronously
       res = pool.apply_async(os.getpid, ()) # runs in *only* one process
       print(res.get(timeout=1))             # prints the PID of that process

       # launching multiple evaluations asynchronously *may* use more processes
       multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
       print([res.get(timeout=1) for res in multiple_results])

       # make a single worker sleep for 10 secs
       res = pool.apply_async(time.sleep, (10,))
       try:
           print(res.get(timeout=1))
       except TimeoutError:
           print("We lacked patience and got a multiprocessing.TimeoutError")

       print("For the moment, the pool remains available for more work")

   # exiting the 'with'-block has stopped the pool
   print("Now the pool is closed and no longer available")

构造函数
multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

  • 若processes没有传,则默认为 os.cpu_count()

类方法

  • apply(func[, args[, kwds]])
    在返回结果前,将一直阻塞进程,注意:func只在一个进程中运行

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]])
    在需要并行的场景中,一般用此方法更好。当result准备好时就调用callback,失败则调用error_callback,注意:自定义回调函数只接受一个参数且是非阻塞的

  • map(func, iterable[, chunksize])
    为map()函数的并行版。将可迭代对象拆成小块,放到不同进程中运行。对于内存占用大的可迭代对象,一般使用imap或imap_unordered

  • map_async(func, iterable[, chunksize[, callback[, error_callback]]])
    非阻塞版。

  • imap(func, iterable[, chunksize])
    注意:若iterable很长时,chunksize(默认为1)设一个大的值速度更快。

  • imap_unordered(func, iterable[, chunksize])
    无序(若进程个数为1,则顺序一致)

    from multiprocessing import Pool
    import time

    def f(x):
       return x*x

    if __name__ == '__main__':
       with Pool(processes=4) as pool:         # start 4 worker processes
           result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
           print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

           print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

           it = pool.imap(f, range(10))
           print(next(it))                     # prints "0"
           print(next(it))                     # prints "1"
           print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

           result = pool.apply_async(time.sleep, (10,))
           print(result.get(timeout=1))        # raises multiprocessing.TimeoutError


    python培训:http://www.baizhiedu.com/python2019


    上一篇:python培训:python入门—— python模块与包

    下一篇:应届生去公司找个Java程序员的职位需要什么技能?

    相关推荐

    www.baizhiedu.com

    有位老师想和您聊一聊

    关闭

    立即申请