客户端创建 Socket 连接后,会尝试与服务器连接,如果连接成功,则与服务器之间形成Session。
在ZooKeeper中,客户端和服务端之间的会话是怎样创建、检查更新、删除以及被如何维护的,将再接下来的文章中进行介绍和分析。
ZooKeeper 在 SessionTrackerImpl 中实现了Session的各种操作:创建session,检查更新session、删除session等。而本篇文章将对该类进行简单概述。
SessionTrackerImpl 主要是来维护客户端和服务器之间的session。SessionTrackerImpl实现了SessionTracker,同时也继承了ZooKeeperCriticalThread。即SessionTrackerImpl也是一个线程。
/** *按记号间隔分组跟踪会话。 *总是将tick的间隔取整,以提供某种宽限期。 *因此,会话将在由在给定时间间隔内过期的会话组成的批中过期。 */ public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {}
在服务端通过 SessionTrackerImpl 和 ExpiryQueue 来保存Session会话信息。
SessionTrackerImpl属性:
// 存储着会话id protected final ConcurrentHashMap<Long, SessionImpl> sessionsById = new ConcurrentHashMap<Long, SessionImpl>(); // 队列存储失效的会话 private final ExpiryQueue<SessionImpl> sessionExpiryQueue; // 存储超时会话 private final ConcurrentMap<Long, Integer> sessionsWithTimeout; // 下一个会话id private final AtomicLong nextSessionId = new AtomicLong(); // 失效的会话 private final SessionExpirer expirer;
ExpiryQueue属性:
// E即是session实例对象,long为失效时间 private final ConcurrentHashMap<E, Long> elemMap = new ConcurrentHashMap<E, Long>(); // 当前失效时间的session集合 private final ConcurrentHashMap<Long, Set<E>> expiryMap = new ConcurrentHashMap<Long, Set<E>>(); // 下一次失效时间 private final AtomicLong nextExpirationTime = new AtomicLong(); // 失效时间间隔 private final int expirationInterval;
SessionTrackerImpl的run()与之前文章NIOServerCnxnFactory源码分析中ConnectionExpirerThread内的run()类似。
public void run() { try { while (running) { long waitTime = sessionExpiryQueue.getWaitTime();//等待时间 if (waitTime > 0) {// 如果等待时间大于0 Thread.sleep(waitTime);// 当前线程睡眠,等待时间到下一次失效时间 continue; } for (SessionImpl s : sessionExpiryQueue.poll()) {// 下次失效时间对应的session关闭 ServerMetrics.getMetrics().STALE_SESSIONS_EXPIRED.add(1); setSessionClosing(s.sessionId); expirer.expire(s);// 进行closeSession操作 } } } catch (InterruptedException e) { handleException(this.getName(), e); } LOG.info("SessionTrackerImpl exited loop!"); }
初始化下一次会话的ID,以便客户端发起连接请求时能够获取不冲突且正确的会话ID。
/** * 生成初始会话ID. 高阶1字节是serverId * 下5个字节来自timestamp ,低阶2字节是0s。 * 使用“>>>8”,而不是“>>8”,以确保高位1字节完全由服务器Id决定 * @param id server Id * @return the Session Id */ public static long initializeNextSessionId(long id) { long nextSid; nextSid = (Time.currentElapsedTime() << 24) >>> 8; // 生成下一次会话ID nextSid = nextSid | (id << 56); if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) { ++nextSid; // 这是一个不太可能的边缘情况 } return nextSid; }
touchSession是每次接收到客户端的消息之后就进行的操作,其主要是延长对应的session的超时的时间.
public synchronized boolean touchSession(long sessionId, int timeout) { SessionImpl s = sessionsById.get(sessionId); if (s == null) {//对应session不存在 logTraceTouchInvalidSession(sessionId, timeout);//计算其对应的时间,移除原来位置的session,加入到新的session return false; } if (s.isClosing()) {//对应session已经被标识为关闭 logTraceTouchClosingSession(sessionId, timeout);//计算其对应的时间,移除原来位置的session,加入到新的session return false; } //更新会话失效时间 updateSessionExpiry(s, timeout); return true; }
// 确定session的timeout和id public long createSession(int sessionTimeout) { long sessionId = nextSessionId.getAndIncrement(); trackSession(sessionId, sessionTimeout); return sessionId; } // 删除会话 public synchronized void removeSession(long sessionId) { LOG.debug("Removing session 0x{}", Long.toHexString(sessionId)); // 删除会话ID SessionImpl s = sessionsById.remove(sessionId); // 删除会话timeout sessionsWithTimeout.remove(sessionId); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage( LOG, ZooTrace.SESSION_TRACE_MASK, "SessionTrackerImpl --- Removing session 0x" + Long.toHexString(sessionId)); } if (s != null) {// 在会话失效队列中删除 sessionExpiryQueue.remove(s); } }
本章对会话的维护进行了简单分析,下一章将对会话的创建进行细节分析。