消费速度 > 生产速度
import threading, time, logging, random FORMAT = '%(asctime)-15s [%(threadName)-10s %(thread)8d] %(message)s' logging.basicConfig(format=FORMAT, level=logging.ERROR) class Dispatcher: def __init__(self): self.datum = None self.__event = threading.Event() def produce(self): for v in range(100): self.datum = random.randrange(1, 100, 2) self.__event.wait(1) def consume(self): while True: logging.error(self.datum) self.datum = None self.__event.wait(0.5) d = Dispatcher() p = threading.Thread(name='producer', target=d.produce) c = threading.Thread(name='consumer', target=d.consume) c.start() p.start()
import threading, time, logging, random FORMAT = '%(asctime)-15s [%(threadName)-10s %(thread)6d] %(message)s' logging.basicConfig(format=FORMAT, level=logging.ERROR) class Dispatcher: def __init__(self): self.datum = None self.__event = threading.Event() self.__condition = threading.Condition() def produce(self): for v in range(10): with self.__condition: self.datum = random.randrange(1, 100, 2) logging.critical(f'{threading.current_thread()} produce {self.datum}') self.__condition.notify_all() self.__event.wait(1) self.__event.wait(2) self.__event.set() logging.critical(f'{threading.current_thread()} done') def consume(self): while not self.__event.is_set(): with self.__condition: logging.error(f'consuming {self.datum}') self.datum = None # self.__condition.wait() # self.__condition.wait_for(predicate=lambda: self.__event.is_set()) self.__condition.wait(timeout=2) logging.error(f'consume done') self.__event.wait(0.5) d = Dispatcher() p = threading.Thread(name='producer', target=d.produce) c = threading.Thread(name='consumer', target=d.consume) p.start() time.sleep(4) c.start()
import threading, time, logging, random FORMAT = '%(asctime)-15s [%(threadName)-11s %(thread)6d] %(message)s' logging.basicConfig(format=FORMAT, level=logging.DEBUG) class Dispatcher: def __init__(self): self.data = None self.event = threading.Event() def produce(self, total): for _ in range(total): data = random.randrange(1, 100, 2) logging.info(data) self.data = data self.event.wait(1) self.event.set() def consume(self): while not self.event.is_set(): logging.info(f'consume {self.data}') self.data = None self.event.wait(0.5) d = Dispatcher() p = threading.Thread(name='producer', target=d.produce, args=(10,)) c = threading.Thread(name='consumer', target=d.consume) c.start() p.start()
import threading, time, logging, random FORMAT = '%(asctime)-15s [%(threadName)-11s %(thread)6d] %(message)s' logging.basicConfig(format=FORMAT, level=logging.DEBUG) class Dispatcher: def __init__(self): self.data = None self.event = threading.Event() self.condition = threading.Condition() def produce(self, total): for _ in range(total): data = random.randrange(1, 100, 2) with self.condition: logging.info(data) self.data = data self.condition.notify_all() self.event.wait(0.4) self.event.set() def consume(self): while not self.event.is_set(): with self.condition: self.condition.wait() logging.info(f'consume {self.data}') self.data = None self.event.wait(0.2) d = Dispatcher() p = threading.Thread(name='producer', target=d.produce, args=(10,)) c = threading.Thread(name='consumer', target=d.consume) c.start() p.start()
import threading, time, logging, random FORMAT = '%(asctime)-15s [%(threadName)-11s %(thread)6d] %(message)s' logging.basicConfig(format=FORMAT, level=logging.DEBUG) class Dispatcher: def __init__(self): self.data = None self.event = threading.Event() self.condition = threading.Condition() def produce(self, total): for _ in range(total): data = random.randrange(1, 100, 2) with self.condition: logging.info(data) self.data = data self.condition.notify_all() self.event.wait(0.4) self.event.set() with self.condition: # 将consumer从self.condition.wait()中唤醒 self.condition.notify(1) # 模拟生产速度 def consume(self): while not self.event.is_set(): with self.condition: self.condition.wait() # block wait for notify logging.info(f'consume {self.data}') self.data = None self.event.wait(0.2) # imitate consumer velocity d = Dispatcher() p = threading.Thread(name='producer', target=d.produce, args=(10,)) c = threading.Thread(name='consumer', target=d.consume) c.start() p.start()
import threading, time, logging, random FORMAT = '%(asctime)-15s [%(threadName)-11s %(thread)6d] %(message)s' logging.basicConfig(format=FORMAT, level=logging.DEBUG) class Dispatcher: def __init__(self): self.data = None self.event = threading.Event() self.condition = threading.Condition() def produce(self, total): for _ in range(total): data = random.randrange(1, 100, 2) with self.condition: logging.info(data) self.data = data # self.condition.notify_all() self.condition.notify(2) self.event.wait(0.4) # 模拟生产速度 self.event.set() with self.condition: # 将consumer从self.condition.wait()中唤醒 # self.condition.notify(1) self.condition.notify_all() def consume(self): while not self.event.is_set(): with self.condition: self.condition.wait() # block wait for notify logging.info(f'consume {self.data}') self.data = None self.event.wait(0.2) # imitate consumer velocity d = Dispatcher() p = threading.Thread(name='producer', target=d.produce, args=(10,)) # c = threading.Thread(name='consumer', target=d.consume) # c.start() for b in range(5): # increase consumer threading.Thread(name=f'consumer-{b}', target=d.consume).start() p.start()