消息队列MQ

进程锁,queue队列,生产者消费模型

本文主要是介绍进程锁,queue队列,生产者消费模型,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

进程锁

from multiprocessing import Process, Lock
import os
import time

def task(i, lock):
    # 开始上锁
    lock.acquire()
    print("第%s个: 进程id号:%s开始进来了" % (i, os.getpid()))
    time.sleep(2)
    print("第%s个: 进程id号:%s开始走了" % (i, os.getpid()))
    # 释放锁
    lock.release()

if __name__ == '__main__':
    lock = Lock()  # 5个人用同一把锁
    for i in range(5):
        p = Process(target=task, args=(i, lock))
        p.start()

进程间数据隔离

1. 进程之间数据是互不影响的

n = 10

def task():
    global n
    n = 100
    print('task:', n)

if __name__ == '__main__':
    task()
    print("main:", n)

运行结果:
task: 100
main: 100
from multiprocessing import Process

n = 10

def task():
    global n
    n = 100
    print('task:', n)

if __name__ == '__main__':
    # task()
    p = Process(target=task)
    p.start()
    print("main:", n)

运行结果:
main: 10
task: 100

Queue队列的使用

进程间通信IPC(Inter-Process Communication)

multiprocess.Queue:创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

Queue([maxsize]):创建共享的进程队列。maxsize是队列中允许的最大项数。底层队列使用管道和锁定实现。
另外,还需要运行支持线程以便队列中的数据传输到底层管道中。

Queue的实例q具有以下方法:

q.get( [ block [ ,timeout ] ] ):返回q中的第一个项目。如果q为空,此方法将阻塞,直到队列中有项目
可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在
Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,
将引发Queue.Empty异常。

q.get_nowait() :同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) :将item放入队列。如果队列已满,此方法将阻塞至有空间可用
为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模
块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。

q.qsize() :返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中
使用结果之间,队列中可能添加或删除了项目。在某些系统上此方法可能引发NotImplementedError异常。

q.empty() :如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是
不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。

q.full() :如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。

Queue的实例举例

from multiprocessing import Queue
if __name__ == '__main__':
    # q = Queue()  # 拿到一个空队列的对象
    q = Queue(3)   # 3代表队列里的数量,如果队列>3个,阻塞
    q.put('handsome1')
    q.put('handsome2')
    q.put('handsome3')
    q.put('handsome4')
    print(q.get())
    print(q.get())
    print(q.get())

运行结果为空,一次只能三个,第四个无法运营下去故造成阻塞,无法取到数据
from multiprocessing import Queue
if __name__ == '__main__':
    # q = Queue()  # 拿到一个空队列的对象
    q = Queue(3)   #如果队列>3个,阻塞
    q.put('handsome1')
    q.put('handsome2')
    q.put('handsome3')
    q.put('handsome4',block=False,timeout=2)  # block=False 取不出就报错,timeout=2等两秒
    # q.put_nowait('ly is handsome4') #相当于block=False
    print(q.get())
    print(q.get())
    print(q.get())

运行结果:报错  queue.Full

解决进程间数据隔离问题

"""
1. 进程之间数据是互不影响的
"""
from multiprocessing import Process,Queue

n = 10

def task(queue):
    queue.put('ly')
    # print('ly')

if __name__ == '__main__':
    # task()
    q = Queue(3)
    p = Process(target=task, args=(q, ))
    p.start()

    print(q.get())

什么是生产者消费者模式(重要)

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题(容器相当于商店,
生产者生产东西直接给商店这个容器,消费者消费直接去商店这个容器)。生产者和消费者
彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消
费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞
队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

生产者消费者模型

解决大多数并发问题,通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度
**队列是,先存入的数据最先取出,即“先进先出”。 (参考**基本数据类型内置方法(上))
**栈是,最后存入的数据最先取出,即“后进先出”。**

版本1:消费者消费完没有正常结束

from multiprocessing import Process,Queue
import os,time
def producer(queue):
    for i in range(10):
        data = '%s:生产了第%s个包子' % (os.getpid(), i)
        queue.put(data)

def consumer(queue):
    while True:
        data=queue.get()
        time.sleep(1)
        print(data)

if __name__ == '__main__':
    q=Queue()
    p=Process(target=producer,args=(q,))
    p.start()
    p1 = Process(target=consumer,args=(q,))
    p1.start()

版本2

from multiprocessing import Process,Queue
import os,time
def producer(queue):
    for i in range(10):
        data = '%s:生产了第%s个包子' % (os.getpid(), i)
        queue.put(data)
    queue.put(None)

def consumer(queue):
    while True:
        data=queue.get()
        if data is None:break
        # if not data:break
        time.sleep(1)
        print(data)

if __name__ == '__main__':
    q=Queue()
    p=Process(target=producer,args=(q,))
    p.start()
    p1 = Process(target=consumer,args=(q,))
    p1.start()

版本3

from multiprocessing import Process,Queue
import os,time
def producer(queue):
    for i in range(10):
        data = '%s:生产了第%s个包子' % (os.getpid(), i)
        queue.put(data)

def consumer(queue):
    while True:
        data=queue.get()
        if data is None:break
        # if not data:break
        time.sleep(1)
        print(data)

if __name__ == '__main__':
    q=Queue()
    p=Process(target=producer,args=(q,))
    p.start()
    p1 = Process(target=consumer,args=(q,))
    p1.start()
    # put(None)放在这里不行,原因是先执行了主进程,放进了None,消费者先拿到None,程序直接结束了
    q.put(None)
    print('end======>')

运行结果:end======>

版本4

from multiprocessing import Process,Queue
import os,time
def producer(queue):
    for i in range(10):
        data = '%s:生产了第%s个包子' % (os.getpid(), i)
        queue.put(data)

