/**
 * A bounded FIFO queue whose {@code take} and {@code put} block while the
 * queue is empty or full respectively.
 *
 * <p>All state is guarded by one {@link ReentrantLock}; two conditions on that
 * lock separate "waiting for an element" from "waiting for free space".
 *
 * @param <T> element type
 */
class BlockedQueue<T> {
    /** Backing storage; only touched while {@link #lock} is held. */
    private final Deque<T> queue = new ArrayDeque<>();
    /** Guards {@link #queue}. */
    private final ReentrantLock lock = new ReentrantLock();
    /** Consumers wait here while the queue is empty. */
    private final Condition emptyCondition = lock.newCondition();
    /** Producers wait here while the queue is full. */
    private final Condition fullCondition = lock.newCondition();
    /** Maximum number of queued elements. */
    private final int capcity;

    /**
     * @param capcity maximum number of elements the queue may hold
     */
    public BlockedQueue(int capcity) {
        this.capcity = capcity;
    }

    /**
     * Removes and returns the head of the queue, waiting as long as necessary
     * for an element to become available.
     *
     * <p>The wait is not interruptible, but an interrupt received while
     * waiting is preserved: the thread's interrupt status is still set when
     * this method returns.
     *
     * @return the oldest queued element
     */
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                // awaitUninterruptibly keeps waiting on interrupt and re-asserts the
                // interrupt status on return, instead of silently dropping it.
                emptyCondition.awaitUninterruptibly();
            }
            T head = queue.removeFirst();
            // A slot was freed: wake one blocked producer.
            fullCondition.signal();
            return head;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends an element, waiting as long as necessary for free space.
     *
     * <p>Like {@link #take()}, the wait is not interruptible and the interrupt
     * status is preserved.
     *
     * @param t element to enqueue
     */
    public void put(T t) {
        lock.lock();
        try {
            while (queue.size() == capcity) {
                fullCondition.awaitUninterruptibly();
            }
            queue.addLast(t);
            // An element is available: wake one blocked consumer.
            emptyCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * @return the capacity given at construction time
     */
    public int getCapcity() {
        // The field is final, so no locking is needed to read it.
        return capcity;
    }
}
public T poll(long timeout, TimeUnit unit){ lock.lock(); try{ long nacos = unit.toNanos(timeout); while(queue.isEmpty()){ try { //等待超时,后自动解锁。 if(nacos<=0){ return null; } nacos= emptyCondition.awaitNanos(nacos); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); //唤醒满队列 fullCondition.signal(); return t; }finally { lock.unlock(); } }
/**
 * Skeleton of a hand-written thread pool: it only stores its configuration
 * and creates the task queue. Task submission and the worker loop are not
 * part of this version.
 */
class ThreadPool {
    /** Queue of submitted tasks, bounded by the capacity given to the constructor. */
    private BlockedQueue<Runnable> queue;
    /** Set of worker objects owned by this pool. */
    private HashSet<Worker> workers = new HashSet<>();
    /** Configured number of core workers. */
    private int coreSize;
    /** Timeout value; stored here but not yet used by this skeleton. */
    private long timeout;
    /** Unit of {@link #timeout}. */
    private TimeUnit timeUnit;

    /**
     * Creates the pool and its task queue.
     *
     * @param coreSize     number of core workers
     * @param timeout      timeout value
     * @param timeUnit     unit of {@code timeout}
     * @param queueCapcity capacity of the task queue
     */
    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity) {
        this.queue = new BlockedQueue<>(queueCapcity);
        this.timeUnit = timeUnit;
        this.timeout = timeout;
        this.coreSize = coreSize;
    }

    /** Placeholder for the worker type; empty in this version. */
    class Worker {
    }
}
public void execute(Runnable task){ synchronized (this){ //保证只有一个线程能够被创建 if(workers.size()<coreSize){ Worker worker = new Worker(task); workers.add(worker); }else{ //如果没有线程处理那么就放入到阻塞队列 queue.put(task); } } } class Worker extends Thread{ private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { while(task!=null||(task=queue.take())!=null){ try { task.run(); } catch (Exception e) { e.printStackTrace(); }finally { task=null; } } //执行完任务之后立刻移除当前线程。 synchronized (workers){ workers.remove(this); } } }
@Slf4j(topic = "c.test") public class MyTestThreadPool { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10); for(int i=0;i<5;i++){ int j=i; threadPool.execute(()->{ log.debug("执行任务{}",j); }); } } } @Slf4j(topic = "c.pool") class ThreadPool{ private BlockedQueue<Runnable> queue; private HashSet<Worker> workers=new HashSet<>(); private int coreSize; private long timeout; private TimeUnit timeUnit; public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapcity) { this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; queue=new BlockedQueue<Runnable>(queueCapcity); } public void execute(Runnable task){ synchronized (this){ //保证只有一个线程能够被创建 if(workers.size()<coreSize){ Worker worker = new Worker(task); log.debug("线程和任务创建{},{}",worker,task); workers.add(worker); worker.start(); }else{ log.debug("加入到任务队列{}",task); //如果没有线程处理那么就放入到阻塞队列 queue.put(task); } } } // @Slf4j(topic = "worker") class Worker extends Thread{ private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { while(task!=null||(task=queue.poll(timeout,timeUnit))!=null){ try { log.debug("任务被执行{}",task); task.run(); } catch (Exception e) { e.printStackTrace(); }finally { task=null; } } //执行完任务之后立刻移除当前线程。 synchronized (workers){ log.debug("线程完成任务退出池{}",this); workers.remove(this); } } } } class BlockedQueue<T>{ //阻塞队列 private Deque<T> queue=new ArrayDeque<>(); //锁 private ReentrantLock lock=new ReentrantLock(); //空队列条件 private Condition emptyCondition=lock.newCondition(); //满队列条件 private Condition fullCondition=lock.newCondition(); private int capcity; public BlockedQueue(int capcity) { this.capcity = capcity; } public T poll(long timeout, TimeUnit unit){ lock.lock(); try{ long nacos = unit.toNanos(timeout); while(queue.isEmpty()){ try { //等待超时,后自动解锁。 if(nacos<=0){ return null; } nacos= emptyCondition.awaitNanos(nacos); } catch (InterruptedException e) { 
e.printStackTrace(); } } T t = queue.removeFirst(); //唤醒满队列 fullCondition.signal(); return t; }finally { lock.unlock(); } } public T take(){ lock.lock(); try{ while(queue.isEmpty()){ try { emptyCondition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); //唤醒满队列 fullCondition.signal(); return t; }finally { lock.unlock(); } } public void put(T t){ lock.lock(); try{ while(queue.size()==capcity){ try { fullCondition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.addLast(t); emptyCondition.signal(); }finally { lock.unlock(); } } public int getCapcity(){ lock.lock(); try{ return this.capcity; }finally { lock.unlock(); } } }
public boolean offer(T t,long timeout,TimeUnit unit){ lock.lock(); try{ long nacos=unit.toNanos(timeout); while(queue.size()==capcity){ try { log.debug("任务队列满了..."); if(nacos<=0){ return false; } nacos = fullCondition.awaitNanos(nacos); } catch (InterruptedException e) { e.printStackTrace(); } } //任务加入到队列 log.debug("加入到任务队列{}",t); queue.addLast(t); emptyCondition.signal(); return true; }finally { lock.unlock(); } }
/**
 * Strategy invoked for a task that could not be placed on a queue.
 *
 * @param <T> task type
 */
@FunctionalInterface
interface RejectPolicy<T> {

    /**
     * Handles a task that was not queued.
     *
     * @param queue the queue the task was offered to
     * @param task  the task that was not queued
     */
    void reject(BlockedQueue<T> queue, T task);
}
public void execute(Runnable task){ synchronized (this){ //保证只有一个线程能够被创建 if(workers.size()<coreSize){ Worker worker = new Worker(task); log.debug("线程和任务创建{},{}",worker,task); workers.add(worker); worker.start(); }else{ //如果没有线程处理那么就放入到阻塞队列 queue.tryput(rejectPolicy,task); } } }
public void tryput(RejectPolicy<T> rejectPolicy, T task) { //判断是否满,本队列在调用自己的加入方法,然后其他考自定义 lock.lock(); try{ if(queue.size()==capcity){ rejectPolicy.reject(this,task); }else{ log.debug("加入到任务队列{}",task); queue.addLast(task); emptyCondition.signal(); } }finally { lock.unlock(); } }
ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 0 ,(queue,task)->{ //死等 // queue.put(task); //超时等待 // queue.offer(task,500,TimeUnit.MILLISECONDS); //放弃执行 // log.debug("放弃任务{}",task); //抛出异常 // throw new RuntimeException("任务执行失败"); //调用者自己执行,不能使用线程池了 task.run(); });
/**
 * Thread factory that names threads {@code pool-N-thread-M}, puts them in the
 * caller's thread group, and forces them to be non-daemon with normal priority.
 */
static class DefaultThreadFactory implements ThreadFactory {
    /** Sequence number shared by all factories; supplies the {@code N} in the name. */
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    /** Group every created thread is placed in. */
    private final ThreadGroup group;
    /** Per-factory sequence number; supplies the {@code M} in the name. */
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    /** {@code "pool-N-thread-"} prefix of this factory. */
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager securityManager = System.getSecurityManager();
        if (securityManager != null) {
            group = securityManager.getThreadGroup();
        } else {
            group = Thread.currentThread().getThreadGroup();
        }
        namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
    }

    /**
     * Creates a non-daemon, normal-priority thread for the given runnable.
     *
     * @param r the code the thread will run
     * @return the new, not yet started thread
     */
    public Thread newThread(Runnable r) {
        String threadName = namePrefix + threadNumber.getAndIncrement();
        Thread thread = new Thread(group, r, threadName, 0);
        // A new thread inherits daemon status and priority from its creator; normalise both.
        if (thread.isDaemon()) {
            thread.setDaemon(false);
        }
        if (thread.getPriority() != Thread.NORM_PRIORITY) {
            thread.setPriority(Thread.NORM_PRIORITY);
        }
        return thread;
    }
}
/**
 * Creates a pool whose core and maximum size are both {@code nThreads}, with
 * a zero keep-alive time and a {@link LinkedBlockingQueue} created without a
 * capacity argument.
 *
 * @param nThreads number of threads in the pool
 * @return the new executor
 */
public static ExecutorService newFixedThreadPool(int nThreads) {
    LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
    return new ThreadPoolExecutor(
            nThreads,
            nThreads,
            0L,
            TimeUnit.MILLISECONDS,
            workQueue);
}
/** Demo: a fixed pool of two threads with a custom thread-naming factory. */
@Slf4j(topic = "c.test")
public class TestThreadPoolExecutors {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {
            /** Sequence number appended to each thread name. */
            private final AtomicInteger i = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "线程" + i.getAndIncrement());
            }
        });
        try {
            pool.execute(() -> {
                log.debug("1");
            });
        } finally {
            // The pool threads are non-daemon; without shutdown() the JVM never
            // exits. Already submitted tasks still run to completion.
            pool.shutdown();
        }
    }
}
/**
 * Creates a pool with zero core threads, a maximum of
 * {@code Integer.MAX_VALUE} threads, a 60-second keep-alive and a
 * {@link SynchronousQueue} as work queue.
 *
 * @return the new executor
 */
public static ExecutorService newCachedThreadPool() {
    SynchronousQueue<Runnable> handoffQueue = new SynchronousQueue<Runnable>();
    return new ThreadPoolExecutor(
            0,
            Integer.MAX_VALUE,
            60L,
            TimeUnit.SECONDS,
            handoffQueue);
}
/**
 * Demo of {@link SynchronousQueue}: thread t1 puts two values, threads t2 and
 * t3 are started one after the other (with a pause in between) and each
 * takes one value.
 */
@Slf4j(topic = "c.TestSynchronousQueue")
public class TestSynchronousQueue {
    public static void main(String[] args) {
        SynchronousQueue<Integer> integers = new SynchronousQueue<>();

        // Producer: logs before and after each put.
        Thread producer = new Thread(() -> {
            try {
                log.debug("putting {} ", 1);
                integers.put(1);
                log.debug("{} putted...", 1);

                log.debug("putting...{} ", 2);
                integers.put(2);
                log.debug("{} putted...", 2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t1");
        producer.start();

        sleep(1);

        // First consumer.
        Thread firstTaker = new Thread(() -> {
            try {
                log.debug("taking {}", 1);
                integers.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t2");
        firstTaker.start();

        sleep(1);

        // Second consumer.
        Thread secondTaker = new Thread(() -> {
            try {
                log.debug("taking {}", 2);
                integers.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t3");
        secondTaker.start();
    }
}
/**
 * Creates a one-thread pool (core and maximum size 1, zero keep-alive,
 * {@link LinkedBlockingQueue} created without a capacity argument) and wraps
 * it in a {@code FinalizableDelegatedExecutorService}.
 *
 * @return the wrapped executor
 */
public static ExecutorService newSingleThreadExecutor() {
    ThreadPoolExecutor singleWorker = new ThreadPoolExecutor(
            1,
            1,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
    return new FinalizableDelegatedExecutorService(singleWorker);
}
/** Demo: {@code submit} a {@link Callable} and read its result through the returned {@link Future}. */
@Slf4j(topic = "c.test")
public class TestSubmit {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(1);
        try {
            Future<String> future = service.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    log.debug("好人");
                    return "ok";
                }
            });
            // get() blocks until the task has produced its result.
            log.debug("结果{}", future.get());
        } finally {
            // The pool thread is non-daemon; without shutdown() the JVM never exits.
            service.shutdown();
        }
    }
}
/** Demo: {@code invokeAll} runs every task and returns one {@link Future} per task. */
@Slf4j(topic = "c.test")
public class TestSubmit {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(1);
        try {
            List<Future<Object>> futures = service.invokeAll(Arrays.asList(
                    () -> {
                        Sleeper.sleep(1);
                        log.debug("测试1");
                        return "1";
                    },
                    () -> {
                        Sleeper.sleep(0.5);
                        log.debug("测试2");
                        return "2";
                    },
                    () -> {
                        Sleeper.sleep(2);
                        log.debug("测试3");
                        return "3";
                    }
            ));
            futures.forEach(f -> {
                try {
                    log.debug("结果{}", f.get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            });
        } finally {
            // The pool thread is non-daemon; without shutdown() the JVM never exits.
            service.shutdown();
        }
    }
}
/** Demo: {@code invokeAny} returns the result of one task that completed successfully. */
@Slf4j(topic = "c.test")
public class TestSubmit {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(1);
        try {
            String s = service.invokeAny(Arrays.asList(
                    () -> {
                        Sleeper.sleep(1);
                        log.debug("{}", 1);
                        return "1";
                    },
                    () -> {
                        Sleeper.sleep(0.5);
                        log.debug("{}", 3);
                        return "1";
                    },
                    () -> {
                        Sleeper.sleep(2);
                        log.debug("{}", 2);
                        return "1";
                    }
            ));
            log.debug("{}", s);
        } finally {
            // The pool thread is non-daemon; without shutdown() the JVM never exits.
            service.shutdown();
        }
    }
}
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //修改线程池状态 advanceRunState(SHUTDOWN); //关闭所有空闲线程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //修改状态为stop advanceRunState(STOP); //关闭所有线程 interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
@Slf4j(topic = "c.TestShutDown") public class TestShutDown { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); Future<Integer> result1 = pool.submit(() -> { log.debug("task 1 running..."); Thread.sleep(1000); log.debug("task 1 finish..."); return 1; }); Future<Integer> result2 = pool.submit(() -> { log.debug("task 2 running..."); Thread.sleep(1000); log.debug("task 2 finish..."); return 2; }); Future<Integer> result3 = pool.submit(() -> { log.debug("task 3 running..."); Thread.sleep(1000); log.debug("task 3 finish..."); return 3; }); log.debug("shutdown"); pool.shutdown(); pool.awaitTermination(3, TimeUnit.SECONDS); log.debug("other"); // List<Runnable> runnables = pool.shutdownNow(); // log.debug("other.... {}" , runnables); } }
// One pool serves both roles: the outer ("waiter") task submits an inner
// ("cook") task to the same pool and then blocks on its result.
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.execute(() -> {
    log.debug("点菜");
    Future<String> future = pool.submit(() -> {
        log.debug("做菜");
        return cooking();
    });
    try {
        // Blocks this pool thread until the inner task has finished.
        log.debug("上菜{}", future.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
});
/** Dishes the cook can choose from. */
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
/** Source of randomness for {@link #cooking()}. */
static final Random RANDOM = new Random();

/**
 * @return a randomly chosen dish from {@link #MENU}
 */
static String cooking() {
    return MENU.get(RANDOM.nextInt(MENU.size()));
}

/**
 * One waiter job: takes the order, asks the cook pool to prepare a dish and
 * waits for it before serving.
 *
 * @param cookPool pool that runs the cooking task
 */
private static void serveOrder(ExecutorService cookPool) {
    log.debug("点菜");
    Future<String> future = cookPool.submit(() -> {
        log.debug("做菜");
        return cooking();
    });
    try {
        log.debug("上菜{}", future.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

/**
 * Runs two waiter jobs. Waiters and cooks use separate pools, so a waiter
 * blocked on {@code future.get()} never occupies a thread a cook needs.
 */
public static void main(String[] args) {
    ExecutorService waiterPool = Executors.newFixedThreadPool(1);
    ExecutorService cookPool = Executors.newFixedThreadPool(1);
    try {
        waiterPool.execute(() -> serveOrder(cookPool));
        waiterPool.execute(() -> serveOrder(cookPool));
    } finally {
        // Pool threads are non-daemon: shut both pools down so the JVM can exit.
        waiterPool.shutdown();
        try {
            // The cook pool must keep accepting work until every waiter is done.
            waiterPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        cookPool.shutdown();
    }
}
/**
 * Demo of {@link Timer}: two tasks are scheduled with the same one-second
 * delay on one timer; the first task sleeps for two seconds after logging.
 */
@Slf4j(topic = "c.32")
public class MyTest32 {
    public static void main(String[] args) {
        TimerTask slowTask = new TimerTask() {
            @Override
            public void run() {
                log.debug("执行1");
                Sleeper.sleep(2);
            }
        };
        TimerTask quickTask = new TimerTask() {
            @Override
            public void run() {
                log.debug("执行2");
            }
        };

        Timer timer = new Timer();
        // Both tasks are due after 1000 ms.
        timer.schedule(slowTask, 1000);
        timer.schedule(quickTask, 1000);
    }
}
/**
 * Demo of a scheduled pool with a failing task: the first task throws
 * (division by zero) right after logging, the second task only logs.
 */
@Slf4j(topic = "c.33")
public class MyTest33 {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);

        Runnable failing = () -> {
            log.debug("1");
            // Deliberate ArithmeticException; the sleep below is never reached.
            int i = 1 / 0;
            Sleeper.sleep(1);
        };
        Runnable plain = () -> {
            log.debug("2");
        };

        // Both tasks are due after one second.
        scheduledThreadPool.schedule(failing, 1, TimeUnit.SECONDS);
        scheduledThreadPool.schedule(plain, 1, TimeUnit.SECONDS);
    }
}
@Slf4j(topic = "c.33") public class MyTest33 { public static void main(String[] args) { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1); // scheduledThreadPool.scheduleAtFixedRate(()->{ // log.debug("运行"); // Sleeper.sleep(2); // },1,1,TimeUnit.SECONDS); scheduledThreadPool.scheduleWithFixedDelay(()->{ log.debug("运行"); Sleeper.sleep(2); },1,1,TimeUnit.SECONDS); } }
Future<String> future = scheduledThreadPool.submit(() -> { log.debug("测试"); // int i = 1 / 0; return "1"; }); log.debug("{}",future.get());
// Compute the milliseconds from now until the next Thursday 18:00:00.
LocalDateTime now = LocalDateTime.now();
// Thursday 18:00:00.000 of the current week.
LocalDateTime time = now
        .withHour(18)
        .withMinute(0)
        .withSecond(0)
        .withNano(0)
        .with(DayOfWeek.THURSDAY);
log.debug("{}", now);
log.debug("{}", time);
// Already past this week's slot: use next week's.
if (now.isAfter(time)) {
    time = time.plusWeeks(1);
}
log.debug("{}", time);
Duration untilTarget = Duration.between(now, time);
long l = untilTarget.toMillis();
System.out.println(l);
public void execute(Runnable command) { if (this.executor != null) { try { //执行任务 this.executor.execute(command); } catch (RejectedExecutionException var3) { //发现线程和队列都没有空闲,先尝试把任务再次加入到队列中 if (!((TaskQueue)this.executor.getQueue()).force(command)) { throw new RejectedExecutionException(sm.getString("standardThreadExecutor.queueFull")); } } } else { throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); } }
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if (this.parent != null && !this.parent.isShutdown()) { //调用超时任务加入 return super.offer(o, timeout, unit); } else { throw new RejectedExecutionException(sm.getString("taskQueue.notRunning")); } }
@Slf4j(topic = "c.TestForkJoin2") public class TestForkJoin2 { public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(4); System.out.println(pool.invoke(new MyTask(5))); // new MyTask(5) 5+ new MyTask(4) 4 + new MyTask(3) 3 + new MyTask(2) 2 + new MyTask(1) } } // 1~n 之间整数的和 @Slf4j(topic = "c.MyTask") class MyTask extends RecursiveTask<Integer> { private int n; public MyTask(int n) { this.n = n; } @Override public String toString() { return "{" + n + '}'; } @Override protected Integer compute() { // 如果 n 已经为 1,可以求得结果了 if (n == 1) { log.debug("join() {}", n); return n; } // 将任务进行拆分(fork) AddTask1 t1 = new AddTask1(n - 1); t1.fork(); log.debug("fork() {} + {}", n, t1); // 合并(join)结果 int result = n + t1.join(); log.debug("join() {} + {} = {}", n, t1, result); return result; } }
@Slf4j(topic = "c.test111") public class MyLockTest { public static void main(String[] args) { MyLock lock = new MyLock(); new Thread(()->{ System.out.println("线程1执行"); lock.lock(); try{ log.debug("上锁1"); Sleeper.sleep(2); }finally { log.debug("解锁1"); lock.unlock(); } },"t1").start(); new Thread(()->{ lock.lock(); try{ log.debug("上锁2"); Sleeper.sleep(1); }finally { log.debug("解锁2"); lock.unlock(); } },"t2").start(); } } class MyLock implements Lock { class MySync extends AbstractQueuedSynchronizer{ @Override protected boolean tryAcquire(int arg) { //尝试加锁 if(compareAndSetState(0,1)){ setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int arg) { //尝试释放锁 setExclusiveOwnerThread(null); setState(0); return true; } @Override protected boolean isHeldExclusively() { //锁是不是被线程持有 return getState()==1; } public Condition newCondition(){ return new ConditionObject(); } } private MySync sync=new MySync(); @Override//加锁 public void lock() { sync.acquire(1); } @Override//加锁,可以被中断 public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override//尝试加锁,不成功就放弃 public boolean tryLock() { return sync.tryAcquire(1); } //尝试加锁,超时就进入队列 @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { long waitTime = unit.toNanos(time); return sync.tryAcquireNanos(1,waitTime); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { // sync.newCondition(); return sync.newCondition(); } }
public final void acquire(int arg) { if (!tryAcquire(arg) &&//尝试加锁 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//加锁失败线程进入阻塞队列 selfInterrupt(); }
/**
 * Releases in exclusive mode and, if the release succeeded, wakes the
 * successor of the head node when the head's wait status is non-zero.
 *
 * @param arg the release argument passed through to {@code tryRelease}
 * @return the value returned by {@code tryRelease}
 */
public final boolean release(int arg) {
    if (!tryRelease(arg)) {
        return false;
    }
    Node headNode = head;
    if (headNode != null && headNode.waitStatus != 0) {
        unparkSuccessor(headNode);
    }
    return true;
}
public void lock() {//ReentrantLock的方法 sync.lock(); }
final void lock() {//NonfairSync if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
/**
 * Tries to take the lock immediately by CAS-ing the state from 0 to 1 and
 * falls back to {@code acquire(1)} when the CAS fails.
 */
final void lock() {
    boolean barged = compareAndSetState(0, 1);
    if (barged) {
        setExclusiveOwnerThread(Thread.currentThread());
    } else {
        acquire(1);
    }
}
/**
 * Exclusive acquire: one direct attempt, then queueing until success. If the
 * queued wait reports an interrupt, the thread re-interrupts itself.
 *
 * @param arg the acquire argument passed through to {@code tryAcquire}
 */
public final void acquire(int arg) {
    if (tryAcquire(arg)) {
        return;
    }
    Node waiter = addWaiter(Node.EXCLUSIVE);
    if (acquireQueued(waiter, arg)) {
        selfInterrupt();
    }
}
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())//解锁后的位置 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next;//获取下一个节点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);//解锁 }
/**
 * Exclusive release. When {@code tryRelease} reports success and the head
 * node has a non-zero wait status, the head's successor is unparked.
 *
 * @param arg the release argument passed through to {@code tryRelease}
 * @return the value returned by {@code tryRelease}
 */
public final boolean release(int arg) {
    boolean released = tryRelease(arg);
    if (released) {
        Node first = head;
        if (first != null && first.waitStatus != 0) {
            unparkSuccessor(first);
        }
    }
    return released;
}
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null);//设置owner为空 } setState(c);//修改状态 return free; }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {//如果锁没有锁定资源,那么就把锁交给当前线程 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) {//如果当前的资源是当前线程那么就可以修改状态,也就是锁被同1线程获取多少次 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } protected final boolean tryRelease(int releases) { //state-1相当于就是锁释放一次,如果发现不是0那么就是可重入锁。 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())//中断并且设置interrupted为true interrupted = true; } } finally { if (failed) cancelAcquire(node); } } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();//调用中断 } static void selfInterrupt() {//再次调用线程中断 Thread.currentThread().interrupt(); }
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //被打断后抛出异常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
/**
 * Exclusive acquire that aborts on interrupt: fails immediately if the thread
 * is already interrupted, otherwise tries once and then falls back to the
 * interruptible queued acquire.
 *
 * @param arg the acquire argument
 * @throws InterruptedException if the thread is interrupted
 */
public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    if (tryAcquire(arg)) {
        return;
    }
    doAcquireInterruptibly(arg);
}
/**
 * Fair acquire attempt: a free lock is only taken when no other thread is
 * queued ahead of the caller; the owner may always re-enter.
 *
 * @param acquires number of holds to add
 * @return {@code true} if the lock was acquired or re-entered
 */
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int holds = getState();
    if (holds == 0) {
        // Lock is free, but queued predecessors go first.
        if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
        return false;
    }
    if (current != getExclusiveOwnerThread()) {
        return false;
    }
    // Re-entry by the owner.
    int nextc = holds + acquires;
    if (nextc < 0) {
        throw new Error("Maximum lock count exceeded");
    }
    setState(nextc);
    return true;
}
} // closes an enclosing class whose header is outside this snippet
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());//判断前驱节点的下一个是不是为空 }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {//直接判断而不会去查询AQS是不是有节点在阻塞 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter();//进入condition int savedState = fullyRelease(node);//清空锁 int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this);//进入阻塞 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) {//清空锁实际上就是清空state failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h);//唤醒同步器队列的线程。 return true; } return false; }
/**
 * Signals the first waiter in the condition queue, if there is one.
 *
 * @throws IllegalMonitorStateException if {@code isHeldExclusively()} is false
 */
public final void signal() {
    if (!isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    }
    Node oldestWaiter = firstWaiter;
    if (oldestWaiter == null) {
        return;
    }
    doSignal(oldestWaiter);
}
/**
 * Removes waiters from the front of the condition queue and tries to transfer
 * each one via {@code transferForSignal}, stopping after the first successful
 * transfer or when the queue is empty.
 *
 * @param first the current first waiter of the condition queue
 */
private void doSignal(Node first) {
    do {
        // Advance firstWaiter past `first`; if the queue is now empty, clear lastWaiter too.
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // Detach the removed node from the condition queue.
        first.nextWaiter = null;
        // Retry with the next waiter only if this transfer failed and one remains.
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
/**
 * Moves a node from the condition queue to the sync queue.
 *
 * @param node the node to transfer
 * @return {@code true} if the node was transferred; {@code false} if its
 *         wait status could not be changed from CONDITION
 */
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    // Wake the thread directly when the node returned by enq is cancelled
    // (ws > 0) or cannot be marked SIGNAL.
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
/**
 * Demo of {@link ReentrantReadWriteLock}: thread t1 takes the read lock and,
 * shortly after, thread t2 requests the write lock on the same container.
 */
public class MyReadAndWrite {
    public static void main(String[] args) {
        DataContainer dataContainer = new DataContainer("123");

        Thread reader = new Thread(() -> {
            dataContainer.read();
        }, "t1");
        reader.start();

        Sleeper.sleep(0.1);

        Thread writer = new Thread(() -> {
            dataContainer.write();
        }, "t2");
        writer.start();
    }
}

/** Holds one string guarded by the read and write locks of one read-write lock. */
@Slf4j(topic = "c.data")
class DataContainer {
    /** The guarded value. */
    private String s;
    private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    /** Read lock of {@link #rw}. */
    private ReentrantReadWriteLock.ReadLock r = rw.readLock();
    /** Write lock of {@link #rw}. */
    private ReentrantReadWriteLock.WriteLock w = rw.writeLock();

    public DataContainer(String s) {
        this.s = s;
    }

    /** Logs the value while holding the read lock for about one second. */
    public void read() {
        log.debug("上读锁");
        r.lock();
        try {
            log.debug("读取{}", s);
            Sleeper.sleep(1);
        } finally {
            log.debug("释放读锁");
            r.unlock();
        }
    }

    /** Logs the value while holding the write lock. */
    public void write() {
        log.debug("上写锁");
        w.lock();
        try {
            log.debug("读取{}", s);
        } finally {
            log.debug("释放写锁");
            w.unlock();
        }
    }
}
@Override public <T> T queryOne(Class<T> beanClass, String sql, Object... args) { // 先从缓存中找,找到直接返回 SqlPair key = new SqlPair(sql, args);; // rw.readLock().lock(); rw.readLock().lock(); try { T value = (T) map.get(key); if(value != null) { return value; } } finally { rw.readLock().unlock(); // rw.readLock().unlock(); } // rw.writeLock().lock(); rw.writeLock().lock(); try { // 多个线程 T value = (T) map.get(key); // if(value == null) { // 缓存中没有,查询数据库 if(value==null){ value = dao.queryOne(beanClass, sql, args); map.put(key, value); } // } return value; } finally { rw.writeLock().unlock(); // rw.writeLock().unlock(); } } @Override public int update(String sql, Object... args) { // rw.writeLock().lock(); rw.writeLock().lock(); try { // 先更新库 int update = dao.update(sql, args); // 清空缓存 map.clear(); return update; } finally { rw.writeLock().unlock(); // rw.writeLock().unlock(); } }
/**
 * Acquires the lock by delegating to the synchronizer's exclusive
 * {@code acquire} with an argument of 1.
 */
public void lock() {
    sync.acquire(1);
}
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread())//如果别人获取读锁或者是owner不是自己,说明了别人获取了写锁或者是读锁,我无法获取锁。假设都不成立说明写锁被获取,并且owner是自己,那么就可以执行写锁可重入。 return false;/ if (w + exclusiveCount(acquires) > MAX_COUNT)//大于可重入的最大值 throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires);//重入 return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires))//判断是不是公平锁,判断之后就上锁 return false; setExclusiveOwnerThread(current); return true; }
/**
 * Exclusive release: if {@code tryRelease} succeeds and the head node has a
 * non-zero wait status, the head's successor is unparked.
 *
 * @param arg the release argument passed through to {@code tryRelease}
 * @return the value returned by {@code tryRelease}
 */
public final boolean release(int arg) {
    if (!tryRelease(arg)) {
        return false;
    }
    Node currentHead = head;
    boolean successorMayWait = currentHead != null && currentHead.waitStatus != 0;
    if (successorMayWait) {
        unparkSuccessor(currentHead);
    }
    return true;
}
/**
 * Write-lock release: subtracts {@code releases} from the state; the lock is
 * free once the exclusive count of the new state is zero.
 *
 * @param releases number of write holds to give up
 * @return {@code true} if the write lock is now fully released
 * @throws IllegalMonitorStateException if {@code isHeldExclusively()} is false
 */
protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    }
    int updated = getState() - releases;
    boolean free = (exclusiveCount(updated) == 0);
    if (free) {
        setExclusiveOwnerThread(null);
    }
    setState(updated);
    return free;
}
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared())//如果下一个节点还是读锁 //再次唤醒 doReleaseShared(); } }
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)//如果别人获取了锁c自然就不是0,然后如果也不是自己的线程那么就没办法获取锁,返回-1 return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED);//加入节点到队列 boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) {//如果是老二 int r = tryAcquireShared(arg);//再次尝试获取锁 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) &&//标记前驱节点状态-1 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
/**
 * Releases the lock by delegating to the synchronizer's shared-mode
 * {@code releaseShared} with an argument of 1.
 */
public void unlock() {
    sync.releaseShared(1);
}
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //解锁的关键代码 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
取 node
- 再次tryAcquire
- 获取成功后,把当前节点设为新的头节点(旧的头节点出队);如果是读锁,并且后继节点也是读锁(共享)节点,则继续唤醒后继的读锁线程
- 结束
@Slf4j(topic = "c.TestStampedLock") public class TestStampedLock { public static void main(String[] args) { DataContainerStamped dataContainer = new DataContainerStamped(1); new Thread(() -> { dataContainer.read(1); }, "t1").start(); sleep(0.5); new Thread(() -> { dataContainer.write(1); }, "t2").start(); } } @Slf4j(topic = "c.DataContainerStamped") class DataContainerStamped { private int data; private final StampedLock lock = new StampedLock(); public DataContainerStamped(int data) { this.data = data; } public int read(int readTime) { long stamp = lock.tryOptimisticRead(); log.debug("optimistic read locking...{}", stamp); sleep(readTime); if (lock.validate(stamp)) { log.debug("read finish...{}, data:{}", stamp, data); return data; } // 锁升级 - 读锁 log.debug("updating to read lock... {}", stamp); try { stamp = lock.readLock(); log.debug("read lock {}", stamp); sleep(readTime); log.debug("read finish...{}, data:{}", stamp, data); return data; } finally { log.debug("read unlock {}", stamp); lock.unlockRead(stamp); } } public void write(int newData) { long stamp = lock.writeLock(); log.debug("write lock {}", stamp); try { sleep(2); this.data = newData; } finally { log.debug("write unlock {}", stamp); lock.unlockWrite(stamp); } } }
取 node
- 再次tryAcquire
- 获取成功后,把当前节点设为新的头节点(旧的头节点出队);如果是读锁,并且后继节点也是读锁(共享)节点,则继续唤醒后继的读锁线程
- 结束
@Slf4j(topic = "c.TestStampedLock") public class TestStampedLock { public static void main(String[] args) { DataContainerStamped dataContainer = new DataContainerStamped(1); new Thread(() -> { dataContainer.read(1); }, "t1").start(); sleep(0.5); new Thread(() -> { dataContainer.write(1); }, "t2").start(); } } @Slf4j(topic = "c.DataContainerStamped") class DataContainerStamped { private int data; private final StampedLock lock = new StampedLock(); public DataContainerStamped(int data) { this.data = data; } public int read(int readTime) { long stamp = lock.tryOptimisticRead(); log.debug("optimistic read locking...{}", stamp); sleep(readTime); if (lock.validate(stamp)) { log.debug("read finish...{}, data:{}", stamp, data); return data; } // 锁升级 - 读锁 log.debug("updating to read lock... {}", stamp); try { stamp = lock.readLock(); log.debug("read lock {}", stamp); sleep(readTime); log.debug("read finish...{}, data:{}", stamp, data); return data; } finally { log.debug("read unlock {}", stamp); lock.unlockRead(stamp); } } public void write(int newData) { long stamp = lock.writeLock(); log.debug("write lock {}", stamp); try { sleep(2); this.data = newData; } finally { log.debug("write unlock {}", stamp); lock.unlockWrite(stamp); } } }