10个工人生产100个杯子
import time, logging, threading, datetime FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) cups = [] def vagary(task=100): while True: n = len(cups) logging.info(f'{threading.current_thread()} number: {n}') if n >= task: break cups.append(5) logging.warning(f'{threading.current_thread()} produce {n + 1}') for b in range(10): threading.Thread(target=vagary, name=f'vagary-{b}', args=(100,)).start()
import time, logging, threading, datetime FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) cups = [] lock = threading.Lock() def vagary(lock: threading.Lock, task=100): while True: lock.acquire(blocking=True) n = len(cups) logging.info(f'{threading.current_thread()} number: {n}') if n >= task: lock.release() break cups.append(5) logging.warning(f'{threading.current_thread()} produce {n + 1}') lock.release() for b in range(10): threading.Thread(target=vagary, name=f'vagary-{b}', args=(lock, 100)).start()
import time, logging, threading, datetime FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) class Counter: def __init__(self): self.__value = 0 self.lock = threading.Lock() def increment(self): # self.lock.acquire(blocking=True) self.__value += 1 # self.lock.release() def decrement(self): # self.lock.acquire(blocking=True) self.__value -= 1 # self.lock.release() @property def value(self): return self.__value def do(c: Counter, lock: threading.Lock, count=100): for _ in range(count): for i in range(-50, 50): lock.acquire() if i < 0: c.decrement() else: c.increment() lock.release() c = Counter() lock = threading.Lock() for i in range(10): t = threading.Thread(target=do, args=(c, lock, 10000)) # 此数值给高,否则线程瞬间结束 t.start() # t.join() # 主线程会等待t结束,再开启下个线程 while True: time.sleep(1) print(threading.enumerate()) print(c.value)
import time, logging, threading, datetime FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s' logging.basicConfig(level=logging.INFO, format=FORMAT) class Counter: def __init__(self): self.__value = 0 self.__lock = threading.Lock() @property def value(self): with self.__lock: return self.__value def increase(self): try: self.__lock.acquire(blocking=True) self.__value += 1 finally: self.__lock.release() def decrease(self): # try: # self.__lock.acquire(blocking=True) # self.__value -= 1 # finally: # self.__lock.release() with self.__lock: self.__value -= 1 def run(c: Counter, count=100): for _ in range(count): for i in range(-50, 50): if i < 0: c.decrease() else: c.increase() c = Counter() for b in range(10): threading.Thread(name=f'Thread--{b}', target=run, args=(c, 10000)).start() while True: time.sleep(1) if threading.active_count() == 1: print(threading.enumerate()) logging.warning(f'end {c.value}') break # 结束主进程 else: print(threading.enumerate()) logging.info(f'progress {c.value}')
import threading, time lock = threading.Lock() def w(): time.sleep(3) lock.release() lock.acquire() print(lock.locked()) threading.Thread(target=w).start() lock.acquire() print('get lock') lock.release() print('release') print(lock.locked())
****** 如果一个线程 lock.acquire()成功后, 结束 则不会自动lock.release()
import threading, time lock = threading.Lock() def b(lock: threading.Lock): lock.acquire() time.sleep(2) print(threading.current_thread(), 'exit 555555') # lock.release() def p(lock: threading.Lock): while True: time.sleep(1) if lock.acquire(): print('lock.acquire success') break print('lock.acquire failed') threading.Thread(args=(lock,), target=b, name='bb').start() threading.Thread(args=(lock,), target=p, name='pp').start() threading.Timer(interval=6, function=lambda lock: lock.release(), args=(lock,)).start() while True: print(threading.enumerate()) time.sleep(0.7)
非阻塞锁:
import threading, time, logging FORMAT = '%(asctime)-20s [%(threadName)10s, %(thread)8d] %(message)s' logging.basicConfig(format=FORMAT, level=logging.WARNING) def vagary(vails): for vail in vails: time.sleep(0.001) if vail.lock.acquire(blocking=False): # 获取锁, 返回True logging.warning('{} {} acquire success'.format(threading.current_thread(), vail.name)) # 不释放锁 else: logging.error(f'{threading.current_thread()} {vail.name} acquire failed') class Vail: def __init__(self, name): self.name = name self.lock = threading.Lock() vails = [Vail(f'vail-{i}') for i in range(10)] # 10个vail, 每个vail, 5个线程只有一个获取成功 for i in range(5): # 启动五个线程 threading.Thread(name=f'Thread-{i}', target=vagary, args=(vails,)).start()
Reentrant Lock threading.RLock
一个线程内可acquire多次, 必须全部释放完毕 ,其他线程才可acquire, 必须在自己线程内释放
import threading import time lock = threading.RLock() print(lock, type(lock)) print(lock.acquire()) print(lock.acquire()) print(lock, threading.current_thread()) print(lock.acquire()) def sub(rlock: threading.RLock): print('in sub', rlock, threading.current_thread()) rlock.acquire() print('in sub', rlock, threading.current_thread()) rlock.release() threading.Thread(target=sub, args=(lock,)).start() # 不能使用Timer, 必须在主线程内release # threading.Timer(interval=5, function=lambda lock: [lock.release()] * 3, args=(lock,)).start() time.sleep(5) lock.release() lock.release() lock.release()