线程池里面推荐的线程个数(一般根据IO的比例定制):cpu_count*
5
代码:
from threading import current_thread from concurrent.futures import ThreadPoolExecutor import time import random def func(): print(current_thread().ident,'start') #从内部获取开启进程的个数 time.sleep(random.randint(1,4)) print(current_thread().ident, 'end') tp=ThreadPoolExecutor(4) #起了4个线程 for i in range(10): tp.submit(func) #submit向池中提交任务 #得: 6876 start 13340 start 11760 start 6080 start 6876 end 6876 start 6080 end 6080 start 11760 end 11760 start 6876 end 6876 start 13340 end 13340 start 11760 end 11760 start 11760 end 6080 end
带参数
from threading import current_thread from concurrent.futures import ThreadPoolExecutor import time import random def func(a,b): print(current_thread().ident,'start',a,b) #从内部获取开启进程的个数 time.sleep(random.randint(1,4)) print(current_thread().ident, 'end') tp=ThreadPoolExecutor(4) #起了4个线程 for i in range(10): tp.submit(func,i,i+1) #得: 8624 start 0 1 7884 start 1 2 7844 start 2 3 11832 start 3 4 7844 end 7844 start 4 5 8624 end 8624 start 5 6 7884 end 7884 start 6 7 11832 end 11832 start 7 8 7844 end 7844 start 8 9 7884 end 7884 start 9 10 11832 end 8624 end 7884 end 7844 end
运用场景:高计算的时候,没有io(没有文件操作,没有数据库操作,没有网络操作,没有input)
进程池里面推荐的进程个数:cpu_count*
1<进程个数<cpu_count*
2
代码:
import os from concurrent.futures import ProcessPoolExecutor import time import random def func(a, b): print(os.getpid(), 'start', a, b) # 从内部获取开启进程的个数 time.sleep(random.randint(1, 4)) print(os.getpid(), 'end') if __name__ == '__main__': tp = ProcessPoolExecutor(4) # 起了4个线程 for i in range(10): tp.submit(func, i, i + 1) #得 7836 start 0 1 8888 start 1 2 11916 start 2 3 14660 start 3 4 11916 end 11916 start 4 5 7836 end 7836 start 5 6 14660 end 14660 start 6 7 7836 end 8888 end 7836 start 7 8 8888 start 8 9 14660 end 14660 start 9 10 11916 end 7836 end 8888 end 14660 end
获取任务结果
from concurrent.futures import ProcessPoolExecutor def func(a, b): return a*b if __name__ == '__main__': tp = ProcessPoolExecutor(4) # 起了4个线程 future_l={} for i in range(10): #异步非阻塞 ret=tp.submit(func, i, i + 1) future_l[i]=ret # print(ret) #Future未来对象 # print(ret.result()) #缺点:慢,效率低 for key in future_l: #同步阻塞 print(key,future_l[key].result())
回调函数(异步阻塞),给ret对象绑定一个回调函数。等ret对应的任务有了结果之后,立即调用print_func这个函数,就可以 立即对函数进行处理,而不是按照顺序接受结果,处理结果
代码
from concurrent.futures import ProcessPoolExecutor import os import time import random def func(a, b): print(os.getpid(), 'start', a, b) # 从内部获取开启进程的个数 time.sleep(random.randint(1, 4)) print(os.getpid(), 'end') return a*b def print_func(ret): print(ret.result()) if __name__ == '__main__': tp = ProcessPoolExecutor(4) # 起了4个线程 for i in range(10): #异步非阻塞 ret=tp.submit(func, i, i + 1) ret.add_done_callback(print_func) #异步阻塞,回调函数,给ret对象绑定一个回调函数。 # 等ret对应的任务有了结果之后,立即调用print_func这个函数 #就可以立即对函数进行处理,而不是按照顺序接受结果,处理结果 #得: 15284 start 0 1 7540 start 1 2 544 start 2 3 6880 start 3 4 7540 end 7540 start 4 5 2 15284 end 544 end 544 start 5 6 6 15284 start 6 7 0 6880 end 6880 start 7 8 12 15284 end 15284 start 8 9 42 7540 end 7540 start 9 10 20 544 end 30 7540 end 90 15284 end 6880 end 72 56
from concurrent.futures import ProcessPoolExecutor def func(a): b=a+1 return a*b if __name__ == '__main__': tp = ProcessPoolExecutor(4) # 起了4个线程 tm=tp.map(func,range(20)) #使用map将一个可迭代对象的元素依次传入函数 print(list(tm)) #得: [0, 2, 6, 12, 20, 30, 42, 56, 72, 90, 110, 132, 156, 182, 210, 240, 272, 306, 342, 380]