◎知识点
进程池ProcessPoolExecutor
线程池ThreadPoolExecutor
◎脚本练习
▽ 进程池ProcessPoolExecutor
"""
标准库模块concurrent.futures中提供了一个类对象ProcessPoolExecutor,也用于表示进程池。
与Pool相比,ProcessPoolExecutor的功能和性能更加强大。
类对象ProcessPoolExecutor遵守了上下文管理协议,所以可以使用with语句,这样,在离开运行时
上下文会自动调用方法shutdown(wait=True)
"""
'''
from concurrent.futures import ProcessPoolExecutor
import time, random
def do_sth(i):
print('子进程%d启动' % i)
start = time.time()
time.sleep(random.random() * 10)
end = time.time()
print('子进程%d结束,耗时%.2f秒' % (i, end - start))
if __name__ == '__main__':
print('父进程启动')
"""
# 将进程池所能容纳的最大进程数指定为3
ppe = ProcessPoolExecutor(max_workers=3)
# 将需要进程池处理的任务全部交给进程池,此后会创建并启动由进程池管理的子进程
for i in range(1, 11):
ppe.submit(do_sth, i)
# 父进程被阻塞,进程池管理的所有子进程执行完之后,父进程再从被阻塞的地方继续执行
ppe.shutdown(wait=True)
"""
with ProcessPoolExecutor(max_workers=3) as ppe:
"""
for i in range(1, 11):
ppe.submit(do_sth, i)
"""
ppe.map(do_sth, range(1, 11))
print('父进程结束')
# 程序运行后会同时创建并启动3个子进程,第4个子进程要等前面3个中的某一个执行结束后才会创建并启动
'''
"""
方法submit()的返回值是一个Future实例对象,表示子进程所调用的那个函数的执行(比如:do_sth())。
可以调用Future的方法result()得到这个函数的返回值。
"""
'''
from concurrent.futures import ProcessPoolExecutor
import time
def do_sth(i):
time.sleep(2)
return i * i
if __name__ == '__main__':
"""
with ProcessPoolExecutor(max_workers=3) as ppe:
for i in range(1, 5):
future = ppe.submit(do_sth, i)
# 同步,需要等待do_sth执行完毕
print(future.result())
"""
with ProcessPoolExecutor(max_workers=3) as ppe:
objs = []
for i in range(1, 5):
future = ppe.submit(do_sth, i)
# 异步,无需等待do_sth执行完毕
print(future)
objs.append(future)
for obj in objs:
print(obj.result())
'''
"""
标准库模块concurrent.futures中还提供了两个函数:
(1) wait(fs, timeout=None, return_when=ALL_COMPLETED)
该函数用于阻塞父进程,以等待指定的Future实例对象序列,直到满足指定的条件。
参数fs用于指定要等待的Future实例对象序列。
参数timeout用于指定等待的最长时间。如果指定为None或不指定,则一直等待。
参数return_when用于指定该函数何时返回,有三种取值:FIRST_COMPLETED、FIRST_EXCEPTION
和ALL_COMPLETED,分别表示:当第一个Future实例对象已经完成或已被取消时、当第一个Future实例对象
抛出异常时、当所有Future实例对象已经完成或已被取消时。
该函数的返回值是由两个集合组成的元组,第一个集合包含了已经完成或已被取消的所有Future实例对象,
第二个集合包含了没有完成并且没有被取消的Future实例对象。
(2) as_completed(fs, timeout=None)
该函数用于将指定的Future实例对象序列转换为一个迭代器,当序列中的任意一个Future实例对象
已经完成或已被取消时都会被yield。这样,通过遍历得到的迭代器,就可以在任意一个Future实例对象
已经完成或已被取消时立即做一些处理,比如调用方法result()得到执行结果。
参数fs用于指定Future实例对象序列。
参数timeout用于指定超时时间。如果指定为None或不指定,则不会超时。
该函数的返回值是Future实例对象的迭代器。
"""
from concurrent.futures import ProcessPoolExecutor, wait, as_completed, \
ALL_COMPLETED, FIRST_COMPLETED
import time, random
def do_sth(i):
time.sleep(random.random() * 10)
return i * i
if __name__ == '__main__':
ppe = ProcessPoolExecutor(max_workers=3)
"""
objs = []
for i in range(1, 5):
future = ppe.submit(do_sth, i)
objs.append(future)
# (done, not_done) = wait(objs, return_when=ALL_COMPLETED)
(done, not_done) = wait(objs, return_when=FIRST_COMPLETED)
print(done)
print(not_done)
"""
objs = []
for i in range(1, 5):
future = ppe.submit(do_sth, i)
objs.append(future)
future_iterator = as_completed(objs)
for future in future_iterator:
print(future.result())