最近使用Python的多进程multiprocessing模块时遇到了一个进程挂起问题。特在此记录一下。
先给出一个多进程应用的最小代码。
import multiprocessing as mp def produce(q): """生产者""" for i in [1, 2, 3, 4, 5, 6, 7, 8, 9]: q.put(i) print(f"Producer quit.") def consume(q): """消费者""" while True: r = q.get() if r == 9: break print("Consumer quit.") if __name__ == "__main__": q = mp.Queue() consumer = mp.Process(target=consume, args=(q,)) consumer.start() producer = mp.Process(target=produce, args=(q,)) producer.start() producer.join() consumer.join() print("Main quit.")
这段代码中,主进程构建了一个队列,然后派生两个子进程:一个生产者与一个消费者。生产者进程按顺序将一个包含从1到9的数字列表中的元素逐个填充到队列中。消费者进程逐个将队列中的元素取出,并在读取到最后一个元素的时候退出。这段代码运行的输出如下:
Producer quit. Consumer quit. Main quit.
实际情况中,为了安全起见会限制队列的最大容量,以避免内存被占尽。在队列初始化的时候可以通过 maxsize
参数指定允许的最大尺寸。
# 生产者与消费者的函数定义无变化,这里省略。 if __name__ == "__main__": q = mp.Queue(maxsize=3) # 指定队列允许存储的最大元素数量 consumer = mp.Process(target=consume, args=(q,)) consumer.start() producer = mp.Process(target=produce, args=(q,)) producer.start() producer.join() consumer.join() print("Main quit.")
此时运行全部代码,输出如下:
Producer quit. Consumer quit. Main quit.
在限制队列大小后,应用依旧可以正常退出。
但是,如果你不小心忘记了队列存在大小限制,然后无意间修改了消费者进程的代码:
# 生产者代码无变动,省略。 def consume(q): """消费者""" while True: r = q.get() if r == 3: # 当读取到3时,停止消费队列。 break print("Consumer quit.") # 主进程代码无变动,省略。
该修改会使消费者进程在读取到数字3时就终止退出。此时运行全部代码,输出如下:
Consumer quit.
消费者进程已经正常退出。但是生产者与主进程处于挂起状态。程序卡死了。
为了找到问题的原因,尝试把程序运行的过程打印出来。分别在生产者与消费者进程中打印当前读取到的元素。
def produce(q): """生产者""" for i in [1, 2, 3, 4, 5, 6, 7, 8, 9]: q.put(i) print(f"P: {i}") # 打印存入队列中的元素 print(f"Producer quit.") def consume(q): """消费者""" while True: r = q.get() print(f"C: {r}") # 打印从队列中取出的元素 if r == 3: break print("Consumer quit.")
程序的输入如下:
P: 1 P: 2 P: 3 C: 1 P: 4 C: 2 C: 3 Consumer quit. P: 5 P: 6
从输出可以看到,消费者进程在读取到3之后退出。生产者进程继续向队列中填充元素,最后一次填充的是6,然后就没有了下文。由于队列中最后一次被取出的元素是3,而生产者是按顺序从1到9填充,所以此时队列中存在三个元素: 4, 5, 6
。而回想一下,之前将队列的最大尺寸设定为3,很显然,队列已满。此时的 put
方法在耐心地等待队列空间中。由于消费者进程已经退出,所以队列永远不会有空间。进程就卡在这里了。
要解决这个问题,有不同的方案。
第一种:避免消费者进程早于生产者进程退出。只要消费者不退出,那么队列总会迎来空闲之日。
第二种:将队列填充操作设定为非阻塞。将 put
方法的 block
参数设定为 False
,这样如果填充队列失败,程序会报队列满的错误,然后继续执行。
第三种:允许队列填充阻塞,但是规定允许的最长等待时间。队列默认的 put
方法超时为 None
,意味着一直等待直到有空闲为止。为 put
方法设定超时参数,当等待足够长时间队列依旧没有空间,则报队列满的错误。程序继续运行。
这三种方法各有长短,使用时需要根据实际情况选择。理想情况下应当通过代码逻辑将消费者作为最后退出的进程。如果由于特殊原因消费者进程行为无法控制,那么就可以考虑后两种方法,然后通过 try...except
捕捉队列满的错误,进一步结合重试来避免数据丢失。