下面直接来一个 NameNode 高可用的架构图:
然后解释下如何保证高可用的:
如果我们只部署了一个 NameNode,那么这个 NameNode 是有单点故障的问题的。如何解决,再加一个 NameNode 即可;
两个 NameNode 一起工作,某一个 NameNode 挂掉了,另一个 NameNode 接替工作,这件事成立的必要前提是,两个 NameNode 的数据得时时刻刻保持一致。
那么如何保持数据一致,是不是可以在两个 NameNode 之间搞个共享的文件系统?仔细想想也不行,这样的话,单点故障问题就转移到这个文件系统上了。
这里引入了 JournalNode 集群,JournalNode 的每个节点的数据都是一样的,并且时刻保持一致。并且只要超过半数的节点存活,整个 JournalNode 集群都可以正常提供服务。
所以,一般会使用奇数个节点来搭建。(为什么一般不用偶数个呢?因为 3 个节点构成的集群,可以容忍挂掉一台机器;而 4 个节点构成的集群,也只能容忍挂掉一台机器。同样是只能挂掉一台,为何不选 3 个节点的呢,还能节省资源)。
使用 JournalNode 集群,一个 NameNode 实时的往集群写,另一个 NameNode 也实时的读集群数据,这样两个 NameNode 数据就可以保持一致了。
首先不能人工参与切换。那如何实时监听呢?
首先要再引入一个关键组件:Zookeeper。当两个 NameNode 同时启动后,他们都会去 Zookeeper 上注册,谁注册成功了,谁就能获得锁,成为 Active 状态的 NameNode。
另外还需要一个组件:ZKFC,它会实时监控 Zookeeper 进程状态,并且会以心跳的方式实时的告诉 Zookeeper NameNode 的状态。如果一个 NameNode 进程挂了,就会把 Zookeeper 上的锁给另外一个 NameNode,让它成为 Active 的来工作。
NameNode 为了实现高可用,首先自己内存里的数据需要写到磁盘,然后还需要往 JournalNode 里写数据。
所以既然要写磁盘,还是往两个地方写磁盘,那必然性能会跟不上的。
所以这里 NameNode 引入了一个技术,也是本篇文章的重点:双缓冲技术。
双缓冲的设计理念如下图:
客户端不是直接写磁盘,而是往一个内存结构(Buffer1)里面写数据。当 Buffer1 达到一定阈值后,Buffer 1 和 Buffer 2 交换内存数据。此时 Buffer1 数据为空,Buffer2 开始在后台默默写磁盘。
这样的好处很明显的,前端只需要进行内存写 Buffer1 就行,性能特别高;而 Buffer2 在后台默默的同步日志到磁盘即可。
这样磁盘写,就转化成为了内存写,速度大大提高了。
然而,在真实环境不只一个客户端是这样子的:
大数据情况下是 N 个客户端同时并发写的,在高并发的情况下,我们必然要去协调多个线程动作的一致性,比如往 Buffer1 的写动作,Buffer1 与 Buffer2 数据交换的动作,Buffer2 写磁盘的动作。
那么我们该如何实现这样一个巧妙的双缓冲呢?下面的代码是我从 Hadoop 源码里抽离出来的关键实现:
package org.apache.hadoop; import java.util.LinkedList; public class FSEditLog2 { private long txid=0L; private DoubleBuffer editLogBuffer=new DoubleBuffer(); //是否正在刷写磁盘 private volatile Boolean isSyncRunning = false; private volatile Boolean isWaitSync = false; private volatile Long syncMaxTxid = 0L; //每个线程都对应自己的一个副本 private ThreadLocal<Long> localTxid=new ThreadLocal<Long>(); public void logEdit(String content){//mkdir /a synchronized (this){//加锁的目的就是为了事务ID的唯一,而且是递增 txid++; localTxid.set(txid); EditLog log = new EditLog(txid, content); editLogBuffer.write(log); } logSync(); } private void logSync(){ synchronized (this){ if(isSyncRunning){ //是否有人正在把数据同步到磁盘上面 long txid = localTxid.get(); if(txid <= syncMaxTxid){ //直接return,不接着干了? return; } if(isWaitSync){ return; } isWaitSync = true; while(isSyncRunning){ try { wait(2000); }catch (Exception e){ e.printStackTrace(); } } isWaitSync = false; } editLogBuffer.setReadyToSync(); if(editLogBuffer.syncBuffer.size() > 0) { syncMaxTxid = editLogBuffer.getSyncMaxTxid(); } isSyncRunning = true; } //释放锁 editLogBuffer.flush(); synchronized (this) { isSyncRunning = false; notify(); } //释放锁 } /** * 把日志抽象成类 */ class EditLog{ //顺序递增 long txid; //操作内容 mkdir /a String content; //构造函数 public EditLog(long txid,String content){ this.txid = txid; this.content = content; } //为了测试方便 @Override public String toString() { return "EditLog{" + "txid=" + txid + ", content='" + content + '\'' + '}'; } } /** * 双缓存方案 */ class DoubleBuffer{ //内存1 LinkedList<EditLog> currentBuffer = new LinkedList<EditLog>(); //内存2 LinkedList<EditLog> syncBuffer= new LinkedList<EditLog>(); /** * 把数据写到当前内存1 * @param log */ public void write(EditLog log){ currentBuffer.add(log); } /** * 交换内存 */ public void setReadyToSync(){ LinkedList<EditLog> tmp= currentBuffer; currentBuffer = syncBuffer; syncBuffer = tmp; } /** * 获取内存2里面的日志的最大的事务编号 * @return */ public Long getSyncMaxTxid(){ return syncBuffer.getLast().txid; } /** * 刷写磁盘 */ public void flush(){ for(EditLog log:syncBuffer){ //把数据写到磁盘上 System.out.println("存入磁盘日志信息:"+log); } //把内存2里面的数据要清空 syncBuffer.clear(); } } }
主要的业务逻辑就是 40 行,但是真的很巧妙。
我们先看这个 EditLog 内部类,这是对 EditLog 日志的一个封装,就两个属性 txid 和 content,分别是日志的事务id(保证唯一性)和 内容。
再看这个 DoubleBuffer 双缓冲类,很简单,就是在内存里面维护了两个有序的 LinkedList,分别是当前写编辑日志的缓冲和同步到磁盘的缓冲,其中的元素就是 EditLog 类。
write 方法就是把一条编辑日志写到当前缓冲里。
setReadyToSync 方法,就是交换两个缓冲,也是最简单的刚学习 Java 就学习过的两个变量交换值的方法。
getSyncMaxTxid 方法,获得正在同步的那个缓冲区里的最大的事务id。
flush 方法,遍历同步的缓冲的每一条编辑日志,写到磁盘,并最终清空缓冲区内容。
(1)全局的事务id
private long txid=0L;
(2)双缓冲结构
private DoubleBuffer editLogBuffer=new DoubleBuffer();
(3)控制变量
private volatile Boolean isSyncRunning = false; // 是否正在同步数据到磁盘
private volatile Boolean isWaitSync = false; // 是否有线程在等待同步数据到磁盘完成
private volatile Long syncMaxTxid = 0L; // 当前同步的最大日志事务id
private ThreadLocallocalTxid=new ThreadLocal(); // 每个线程的线程副本,用来放本线程当前写入的日志事务id
(4)主逻辑 logEdit 方法
这个方法是对外暴露的方法,客户端往双缓冲写数据就是用的这个方法。
假设当前有一个线程1 进到了 logEdit 方法,首先直接把当前类实例加锁,避免别的线程进来,以此来保证编辑日志事务id的唯一自增性。
// 全局事务递增 txid++; // 往线程本身的变量里设置事务id值 localTxid.set(txid); // 构造 EditLog 变量 EditLog log = new EditLog(txid, content); // 写入当前的 Buffer editLogBuffer.write(log);
当它执行完了这些之后,释放锁,开始执行 logSync() 方法。此时由于释放了锁,于是很多线程开始拿到锁,进入了这个方法中。
假设有五个线程进来了分别写了一条日志,于是现在双缓冲是这样子的:
好,然后线程1 开始进入 logSync 方法,第一步就是使用当前类的实例加了锁,保证只有一个线程进来。
检查 isSyncRunning 变量是否为 true,目前是 false,跳过这个方法。
开始执行这个 editLogBuffer.setReadyToSync(); 方法,于是双缓冲的数据直接被交换了。
然后获得了全局最大的id,当前是 5,赋值给了 syncMaxTxid 变量
if(editLogBuffer.syncBuffer.size() > 0) { syncMaxTxid = editLogBuffer.getSyncMaxTxid(); }
然后 isSyncRunning = true; 把这个变量置为 true,表示正在同步数据到磁盘。此时释放锁。
然后 线程 1 开始执行数据同步到磁盘的动作:editLogBuffer.flush() ,这个动作肯定耗费的时间比较久,基本是在 ms 级别。
此时我们假设 线程2 争抢到了锁,进入到了 logSync 方法。
// 线程2 判断 是否有人正在把数据同步到磁盘上面,这个值被线程 1 改为 true 了 // 进入到 if 方法 if(isSyncRunning){ // 获得到自己线程的事务id,为 2 long txid = localTxid.get(); // 2 是否小于 5 ?小于,直接返回,因为此时 5 已经正在被同步到磁盘了 if(txid <= syncMaxTxid){ return; } if(isWaitSync){ return; } isWaitSync = true; while(isSyncRunning){ try { wait(2000); }catch (Exception e){ e.printStackTrace(); } } isWaitSync = false; }
线程2 由于自身的编辑日志的事务id 小于当前正在同步的最大的事务id,所以直接返回了,然后线程3 ,线程4,线程5 进来都是这样,直接 return 返回。
假设线程6 此时进来,当前双缓冲状态是这样的
下面线程 6 干的活,参考下面代码里的注释:
// 线程6 判断是否有人正在把数据同步到磁盘上面,这个值被线程 1 改为 true 了 // 进入到 if 方法 if(isSyncRunning){ // 获得到自己线程的事务id,为 6 long txid = localTxid.get(); // 6 是否小于 5 ,不小于继续执行 if(txid <= syncMaxTxid){ return; } // 这个值为 false,继续执行 if(isWaitSync){ return; } // 把 isWaitSync 设置为 true isWaitSync = true; // 这个值被线程1置为了 true,所以这里在死循环 while(isSyncRunning){ try { // 等待 2s,wait 会释放锁,同时线程 6 进入睡眠中 wait(2000); }catch (Exception e){ e.printStackTrace(); } } isWaitSync = false; }
可以看到 线程 6 在 while 循环里无限等待数据同步到磁盘完毕。然后由于线程 6 把 isWaitSync 值改为了 true,线程 6 在等待期间释放锁,被其他线程抢到之后,其他线程由于 isWaitSync 为true,直接返回了。
当过了一会儿,线程1 把第二个 Buffer 同步到磁盘完毕后,线程1 会执行这些代码
synchronized (this) { isSyncRunning = false; notify(); } //释放锁
把 isSyncRunning 变量置为 false,同时调用 notify(),通知线程 6 ,你可以继续参与锁的竞争了。
然后线程6 ,从 wait 中醒来,重新参与锁竞争,继续执行接下来的代码。此时 isSyncRunning 已经为 false,所以它跳出了 while 循环,把 isWaitSync 置为了 false。
然后它开始执行:交换缓冲区,给最大的事务id(此时为6 )赋值,把 isSyncRunning 赋值为 true。
editLogBuffer.setReadyToSync(); if(editLogBuffer.syncBuffer.size() > 0) { syncMaxTxid = editLogBuffer.getSyncMaxTxid(); } isSyncRunning = true;
执行完了之后,释放锁,开始执行Buffer2 的同步。然后所有的线程就按照上面的方式有序的工作。
这段几十行的代码很精炼,值得反复推敲,总结下来如下:
(1)写缓冲到内存 和 同步数据到磁盘分开,互不影响和干扰;
(2)使用 synchronize ,wait 和 notify 来保证多线程有序进行工作;
(3)当在同步数据到磁盘中的时候,其他争抢到锁进来准备同步数据的线程只能等待;
(4)线程使用 ThreadLocal 变量,来记录自身当前的事务id,如果小于当前正在同步的最大事务id,则不同步;
(5)有线程在等待同步数据的时候,其他线程写完 editLog 到内存后直接返回;
本文详细探讨了 HDFS 在大数据中基石的地位,以及如何保障 NameNode 高可用的运行。
NameNode 在高可用运行时,同时是如何保证高并发读写操作的。双缓冲在其中起到了核心的作用,把写数据和同步数据到磁盘分离开,互不影响。
同时我们还剥离了一段核心双缓冲的实现代码,仔细分析了实现原理。这短短的几十行代码,可谓综合利用了多线程高并发的知识,耐人寻味。