def consumer(queue):
    while True:
        data=queue.get()
        if data is None:break
        # if not data:break
        time.sleep(1)
        print(data)

if __name__ == '__main__':
    q=Queue()
    p=Process(target=producer,args=(q,))
    p.start()
    p1 = Process(target=consumer,args=(q,))
    p1.start()
    # put(None)放在这里不行,原因是先执行了主进程,放进了None,消费者先拿到None,程序直接结束了
    p.join()
    q.put(None)
    print('end======>')

运行结果:end======>
17276:生产了第0个包子
17276:生产了第1个包子
17276:生产了第2个包子
17276:生产了第3个包子
17276:生产了第4个包子
17276:生产了第5个包子
17276:生产了第6个包子
17276:生产了第7个包子
17276:生产了第8个包子
17276:生产了第9个包子

Process finished with exit code 0

版本5:生产者多,消费者少

from multiprocessing import Process,Queue
import os,time
def producer(queue,food):
    for i in range(10):
        data = '%s:生产了第%s个%s' % (os.getpid(), i, food)
        # print(data)
        queue.put(data)

def consumer(queue):
    while True:
        data=queue.get()
        if data is None:break
        # if not data:break
        time.sleep(1)
        print(data)

if __name__ == '__main__':
    q=Queue()
    p1=Process(target=producer,args=(q,'馒头'))
    p2=Process(target=producer,args=(q,'花卷'))
    p3=Process(target=producer,args=(q,'烧卖'))
    p1.start()
    p2.start()
    p3.start()
    p4 = Process(target=consumer,args=(q,))
    p5 = Process(target=consumer,args=(q,))
    p4.start()
    p5.start()
    p1.join()
    p2.join()
    p3.join()
    q.put(None)
    q.put(None)
    print('end======>')

运行结果:
end======>
14952:生产了第0个花卷
14952:生产了第1个花卷
14952:生产了第2个花卷
14952:生产了第3个花卷
14952:生产了第4个花卷
14952:生产了第5个花卷
14952:生产了第6个花卷
14952:生产了第7个花卷
14952:生产了第8个花卷
14952:生产了第9个花卷
5816:生产了第0个烧卖
5816:生产了第1个烧卖
5816:生产了第2个烧卖
5816:生产了第3个烧卖
5816:生产了第4个烧卖
5816:生产了第5个烧卖
5816:生产了第6个烧卖
5816:生产了第7个烧卖
5816:生产了第8个烧卖
5816:生产了第9个烧卖
15468:生产了第0个馒头
15468:生产了第1个馒头
15468:生产了第2个馒头
15468:生产了第3个馒头
15468:生产了第4个馒头
15468:生产了第5个馒头
15468:生产了第6个馒头
15468:生产了第7个馒头
15468:生产了第8个馒头
15468:生产了第9个馒头

Process finished with exit code 0

版本6:消费者多,生产者少

from multiprocessing import Process,Queue
import os,time

def producer(queue, food):
    for i in range(10):
        data = '%s:生产了第%s个%s' % (os.getpid(), i, food)
        # print(data)
        queue.put('第%s个%s' % (i, food))

def consumer(queue, name):
    while True:
        try:
            data = queue.get(timeout=3)
            if data is None: break
            time.sleep(0.2)

            print("消费者:%s,消费了:%s" % (name, data))
        except Exception as e:
            print(e)
            break

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q, '馒头'))
    p2 = Process(target=producer, args=(q, '花卷'))
    p3 = Process(target=producer, args=(q, '烧麦'))
    p1.start()
    p2.start()
    p3.start()

    p4 = Process(target=consumer, args=(q, 'egon'))
    p5 = Process(target=consumer, args=(q, 'ly'))
    p6 = Process(target=consumer, args=(q, 'tom'))
    p7 = Process(target=consumer, args=(q, 'qq'))
    p4.start()
    p5.start()
    p6.start()
    p7.start()

    # put(None) 放在这里不行,原因是先执行了主进程,放进了None,消费者先拿到None,程序直接结束了
    p1.join()
    p2.join()
    p3.join()

    q.put(None)
    q.put(None)

    print("end===========>")

运行结果:
end===========>
消费者:ly,消费了:第0个馒头
消费者:tom,消费了:第1个馒头
消费者:egon,消费了:第2个馒头
消费者:qq,消费了:第3个馒头
消费者:ly,消费了:第4个馒头
消费者:tom,消费了:第5个馒头
消费者:egon,消费了:第6个馒头
消费者:qq,消费了:第7个馒头
消费者:ly,消费了:第8个馒头
消费者:tom,消费了:第9个馒头
消费者:egon,消费了:第0个花卷
消费者:qq,消费了:第1个花卷
消费者:ly,消费了:第2个花卷
消费者:tom,消费了:第3个花卷
消费者:egon,消费了:第4个花卷
消费者:qq,消费了:第5个花卷
消费者:ly,消费了:第6个花卷
消费者:tom,消费了:第7个花卷
消费者:egon,消费了:第8个花卷
消费者:qq,消费了:第9个花卷
消费者:ly,消费了:第0个烧麦
消费者:tom,消费了:第1个烧麦
消费者:egon,消费了:第2个烧麦
消费者:qq,消费了:第3个烧麦
消费者:ly,消费了:第4个烧麦
消费者:tom,消费了:第5个烧麦
消费者:egon,消费了:第6个烧麦
消费者:qq,消费了:第7个烧麦
消费者:ly,消费了:第8个烧麦
消费者:tom,消费了:第9个烧麦

Process finished with exit code 0
其他掌握内容:
kafka
rabbitmg
httpsgs
这篇关于进程锁,queue队列,生产者消费模型的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!