ThreadLocal的文章在网上也有不少,但是看了一些后,理解起来总感觉有绕,而且看了ThreadLocal的源码,无论是线程隔离、类环形数组、弱引用结构等等,实在是太有意思了!我必须也要让大家全面感受下其中所蕴含的那些奇思妙想! 所以这里我想写一篇超几儿通俗易懂解析ThreadLocal的文章,相关流程会使用大量图示解析,以证明:我是干货,不是水比!
ThreadLocal这个类加上庞大的注释,总共也才七百多行,而且你把这个类的代码拷贝出来,你会发现,它几乎没有报错!耦合度极低!(唯一的报错是因为ThreadLocal类引用了Thread类里的一个包内可见变量,所以把代码复制出来,这个变量访问就报错了,仅仅只有此处报错!)
ThreadLocal的线程数据隔离,替换算法,擦除算法,都是有必要去了解了解,仅仅少量的代码,却能实现如此精妙的功能,让我们来体会下 Josh Bloch 和 Doug Lea 俩位大神,巧夺天工之作吧!
一些说明
这篇文章画了不少图,大概画了十八张图,关于替换算法和擦除算法,这俩个方法所做的事情,如果不画图,光用文字描述的话,十分的抽象且很难描述清楚;希望这些流程图,能让大家更能体会这些精炼代码的魅力!
哔哔原理之前,必须要先来看下使用
set()
和get()
方法即可public class Main { public static void main(String[] args) { ThreadLocal<String> threadLocalOne = new ThreadLocal<>(); ThreadLocal<String> threadLocalTwo = new ThreadLocal<>(); new Thread(new Runnable() { @Override public void run() { threadLocalOne.set("线程一的数据 --- threadLocalOne"); threadLocalTwo.set("线程一的数据 --- threadLocalTwo"); System.out.println(threadLocalOne.get()); System.out.println(threadLocalTwo.get()); } }).start(); new Thread(new Runnable() { @Override public void run() { System.out.println(threadLocalOne.get()); System.out.println(threadLocalTwo.get()); threadLocalOne.set("线程二的数据 --- threadLocalOne"); threadLocalTwo.set("线程二的数据 --- threadLocalTwo"); System.out.println(threadLocalOne.get()); System.out.println(threadLocalTwo.get()); } }).start(); } }
线程一的数据 --- threadLocalOne 线程一的数据 --- threadLocalTwo null null 线程二的数据 --- threadLocalOne 线程二的数据 --- threadLocalTwo
在解释ThreadLocal整体逻辑前,需要先了解几个前置知识
下面这些前置知识,是在说set和get前,必须要先了解的知识点,了解下面这些知识点,才能更好去了解整个存取流程
在上面的ThreadLocal的使用中,我们发现一个很有趣的事情,ThreadLocal在不同的线程,好像能够存储不同的数据:就好像ThreadLocal本身具有存储功能,到了不同线程,能够生成不同的'副本'存储数据一样
实际上,ThreadLocal到底是怎么做到的呢?
//存数据 public void set(T value) { Thread t = Thread.currentThread(); ThreadLocal.ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); } //获取当前Thread的threadLocals变量 ThreadLocal.ThreadLocalMap getMap(Thread t) { return t.threadLocals; } //Thread类 public class Thread implements Runnable { ... /* ThreadLocal values pertaining to this thread. This map is maintained * by the ThreadLocal class. */ ThreadLocal.ThreadLocalMap threadLocals = null; ... }
强引用:不管内存多么紧张,gc永不回收强引用的对象
软引用:当内存不足,gc对软引用对象进行回收
弱引用:gc发现弱引用,就会立刻回收弱引用对象
软引用:在任何时候都可能被垃圾回收器回收
Entry就是一个实体类,这个实体类有俩个属性:key、value,key是就是咱们常说的的弱引用
当我们执行ThreadLocal的set操作,第一次则新建一个Entry或后续set则覆盖改Entry的value,塞到当前Thread的ThreadLocals变量中
static class Entry extends WeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } }
你可能会想,what?我用ThreadLocal来set一个数据,然后gc一下,我Entry里面key变量引用链就断开了?
public class Main { public static void main(String[] args) { ThreadLocal<String> threadLocalOne = new ThreadLocal<>(); new Thread(new Runnable() { @Override public void run() { threadLocalOne.set("线程一的数据 --- threadLocalOne"); System.gc(); System.out.println(threadLocalOne.get()); } }).start(); } }
线程一的数据 --- threadLocalOne
看来这里gc了个寂寞。。。
在这里,必须明确一个道理:gc回收弱引用对象,是先回收弱引用的对象,弱引用链自然断开;而不是先断开引用链,再回收对象。Entry里面key对ThreadLocal的引用是弱引用,但是threadLocalOne对ThreadLocal的引用是强引用啊,所以ThreadLocal这个对象是没法被回收的
public class Main { static ThreadLocal<String> threadLocalOne = new ThreadLocal<>(); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { threadLocalOne.set("线程一的数据 --- threadLocalOne"); try { threadLocalOne = null; System.gc(); //下面代码来自:https://blog.csdn.net/thewindkee/article/details/103726942 Thread t = Thread.currentThread(); Class<? extends Thread> clz = t.getClass(); Field field = clz.getDeclaredField("threadLocals"); field.setAccessible(true); Object threadLocalMap = field.get(t); Class<?> tlmClass = threadLocalMap.getClass(); Field tableField = tlmClass.getDeclaredField("table"); tableField.setAccessible(true); Object[] arr = (Object[]) tableField.get(threadLocalMap); for (Object o : arr) { if (o == null) continue; Class<?> entryClass = o.getClass(); Field valueField = entryClass.getDeclaredField("value"); Field referenceField = entryClass.getSuperclass().getSuperclass().getDeclaredField("referent"); valueField.setAccessible(true); referenceField.setAccessible(true); System.out.println(String.format("弱引用key:%s 值:%s", referenceField.get(o), valueField.get(o))); } } catch (Exception e) { } } }).start(); } }
弱引用key:null 值:线程一的数据 --- threadLocalOne 弱引用key:java.lang.ThreadLocal@387567b2 值:java.lang.ref.SoftReference@2021fb3f
public class ThreadLocal<T> { ... static class ThreadLocalMap { static class Entry extends WeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } } /** * The table, resized as necessary. * table.length MUST always be a power of two. */ private Entry[] table; ... } }
public class ThreadLocal<T> { ... static class ThreadLocalMap { ... private static int nextIndex(int i, int len) { return ((i + 1 < len) ? i + 1 : 0); } ... } }
public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); } ThreadLocalMap getMap(Thread t) { return t.threadLocals; } void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); } ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) { table = new Entry[INITIAL_CAPACITY]; int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); table[i] = new Entry(firstKey, firstValue); size = 1; setThreshold(INITIAL_CAPACITY); }
private static final int INITIAL_CAPACITY = 16; ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) { table = new Entry[INITIAL_CAPACITY]; ... }
private void set(ThreadLocal<?> key, Object value) { // We don't use a fast path as with get() because it is at // least as common to use set() to create new entries as // it is to replace existing ones, in which case, a fast // path would fail more often than not. Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { ThreadLocal<?> k = e.get(); if (k == key) { e.value = value; return; } if (k == null) { replaceStaleEntry(key, value, i); return; } } tab[i] = new Entry(key, value); int sz = ++size; if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash(); }
public class ThreadLocal<T> { private final int threadLocalHashCode = nextHashCode(); private static final int HASH_INCREMENT = 0x61c88647; private static AtomicInteger nextHashCode = new AtomicInteger(); private void set(ThreadLocal<?> key, Object value) { ... int i = key.threadLocalHashCode & (len-1); ... } private static int nextHashCode() { return nextHashCode.getAndAdd(HASH_INCREMENT); } }
public class Main { public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(); System.out.println(atomicInteger.getAndAdd(1)); //0 System.out.println(atomicInteger.getAndAdd(1)); //1 System.out.println(atomicInteger.getAndAdd(1)); //2 } }
该值的相加,符合斐波那契散列法,可以使得的低位的二进制数值分布的更加均匀,这样会减少在数组中产生hash冲突的次数
具体分析可查看:从 ThreadLocal 的实现看散列算法
等等大家有没有看到 threadLocalHashCode = nextHashCode(),nextHashCode()是获取下一个节点的方法啊,这是什么鬼?
难道每次使用key.threadLocalHashCode的时候,HashCode都会变?
public class ThreadLocal<T> { private final int threadLocalHashCode = nextHashCode(); }
好像又发现一个问题!threadHashCode通过 nextHashCode() 获取HashCode,然后nextHashCode是使用AtomicInteger类型的 nextHashCode变量相加,这玩意每次实例化的时候不都会归零吗?
难道我们每次新的ThreadLocal实例获取HashCode的时候,都要从0开始相加?
public class ThreadLocal<T> { private final int threadLocalHashCode = nextHashCode(); private static final int HASH_INCREMENT = 0x61c88647; private static AtomicInteger nextHashCode = new AtomicInteger(); private static int nextHashCode() { return nextHashCode.getAndAdd(HASH_INCREMENT); } }
总结
上面代码中,用取得的hash值,与ThreadLocalMap实例中数组长度减一的与操作,计算出了index值
这个很重要的,因为大于长度的高位hash值是不需要的
此处会将传入的ThreadLocal实例计算出一个hash值,怎么计算的后面再说,这地方有个位与的操作,这地方是和长度减一的与操作,这个很重要的,因为大于长度的高位hash值是不需要的
private void set(ThreadLocal<?> key, Object value) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { ThreadLocal<?> k = e.get(); if (k == key) { e.value = value; return; } if (k == null) { replaceStaleEntry(key, value, i); return; } } tab[i] = new Entry(key, value); ... }
分析下塞值流程
实际上面的循环还值得去思考,来思考下这循环处理的事情
循环中获取当前index值,从Entry数组取到当前index位置的Entry对象
整体的逻辑比较清晰,如果key已存在,则覆盖;不存在,index位置是否可用,可用则使用该节点,不可用,往后寻找可用节点:线性探测法
在上述set方法中,当生成的index节点,已被占用,会向后探测可用节点
private void set(ThreadLocal<?> key, Object value) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { ThreadLocal<?> k = e.get(); ... if (k == null) { replaceStaleEntry(key, value, i); return; } } ... }
private static int prevIndex(int i, int len) { return ((i - 1 >= 0) ? i - 1 : len - 1); } private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) { Entry[] tab = table; int len = tab.length; Entry e; // Back up to check for prior stale entry in current run. // We clean out whole runs at a time to avoid continual // incremental rehashing due to garbage collector freeing // up refs in bunches (i.e., whenever the collector runs). int slotToExpunge = staleSlot; for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len)) if (e.get() == null) slotToExpunge = i; // Find either the key or trailing null slot of run, whichever // occurs first for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); // If we find key, then we need to swap it // with the stale entry to maintain hash table order. // The newly stale slot, or any other stale slot // encountered above it, can then be sent to expungeStaleEntry // to remove or rehash all of the other entries in run. if (k == key) { e.value = value; tab[i] = tab[staleSlot]; tab[staleSlot] = e; // Start expunge at preceding stale entry if it exists if (slotToExpunge == staleSlot) slotToExpunge = i; cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); return; } // If we didn't find stale entry on backward scan, the // first stale entry seen while scanning for key is the // first still present in the run. if (k == null && slotToExpunge == staleSlot) slotToExpunge = i; } // If key not found, put new entry in stale slot tab[staleSlot].value = null; tab[staleSlot] = new Entry(key, value); // If there are any other stale entries in run, expunge them if (slotToExpunge != staleSlot) cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); }
上面的代码,很明显俩个循环是重点逻辑,这里面有俩个很重要的字段:slotToExpunge和staleSlot
第一个循环:很明显是往前列表头结点方向探测,是否还有key为null的节点,有的话将其下标赋值给slotToExpunge;这个探测是一个连续的不为null的节点链范围,有空节点,立马结束循环
第二个循环:很明显主要是向后探测,探测整个数组,这里有很重要逻辑
为什么会出现探测到Entry和传入key相同?
为什么要调换俩者位置呢?
这个问题,大家可以好好想想,我们时候往后探测,而这key为null的Entry实例,属于较快的探测到Entry
而这个Entry实例的key又为null,说明这个Entry可以被回收了,此时正处于占着茅坑不拉屎的位置
此时就可以把我需要复写Entry实例和这个key为null的Entry调换位置
可以使得我们需要被操作的Entry实例,在下次被操作时候,可以尽快被找到
调换了位置之后,就会执行擦除旧节点算法
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) { ... for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ... if (k == null && slotToExpunge == staleSlot) slotToExpunge = i; } ... }
为什么这俩个循环都这么执着的,想改变slotToExpunge的数值呢?
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) { ... int slotToExpunge = staleSlot; ... if (slotToExpunge != staleSlot) cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); }
明白了吧!都是为了替换方法里面的最后一段逻辑:为了判断是否需要执行擦除算法
总结
双向探测流程
在向后探测的时候,如果遇到key值对比相同的Entry,说明遇到我们需要复写value的Entry
如果在向后探测的时候,没有遇到遇到key值对比相同的Entry
来总结下
这俩个图示,大概描述了ThreadLocal进行set操作的整个流程;现在,进入下一个栏目吧,来看看ThreadLocal的get操作!
get流程,总体要比set流程简单很多,可以轻松一下了
public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); }
private T setInitialValue() { T value = initialValue(); Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); return value; } protected T initialValue() { return null; } void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); }
private Entry getEntry(ThreadLocal<?> key) { int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; if (e != null && e.get() == key) return e; else return getEntryAfterMiss(key, i, e); }
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) { Entry[] tab = table; int len = tab.length; while (e != null) { ThreadLocal<?> k = e.get(); if (k == key) return e; if (k == null) expungeStaleEntry(i); else i = nextIndex(i, len); e = tab[i]; } return null; }
整体逻辑还是很清晰了,通过while循环,不断获取Entry数组中的下一个节点,循环中有三个逻辑走向
ThreadLocal的流程,总体上比较简单
将当前ThreadLocal实例当为key,查找Entry数组当前节点(使用ThreadLocal中的魔术值算出的index)是否符合条件
不符合条件将返回null
符合条件就直接返回当前节点
这里大家一定要明确一个概念:在set的流程,发生了hash冲突,是在冲突节点向后的连续节点上,找到符合条件的节点存储,所以查询的时候,只需要在连续的节点上查找,如果碰到为null的节点,就可以直接结束查找
在set流程和get流程都使用了这个擦除旧节点的逻辑,它可以及时清除掉Entry数组中,那些key为null的Entry,如果key为null,说明这些这节点,已经没地方使用了,所以就需要清除掉
private int expungeStaleEntry(int staleSlot) { Entry[] tab = table; int len = tab.length; // expunge entry at staleSlot tab[staleSlot].value = null; tab[staleSlot] = null; size--; // Rehash until we encounter null Entry e; int i; for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); if (k == null) { e.value = null; tab[i] = null; size--; } else { int h = k.threadLocalHashCode & (len - 1); if (h != i) { tab[i] = null; // Unlike Knuth 6.4 Algorithm R, we must scan until // null because multiple entries could have been stale. while (tab[h] != null) h = nextIndex(h, len); tab[h] = e; } } } return i; }
从上面的代码,可以发现,再进行主要的循环体,有个前置操作
private int expungeStaleEntry(int staleSlot) { Entry[] tab = table; int len = tab.length; // expunge entry at staleSlot tab[staleSlot].value = null; tab[staleSlot] = null; size--; ... }
private int expungeStaleEntry(int staleSlot) { Entry[] tab = table; int len = tab.length; ... // Rehash until we encounter null Entry e; int i; for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); if (k == null) { e.value = null; tab[i] = null; size--; } else { int h = k.threadLocalHashCode & (len - 1); if (h != i) { tab[i] = null; // Unlike Knuth 6.4 Algorithm R, we must scan until // null because multiple entries could have been stale. while (tab[h] != null) h = nextIndex(h, len); tab[h] = e; } } } return i; }
代码很少,但是实现的功能却并不少
在进行set操作的时候,会进行相关的扩容操作
public void set(T value) { ... if (map != null) map.set(this, value); else createMap(t, value); } private void set(ThreadLocal<?> key, Object value) { ... tab[i] = new Entry(key, value); int sz = ++size; if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash(); } private void rehash() { expungeStaleEntries(); // Use lower threshold for doubling to avoid hysteresis if (size >= threshold - threshold / 4) resize(); }
private void resize() { Entry[] oldTab = table; int oldLen = oldTab.length; int newLen = oldLen * 2; Entry[] newTab = new Entry[newLen]; int count = 0; for (int j = 0; j < oldLen; ++j) { Entry e = oldTab[j]; if (e != null) { ThreadLocal<?> k = e.get(); if (k == null) { e.value = null; // Help the GC } else { int h = k.threadLocalHashCode & (newLen - 1); while (newTab[h] != null) h = nextIndex(h, newLen); newTab[h] = e; count++; } } } setThreshold(newLen); size = count; table = newTab; }
先来看下扩容的触发条件吧
public void set(T value) { ... if (map != null) map.set(this, value); else createMap(t, value); } private void set(ThreadLocal<?> key, Object value) { ... tab[i] = new Entry(key, value); int sz = ++size; if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash(); } private void rehash() { expungeStaleEntries(); // Use lower threshold for doubling to avoid hysteresis if (size >= threshold - threshold / 4) resize(); }
上面主要的代码就是:!cleanSomeSlots(i, sz) && sz >= threshold
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) { table = new Entry[INITIAL_CAPACITY]; int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); table[i] = new Entry(firstKey, firstValue); size = 1; setThreshold(INITIAL_CAPACITY); } private void setThreshold(int len) { threshold = len * 2 / 3; }
private boolean cleanSomeSlots(int i, int n) { boolean removed = false; Entry[] tab = table; int len = tab.length; do { i = nextIndex(i, len); Entry e = tab[i]; if (e != null && e.get() == null) { n = len; removed = true; i = expungeStaleEntry(i); } } while ( (n >>>= 1) != 0); return removed; }
n >>>= 1:表达是无符号右移一位,正数高位补0,负数高位补1
举例:0011 ---> 0001
在上面的cleanSomeSlots方法中,只要在探测节点的时候,没有遇到Entry的key为null的节点,该方法就会返回false
private void rehash() { expungeStaleEntries(); // Use lower threshold for doubling to avoid hysteresis if (size >= threshold - threshold / 4) resize(); }
总结
满足下面俩个条件即可
private void resize() { Entry[] oldTab = table; int oldLen = oldTab.length; int newLen = oldLen * 2; Entry[] newTab = new Entry[newLen]; int count = 0; for (int j = 0; j < oldLen; ++j) { Entry e = oldTab[j]; if (e != null) { ThreadLocal<?> k = e.get(); if (k == null) { e.value = null; // Help the GC } else { int h = k.threadLocalHashCode & (newLen - 1); while (newTab[h] != null) h = nextIndex(h, newLen); newTab[h] = e; count++; } } } setThreshold(newLen); size = count; table = newTab; }
从上面的逻辑,可以看出来,将旧数组的数据赋值到扩容数组,并不是全盘赋值到扩容数组的对应位置
遍历旧数组,取出其中的Entry实例
这里的扩容存储和ArrayList之类是有区别
可以发现
set,替换,擦除,扩容,基本无时无刻,都是为了使hash冲突节点,向冲突的节点靠近
这是为了提高读写节点的效率
remove方法是非常简单的,ThreadLocal拥有三个api:set、get、remove;虽然非常简单,但是还有一些必要,来稍微了解下
public void remove() { ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null) m.remove(this); } private void remove(ThreadLocal<?> key) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { if (e.get() == key) { e.clear(); expungeStaleEntry(i); return; } } }
逻辑非常的清晰,通过ThreadLocal实例,获取当前的index,然后从此开始查找符合条件Entry,找到后,会将其key值清掉,然后执行擦除算法
e.clear就是,弱引用的清理弱引用的方法,很简单,将弱引用referent变量置空就行了,这个变量就是持有弱引用对象的变量
文章写到这里,基本上到了尾声了,写了差不多万余字,希望大家看完后,对ThreadLocal能有个更加深入的认识
ThreadLocal的源码虽然并不多,但是其中有很多奇思妙想,有种萝卜雕花的感觉,这就是高手写的代码吗?
系列文章