消息队列MQ

消息队列与线程

本文主要是介绍消息队列与线程,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

消息队列

from multiprocessing import Queue
q = Queue(5)  # 产生队列并限制队列长度
q.put(111)  # 放入数据
q.put(222)
q.put(333)
print(q.full())  # 判断队列是否满了
q.put(444)
q.put(555)
print(q.full())
# q.put(666)  # 超出长度限制,原地阻塞等待取出
print(q.get())  # 取出数据
print(q.get())
print(q.empty())  # 判断队列是否是空的
print(q.get())
print(q.get())
print(q.get())
print(q.empty())
print(q.get())  # 队列为空,取不到值的时候,原地阻塞,等待存入
print(q.get_nowait())  # 如果队列为空,取不到值的时候直接报错
'''
    full()
    empty()
    get_nowait()
上述方法不能在并发的场景下精准使用!!!
    由于并发场景下,多进程的存取数据是不同步的。这样就会造成上述命令的判断不准确
之所以介绍队列是因为它可以支持进程间数据通信
'''

IPC机制

'''
1.主进程与子进程数据交互
2.两个子进程数据交互
本质:不同内存空间中的进程数据交互
'''
from multiprocessing import Process, Queue
def producer(q):
    print('子进程producer从队列中添加值')
    q.put(111)
    print('子进程producer从队列中添加值')
    q.put(222)
    print('子进程producer从队列中添加值')
    q.put(333)
def consumer(q):
    print('子进程consumer从队列中取值>>>:', q.get())
    print('子进程consumer从队列中取值>>>:', q.get())
    print('子进程consumer从队列中取值>>>:', q.get())
if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=(q, ))
    p1 = Process(target=consumer, args=(q, ))
    p.start()
    p1.start()
    print('主进程')

生产者消费者模型

from multiprocessing import Process, Queue, JoinableQueue
import time
import random
def producer(name, food, q):
    for i in range(5):
        data = f'{name}生产了{food}{i}'
        print(data)
        time.sleep(random.random())
        q.put(data)
def consumer(name, q):
    while True:
        food = q.get()
        # if food == None:
        #     print('没了')
        #     break
        time.sleep(random.random())
        print(f'{name}吃了{food}')
        q.task_done()
if __name__ == '__main__':
    # q = Queue()  # 无结束 最后阻塞,获取值的添加
    q = JoinableQueue()
    p1 = Process(target=producer, args=('1', 'a', q))
    p2 = Process(target=producer, args=('2', 'b', q))
    p3 = Process(target=producer, args=('3', 'c', q))
    c1 = Process(target=consumer, args=('4', q))
    c2 = Process(target=consumer, args=('5', q))
    c3 = Process(target=consumer, args=('6', q))
    c1.daemon = True
    c2.daemon = True
    c3.daemon = True
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    c3.start()
    p1.join()
    p2.join()
    p3.join()
    q.join()  # 等待队列中数据全部被取出(一定要让生产者全部结束才能判断正确)
    print('主进程')
    """队列中其实已经自己加了锁 所以多进程取值也不会冲突 并且取走了就没了"""

线程理论

进程是资源单位
线程是执行单位
    进程相当于车间(圈地),线程相当于流水线(干活)
    进程相当于包工头(包活划地方),线程相当于工人(干活)
    进程只是一个工地的老板,给线程提供干活的场所。线程需要材料的时候,就问进程要
一个进程至少有一个线程

线程的资源占用远小于进程
    开进程:
        1. 申请内存
        2. 拷贝代码
    开线程:
        1. 一个进程可以假设多个线程,且无需申请内存空间、拷贝代码。
        2. 同一进程内的多线程的数据是共享的

开线程的两种方式

# 方式一
    from threading import Thread
    import time
    def task(name):
        print(f'{name} start run')
        time.sleep(1)
        print(f'{name} over')
    t = Thread(target=task, args=('oliver',))
    t.start()
    t.join()
    print('主线程')

# 方式二
    class Mythread(Thread):
        def __init__(self, username):
            self.username = username
            super().__init__()
        def run(self):
            print(f'{self.username} start run')
            time.sleep(1)
            print(f'{self.username} over')

    t = Mythread('oliver')
    t.start()
    t.join()
    print('主线程')

线程实现TCP客户端并发

