python ctrl-c 无法终止 multiprocessing pool
github pages: perfectnewer.gitub.io
gitee pages: perfectnewer.gitee.io
最近对一个线上脚本进行优化,因为时间紧逻辑比较多,后面还有一堆事情,所以决定先改成多进程的方式。此处记录一下遇到的问题。
一开始我是质疑这些答案的,毕竟我的代码可以正常接收ctrl-c,但是最后发现本质原因是一样的,方案也是通用。那么对根因的探索祥见另外一篇python2 multiprocess pool源码简单解读
注意:
- 除了resul .get会block以外,pool.join也会block信号
- 我的代码和他的不一样,我的脚本是长期运行的。我是能收到信号的。但是整个进程也没有退出,原因是worker异常退出导致。虽然原因不同,但是这个方案是可行的。因为worker不会因为ctrl-c异常退出了
#!/bin/env python from __future__ import print_function import multiprocessing import os import signal import time def run_worker(delay): print("In a worker process", os.getpid()) time.sleep(delay) def main(): print("Initializng 2 workers") original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) pool = multiprocessing.Pool(2) signal.signal(signal.SIGINT, original_sigint_handler) try: print("Starting 2 jobs of 5 seconds each") res = pool.map_async(run_worker, [5, 5]) print("Waiting for results") res.get(60) # Without the timeout this blocking call ignores all signals. except KeyboardInterrupt: print("Caught KeyboardInterrupt, terminating workers") pool.terminate() else: print("Normal termination") pool.close() pool.join() if __name__ == "__main__": main()
def mgr(): import multiprocessing multiprocessing.log_to_stderr() logger.info("start") origin_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_IGN) pool = Pool(3) stopper = Stopper() signal.signal(signal.SIGINT, origin_handler) signal.signal(signal.SIGTERM, functools.partial(stop, stopper)) idx = 1 working_process = [] while not stopper.is_stop(): try: v = r.blpop("test", 120) ... logger.info(v) rst = pool.apply_async(worker, args=(idx, )) working_process.append(rst) except KeyboardInterrupt: logger.info("bk error ======== parent {}".format(os.getpid())) break except Exception: pass while len(working_process) >= 3: logger.info("wait exit") readys = [] for idx, rst in enumerate(working_process): rdy = rst.ready() print('1: ', idx, ' ', rdy) if rdy: readys.append(idx) if not readys: time.sleep(10) else: for idx in readys: working_process[idx] = None working_process = filter(lambda x: x is not None, working_process) idx += 1 logger.info("exit") pool.close() pool.join()
Condition.wait
根本原因其实取决于你的具体代码逻辑。目前我遇到的有两个
threading.Condition
执行condition.wait()
的时候,会阻止信号的处理。这个是python2.7的bug,并且不会改变这个行为。在python3中已经修复。详见:threading.Condition.wait() is not interruptible in Python 2.7这个问题的原因是对进程池的理解不到位。multiprocessing.Pool
只是限制了执行任务的进程的个数,并没有限制用户放入的任务数量。虽然它保证只有规定个数的进程运行。但是你可以一直往pool里添加task。
解决方案就是自己需要对处理的任务数量统计计数。code
这个一开始就能预料到了。这个主要是因为我们业务代码有进程缓存,只要定期清理就可以了。那么方案也有两个