僵尸进程与孤儿进程
守护进程
互斥锁(重点)
消息队列
实现进程间数据交互(IPC机制)
生产者消费者模型
线程理论(重要)
from multiprocessing import Process import time def test(name): print('总管:%s is running' % name) time.sleep(3) print('总管:%s is over' % name) if __name__ == '__main__': p = Process(target=test, args=('jason',)) p.daemon = True # 设置为守护进程(一定要放在start语句上方) p.start() print("皇帝jason寿终正寝") time.sleep(0.1)
简介:并发情况下操作同一份数据 极其容易造成数据错乱 解决措施:将并发变成串行 虽然降低了效率但是提升了数据的安全
锁就可以实现将并发变成串行的效果
行锁、表锁
使用锁的注意事项
在主进程中产生 交由子进程使用
1.一定要在需要的地方加锁 千万不要 随意加
2.不要轻易的使用锁(死锁现象)
#以后的编程生涯中 几乎不会解除到自己操作锁的情况
import json from multiprocessing import Process, Lock import time import random # 查票 def search(name): with open(r'data.txt', 'r', encoding='utf8') as f: data_dict = json.load(f) ticket_num = data_dict.get('ticket_num') print('%s查询余票:%s' % (name, ticket_num)) # 买票 def buy(name): # 先查票 with open(r'data.txt', 'r', encoding='utf8') as f: data_dict = json.load(f) ticket_num = data_dict.get('ticket_num') # 模拟一个延迟 time.sleep(random.random()) # 判断是否有票 if ticket_num > 0: # 将余票减一 data_dict['ticket_num'] -= 1 # 重新写入数据库 with open(r'data.txt', 'w', encoding='utf8') as f: json.dump(data_dict, f) print('%s: 购买成功' % name) else: print('不好意思 没有票了!!!') def run(name,mutex): search(name) mutex.acquire() # 抢锁 buy(name) mutex.release() # 释放锁 if __name__ == '__main__': mutex = Lock() for i in range(1, 11): p = Process(target=run, args=('用户%s' % i,mutex)) p.start()
from multiprocessing import Queue q = Queue(5) # 括号内可以填写最大等待数 # 存放数据 q.put(111) q.put(222) # print(q.full()) # False 判断队列中数据是否满了 q.put(333) q.put(444) q.put(555) # print(q.full()) # q.put(666) # 超出范围原地等待 直到有空缺位置 # 提取数据 print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) # print(q.get()) # 没有数据之后原地等待直到有数据为止 print(q.get_nowait()) # 没有数据立刻报错
from multiprocessing import Queue, Process def producer(q): q.put("子进程p放的数据") def consumer(q): print('子进程c取的数据',q.get()) if __name__ == '__main__': q = Queue() p = Process(target=producer, args=(q,)) c = Process(target=consumer, args=(q,)) p.start() c.start() # q.put('主进程放的数据') # p = Process(target=consumer, args=(q,)) # p.start() # p.join() # print(q.get()) # print('主')
生产者:负责产生数据
消费者:负责处理数据
该模型需要解决恭喜不平衡现象
from multiprocessing import Queue, Process, JoinableQueue import time import random def producer(name, food, q): for i in range(10): print('%s 生产了 %s' % (name, food)) q.put(food) time.sleep(random.random()) def consumer(name, q): while True: data = q.get() print('%s 吃了 %s' % (name, data)) q.task_done() if __name__ == '__main__': # q = Queue() q = JoinableQueue() p1 = Process(target=producer, args=('大厨jason', '玛莎拉', q)) p2 = Process(target=producer, args=('印度阿三', '飞饼', q)) p3 = Process(target=producer, args=('泰国阿人', '榴莲', q)) c1 = Process(target=consumer, args=('班长阿飞', q)) p1.start() p2.start() p3.start() c1.daemon = True c1.start() p1.join() p2.join() p3.join() q.join() # 等待队列中所有的数据被取干净 print('主')
什么是线层?进程其实是一个资源单位 真正被CPU执行的其实是进程里面的线程
进程类似于是工厂 线程类似于是工厂里面的一条条流水线 所有的进程肯定含有最少一个线程
进程间数据默认是隔离的 但是同一个进程内的多个线程数据是共享的
1.重新申请一块内存空间
2.将所需的资源全部导入
上述两个步骤都不需要 所以开设线程消耗的资源远比开设进程的少
from threading import Thread import time def test(name): print('%s is running' % name) time.sleep(3) print('%s is over' % name) t = Thread(target=test, args=('jason',)) t.start() print('主') class MyClass(Thread): def __init__(self,name): super().__init__() self.name = name def run(self): print('%s is running' % self.name) time.sleep(3) print('%s is over' % self.name) obj = MyClass('jason') obj.start() print('主线程')
1.join方法 2.获取进程号(验证同一个进程内可以开设多个线程) 3.active_count统计当前正在活跃的线程数 4.current_thread
主线程的结束意味着整个进程的结束
所以主线程需要等待里面所有非守护线程的结束才能结束
from threading import Thread from multiprocessing import Process import time def foo(): print(123) time.sleep(3) print("end123") def bar(): print(456) time.sleep(1) print("end456") if __name__ == '__main__': t1=Thread(target=foo) t2=Thread(target=bar) t1.daemon=True t1.start() t2.start() print("main-------")
from threading import Thread money = 100 def test(): global money money = 999 t = Thread(target=test) t.start() t.join() print(money)
from threading import Thread, Lock from multiprocessing import Lock import time num = 100 def test(mutex): global num mutex.acquire() # 先获取num的数值 tmp = num # 模拟延迟效果 time.sleep(0.1) # 修改数值 tmp -= 1 num = tmp mutex.release() t_list = [] mutex = Lock() for i in range(100): t = Thread(target=test, args=(mutex,)) t.start() t_list.append(t) # 确保所有的子线程全部结束 for t in t_list: t.join() print(num)
import socket from threading import Thread from multiprocessing import Process server = socket.socket() server.bind(('127.0.0.1', 8080)) server.listen(5) def talk(sock): while True: try: data = sock.recv(1024) if len(data) == 0: break print(data.decode('utf8')) sock.send(data + b'gun dan!') except ConnectionResetError as e: print(e) break sock.close() while True: sock, addr = server.accept() print(addr) # 开设多进程或者多线程 t = Thread(target=talk, args=(sock,)) t.start()