# 服务端
    import socket
    from threading import Thread
    server = socket.socket()
    server.bind(('127.0.0.1', 9090))
    server.listen(5)
    def talk(sock):
        while True:
            data = sock.recv(1024)
            print(data.decode('utf8'))
            sock.send(data.upper())
    while True:
        sock, addr = server.accept()
        t = Thread(target=talk, args=(sock, ))
        t.start()
# 客户端
    import socket
    client = socket.socket()
    client.connect(('127.0.0.1', 9090))
    while True:
        client.send(b'sb')
        data = client.recv(1024)
        print(data.decode('utf8'))

线程join方法

def task(name):
    print(f'{name} start run')
    time.sleep(1)
    print(f'{name} over')
t = Thread(target=task, args=('oliver',))
t.start()
t.join()
print('主线程')
'''
线程的join方法其实与进程的join方法功能一致
无join结果:
    oliver start run
    主线程
    oliver over
有join结果:
    oliver start run
    oliver over
    主线程
    
主线程之所以要等着子线程结束才会结束整个进程  
    是因为主线程结束也就标志着整个进程的结束 要确保子线程运行过程中所需的各项资源
'''

线程对象属性和方法

'''
1.验证一个进程下的多个线程是否真的处于一个进程
    验证确实如此
2.统计进程下活跃的线程数
    active_count()  # 注意主线程也算!!!
3.获取线程的名字
    1.current_thread().name
    MainThread  				 主线程
    Thread-1、Thread-2		子线程
    2.self.name
'''
from threading import Thread, current_thread, active_count
import time
def task(name):
    print(f'{name} start run')
    time.sleep(1)
    print(os.getpid())  # 43508  两个线程的进程号一致
    print(current_thread().name)
    print(f'{name} over')
t1 = Thread(target=task, args=('oliver',))
t2 = Thread(target=task, args=('kevin',))
t1.start()
t2.start()
print(active_count())
t1.join()
t2.join()
print('主线程')
print(os.getpid())  # 43508 主线程的进程号与子线程号一致
print(current_thread().name)

守护线程

from threading import Thread
import time
def task(name):
    print(f'{name} start run')
    time.sleep(1)
    print(f'{name} over')
t1 = Thread(target=task, args=('oliver',))
t2 = Thread(target=task, args=('kevin',))
t1.daemon = True
t2.daemon = True
t1.start()
t2.start()
print('主线程')
'''
设置守护线程时,需要把所有的子线程设置成子线程,否则是无意义的
    因为只有一个线程的时候,把该子线程设置成守护线程后,该子线程会随着主线程的结束而结束
    但是多线程情况下,主进程需要等待所有非守护线程执行结束以后才能结束。这种情况下设置守护线程无意义
'''

GIL全局解释器锁

"""纯理论 不影响编程 只不过面试的时候可能会被问到"""
# 官方文档
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly 
because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
"""
1.回顾
	python解释器的类别有很多
		Cpython Jpython Ppython
	垃圾回收机制
		应用计数、标记清除、分代回收
    
GIL只存在于CPython解释器中,不是python的特征
GIL是一把互斥锁用于阻止同一个进程下的多个线程同时执行
原因是因为CPython解释器中的垃圾回收机制不是线程安全的

反向验证GIL的存在 如果不存在会产生垃圾回收机制与正常线程之间数据错乱
GIL是加在CPython解释器上面的互斥锁
同一个进程下的多个线程要想执行必须先抢GIL锁 所以同一个进程下多个线程肯定不能同时运行 即无法利用多核优势


强调:同一个进程下的多个线程不能同时执行即不能利用多核优势
	很多不懂python的程序员会喷python是垃圾 速度太慢 有多核都不能用

反怼:虽然用一个进程下的多个线程不能利用多核优势 但是还可以开设多进程!!!

再次强调:python的多线程就是垃圾!!!

反怼:要结合实际情况 
	如果多个任务都是IO密集型的 那么多线程更有优势(消耗的资源更少)
		多道技术:切换+保存状态
	如果多个任务都是计算密集型 那么多线程确实没有优势 但是可以用多进程
		CPU越多越好
	
以后用python就可以多进程下面开设多线程从而达到效率最大化
"""
1.所有的解释型语言都无法做到同一个进程下多个线程利用多核优势
2.GIL在实际编程中其实不用考虑
这篇关于消息队列与线程的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!