This article was first published on 泊浮目's Jianshu page: https://www.jianshu.com/u/204...
| Version | Date | Notes |
|---|---|---|
| 1.0 | 2020.4.8 | First published |
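Anyone who has used ZooKeeper knows that watch is a very handy mechanism. Today we will look at how it is implemented.
Before getting into the details, let's briefly recall what a watch is.
ZooKeeper (Zk for short) provides publish/subscribe for distributed data. This is a very common model: it defines a one-to-many subscription in which multiple subscribers listen to the same subject, and when the subject's state changes, all subscribers are notified. Concretely, Zk lets a client register a watch with the server; when a specified event on the server triggers that watch, the server sends an event notification to that client.
Before dissecting the implementation, it is worth asking: if we had to build a watch mechanism ourselves, how would we do it?
The simplest approach is for the client to store the current version of the node and poll the node's state. If the version has changed, the client fires the watch. But surely there are better ways than polling?
Polling puts considerable load on the server. We could instead use something like a webhook: the server stores an address agreed upon with the client, and notifies the client when the watched data node changes.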
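To make the cost of the naive approach concrete, here is a minimal sketch of version-based polling. The class `PollingWatch` and its `versionSource` are hypothetical names invented for illustration; they are not part of ZooKeeper. `versionSource` stands in for whatever request reads the node's current version.

```java
import java.util.function.IntSupplier;

// Hypothetical polling-based watch; names are illustrative, not ZooKeeper API.
class PollingWatch {
    private final IntSupplier versionSource; // stands in for a "read node version" request
    private int lastSeenVersion;
    private int notifications = 0;

    PollingWatch(IntSupplier versionSource) {
        this.versionSource = versionSource;
        this.lastSeenVersion = versionSource.getAsInt();
    }

    /** One polling round: returns true if the version changed since the last round. */
    boolean pollOnce() {
        int current = versionSource.getAsInt(); // costs one request even if nothing changed
        if (current != lastSeenVersion) {
            lastSeenVersion = current;
            notifications++;
            return true;
        }
        return false;
    }

    int notifications() {
        return notifications;
    }
}
```

Every call to `pollOnce` costs a round trip whether or not the node changed, which is exactly the load that a push-style notification avoids.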
At this point we are already fairly close to how Zk implements watches. Let's follow this line of thought and start the analysis:
When using the client, `getData`, `getChildren`, and `exists` can all be used to register a watcher with Zk. They all work the same way, so we take the `exists` method as the example:
    /**
     * The asynchronous version of exists.
     *
     * @see #exists(String, Watcher)
     */
    public void exists(final String path, Watcher watcher,
            StatCallback cb, Object ctx)
    {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path
        WatchRegistration wcb = null;
        if (watcher != null) {
            wcb = new ExistsWatchRegistration(watcher, clientPath);
        }

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.exists);
        ExistsRequest request = new ExistsRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
        SetDataResponse response = new SetDataResponse();
        cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
                clientPath, serverPath, ctx, wcb);
    }
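The watcher shows up in two places. First, it becomes a field of a `WatchRegistration`, which in turn ends up in a `Packet` (the smallest unit of communication between client and server).
Second, there is `request.setWatch(watcher != null)`, which is just a boolean flag.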
    public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration) {
        return queuePacket(h, r, request, response, cb, clientPath, serverPath,
                ctx, watchRegistration, null);
    }

    public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration,
            WatchDeregistration watchDeregistration) {
        Packet packet = null;

        // Note that we do not generate the Xid for the packet yet. It is
        // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
        // where the packet is actually sent.
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        packet.watchDeregistration = watchDeregistration;
        // The synchronized block here is for two purpose:
        // 1. synchronize with the final cleanup() in SendThread.run() to avoid race
        // 2. synchronized against each packet. So if a closeSession packet is added,
        // later packet will be notified.
        synchronized (state) {
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                outgoingQueue.add(packet);
            }
        }
        sendThread.getClientCnxnSocket().packetAdded();
        return packet;
    }
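Put simply, this logic assembles a `Packet` and adds it to the outgoing queue. That queue is consumed by a `SendThread` inside `ClientCnxn` (see `SendThread.run`). That method has many conditional branches and is not particularly clean code, so it is not reproduced here to avoid distraction.
Note that the `WatchRegistration` is not serialized and sent along with the `Packet`. This avoids sending unnecessary information, since the request is already flagged as a watch. So what is the `WatchRegistration` for?
The Zk client maintains an outgoing queue and a queue of packets awaiting replies; both hold `Packet` objects.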
    /**
     * These are the packets that have been sent and are waiting for a response.
     */
    private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();

    /**
     * These are the packets that need to be sent.
     */
    private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();
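How the two queues cooperate can be sketched with a small single-threaded model. This is only an illustration under simplifying assumptions: `TwoQueueClient`, `submit`, `sendNext`, and `onReply` are made-up names, there is no real network I/O, and the real client runs sending and receiving on a dedicated thread. The sketch relies on the server answering requests in the order they were sent, so a reply is always matched against the head of the pending queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative only: a single-threaded model of the two client queues.
class TwoQueueClient {
    static final class Packet {
        int xid;          // assigned at send time, not when the packet is queued
        boolean finished;
    }

    private final Deque<Packet> outgoingQueue = new ArrayDeque<>(); // waiting to be sent
    private final Deque<Packet> pendingQueue = new ArrayDeque<>();  // sent, awaiting a reply
    private int nextXid = 1;

    Packet submit() {
        Packet p = new Packet();
        outgoingQueue.addLast(p);
        return p;
    }

    /** Pretends to send the oldest outgoing packet; returns its xid, or -1 if idle. */
    int sendNext() {
        Packet p = outgoingQueue.pollFirst();
        if (p == null) {
            return -1;
        }
        p.xid = nextXid++;
        pendingQueue.addLast(p);
        return p.xid;
    }

    /** Matches a reply against the head of the pending queue. */
    Packet onReply(int replyXid) {
        Packet p = pendingQueue.pollFirst();
        if (p == null) {
            throw new IllegalStateException("Nothing in the queue, but got " + replyXid);
        }
        if (p.xid != replyXid) {
            throw new IllegalStateException(
                    "Xid out of order. Got " + replyXid + " expected " + p.xid);
        }
        p.finished = true;
        return p;
    }
}
```

Because replies are matched by position, a reply whose xid does not match the head of the pending queue is treated as an error rather than searched for further down the queue.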
Next, let's look at `SendThread.readResponse`:
    void readResponse(ByteBuffer incomingBuffer) throws IOException {
        ByteBufferInputStream bbis = new ByteBufferInputStream(
                incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ReplyHeader replyHdr = new ReplyHeader();

        replyHdr.deserialize(bbia, "header");
        if (replyHdr.getXid() == -2) {
            // -2 is the xid for pings
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got ping response for sessionid: 0x"
                        + Long.toHexString(sessionId)
                        + " after "
                        + ((System.nanoTime() - lastPingSentNs) / 1000000)
                        + "ms");
            }
            return;
        }
        if (replyHdr.getXid() == -4) {
            // -4 is the xid for AuthPacket
            if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                state = States.AUTH_FAILED;
                eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
                        Watcher.Event.KeeperState.AuthFailed, null) );
                eventThread.queueEventOfDeath();
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got auth sessionid:0x"
                        + Long.toHexString(sessionId));
            }
            return;
        }
        if (replyHdr.getXid() == -1) {
            // -1 means notification
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got notification sessionid:0x"
                        + Long.toHexString(sessionId));
            }
            WatcherEvent event = new WatcherEvent();
            event.deserialize(bbia, "response");

            // convert from a server path to a client path
            if (chrootPath != null) {
                String serverPath = event.getPath();
                if(serverPath.compareTo(chrootPath)==0)
                    event.setPath("/");
                else if (serverPath.length() > chrootPath.length())
                    event.setPath(serverPath.substring(chrootPath.length()));
                else {
                    LOG.warn("Got server path " + event.getPath()
                            + " which is too short for chroot path "
                            + chrootPath);
                }
            }

            WatchedEvent we = new WatchedEvent(event);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got " + we + " for sessionid 0x"
                        + Long.toHexString(sessionId));
            }

            eventThread.queueEvent( we );
            return;
        }

        // If SASL authentication is currently in progress, construct and
        // send a response packet immediately, rather than queuing a
        // response as with other packets.
        if (tunnelAuthInProgress()) {
            GetSASLRequest request = new GetSASLRequest();
            request.deserialize(bbia,"token");
            zooKeeperSaslClient.respondToServer(request.getToken(),
                    ClientCnxn.this);
            return;
        }

        Packet packet;
        synchronized (pendingQueue) {
            if (pendingQueue.size() == 0) {
                throw new IOException("Nothing in the queue, but got "
                        + replyHdr.getXid());
            }
            packet = pendingQueue.remove();
        }
        /*
         * Since requests are processed in order, we better get a response
         * to the first request!
         */
        try {
            if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                packet.replyHeader.setErr(
                        KeeperException.Code.CONNECTIONLOSS.intValue());
                throw new IOException("Xid out of order. Got Xid "
                        + replyHdr.getXid() + " with err " +
                        + replyHdr.getErr() +
                        " expected Xid "
                        + packet.requestHeader.getXid()
                        + " for a packet with details: "
                        + packet );
            }

            packet.replyHeader.setXid(replyHdr.getXid());
            packet.replyHeader.setErr(replyHdr.getErr());
            packet.replyHeader.setZxid(replyHdr.getZxid());
            if (replyHdr.getZxid() > 0) {
                lastZxid = replyHdr.getZxid();
            }
            if (packet.response != null && replyHdr.getErr() == 0) {
                packet.response.deserialize(bbia, "response");
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("Reading reply sessionid:0x"
                        + Long.toHexString(sessionId) + ", packet:: " + packet);
            }
        } finally {
            finishPacket(packet);
        }
    }
Inside `synchronized (pendingQueue)`, the `Packet` is taken from the queue, and at the end it is handed to `finishPacket`.
    protected void finishPacket(Packet p) {
        int err = p.replyHeader.getErr();
        if (p.watchRegistration != null) {
            p.watchRegistration.register(err);
        }
        // Add all the removed watch events to the event queue, so that the
        // clients will be notified with 'Data/Child WatchRemoved' event type.
        if (p.watchDeregistration != null) {
            Map<EventType, Set<Watcher>> materializedWatchers = null;
            try {
                materializedWatchers = p.watchDeregistration.unregister(err);
                for (Entry<EventType, Set<Watcher>> entry : materializedWatchers
                        .entrySet()) {
                    Set<Watcher> watchers = entry.getValue();
                    if (watchers.size() > 0) {
                        queueEvent(p.watchDeregistration.getClientPath(), err,
                                watchers, entry.getKey());
                        // ignore connectionloss when removing from local
                        // session
                        p.replyHeader.setErr(Code.OK.intValue());
                    }
                }
            } catch (KeeperException.NoWatcherException nwe) {
                p.replyHeader.setErr(nwe.code().intValue());
            } catch (KeeperException ke) {
                p.replyHeader.setErr(ke.code().intValue());
            }
        }

        if (p.cb == null) {
            synchronized (p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {
            p.finished = true;
            eventThread.queuePacket(p);
        }
    }
What we can be sure of is that the `WatchRegistration` is maintained only on the client and is never sent to the server. The `register` logic is simple; let's take a look:
    /**
     * Register the watcher with the set of watches on path.
     * @param rc the result code of the operation that attempted to
     * add the watch on the path.
     */
    public void register(int rc) {
        if (shouldAddWatch(rc)) {
            Map<String, Set<Watcher>> watches = getWatches(rc);
            synchronized(watches) {
                Set<Watcher> watchers = watches.get(clientPath);
                if (watchers == null) {
                    watchers = new HashSet<Watcher>();
                    watches.put(clientPath, watchers);
                }
                watchers.add(watcher);
            }
        }
    }
As we can see, what is maintained is simply a dictionary from path to watchers. Next, let's look at `queueEvent`:
    void queueEvent(String clientPath, int err,
            Set<Watcher> materializedWatchers, EventType eventType) {
        KeeperState sessionState = KeeperState.SyncConnected;
        if (KeeperException.Code.SESSIONEXPIRED.intValue() == err
                || KeeperException.Code.CONNECTIONLOSS.intValue() == err) {
            sessionState = Event.KeeperState.Disconnected;
        }
        WatchedEvent event = new WatchedEvent(eventType, sessionState,
                clientPath);
        eventThread.queueEvent(event, materializedWatchers);
    }
The logic is simple: assemble the event and hand it to `eventThread` for notification.
    private void queueEvent(WatchedEvent event,
            Set<Watcher> materializedWatchers) {
        if (event.getType() == EventType.None
                && sessionState == event.getState()) {
            return;
        }
        sessionState = event.getState();
        final Set<Watcher> watchers;
        if (materializedWatchers == null) {
            // materialize the watchers based on the event
            watchers = watcher.materialize(event.getState(),
                    event.getType(), event.getPath());
        } else {
            watchers = new HashSet<Watcher>();
            watchers.addAll(materializedWatchers);
        }
        WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
        // queue the pair (watch set & event) for later processing
        waitingEvents.add(pair);
    }
First, let's look at `ClientWatchManager.materialize`.
    /* (non-Javadoc)
     * @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState,
     *                                                        Event.EventType, java.lang.String)
     */
    @Override
    public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                    Watcher.Event.EventType type,
                                    String clientPath)
    {
        Set<Watcher> result = new HashSet<Watcher>();

        switch (type) {
        case None:
            result.add(defaultWatcher);
            boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected;
            synchronized(dataWatches) {
                for(Set<Watcher> ws: dataWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    dataWatches.clear();
                }
            }

            synchronized(existWatches) {
                for(Set<Watcher> ws: existWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    existWatches.clear();
                }
            }

            synchronized(childWatches) {
                for(Set<Watcher> ws: childWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    childWatches.clear();
                }
            }

            return result;
        case NodeDataChanged:
        case NodeCreated:
            synchronized (dataWatches) {
                addTo(dataWatches.remove(clientPath), result);
            }
            synchronized (existWatches) {
                addTo(existWatches.remove(clientPath), result);
            }
            break;
        case NodeChildrenChanged:
            synchronized (childWatches) {
                addTo(childWatches.remove(clientPath), result);
            }
            break;
        case NodeDeleted:
            synchronized (dataWatches) {
                addTo(dataWatches.remove(clientPath), result);
            }
            // XXX This shouldn't be needed, but just in case
            synchronized (existWatches) {
                Set<Watcher> list = existWatches.remove(clientPath);
                if (list != null) {
                    addTo(list, result);
                    LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                }
            }
            synchronized (childWatches) {
                addTo(childWatches.remove(clientPath), result);
            }
            break;
        default:
            String msg = "Unhandled watch event type " + type
                + " with state " + state + " on path " + clientPath;
            LOG.error(msg);
            throw new RuntimeException(msg);
        }

        return result;
    }
    }
Note that once a watch is triggered, it is removed. The client's `ZKWatchManager` is what maintains the watch state:
    static class ZKWatchManager implements ClientWatchManager {
        private final Map<String, Set<Watcher>> dataWatches =
            new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> existWatches =
            new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> childWatches =
            new HashMap<String, Set<Watcher>>();
        //......
    }
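The one-shot behaviour on the client can be reduced to a few lines. The following is a simplified sketch, not the real `ZKWatchManager`: the class `OneShotWatchRegistry` and its nested `Watcher` interface are hypothetical, and it keeps a single map instead of separate data, exists, and child maps.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model of a client-side one-shot watch registry.
class OneShotWatchRegistry {
    interface Watcher {
        void process(String path);
    }

    private final Map<String, Set<Watcher>> watches = new HashMap<String, Set<Watcher>>();

    /** Remembers a watcher for a path, like register() does after a successful reply. */
    synchronized void register(String path, Watcher watcher) {
        Set<Watcher> watchers = watches.get(path);
        if (watchers == null) {
            watchers = new HashSet<Watcher>();
            watches.put(path, watchers);
        }
        watchers.add(watcher);
    }

    /** Returns the watchers for a path and forgets them: each watch fires at most once. */
    synchronized Set<Watcher> materialize(String path) {
        Set<Watcher> removed = watches.remove(path);
        return removed == null ? new HashSet<Watcher>() : removed;
    }
}
```

Calling `materialize` twice for the same path yields the watchers only the first time, which is why a client that wants continuous notifications has to register again after each event.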
Back to the last thing `eventThread.queueEvent` does: yet another enqueue. So let's look at this thread's core method, `run`:
    @Override
    @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
    public void run() {
        try {
            isRunning = true;
            while (true) {
                Object event = waitingEvents.take();
                if (event == eventOfDeath) {
                    wasKilled = true;
                } else {
                    processEvent(event);
                }
                if (wasKilled)
                    synchronized (waitingEvents) {
                        if (waitingEvents.isEmpty()) {
                            isRunning = false;
                            break;
                        }
                    }
            }
        } catch (InterruptedException e) {
            LOG.error("Event thread exiting due to interruption", e);
        }

        LOG.info("EventThread shut down for session: 0x{}",
                Long.toHexString(getSessionId()));
    }
Next, `processEvent`:
    private void processEvent(Object event) {
        try {
            if (event instanceof WatcherSetEventPair) {
                // each watcher will process the event
                WatcherSetEventPair pair = (WatcherSetEventPair) event;
                for (Watcher watcher : pair.watchers) {
                    try {
                        watcher.process(pair.event);
                    } catch (Throwable t) {
                        LOG.error("Error while calling watcher ", t);
                    }
                }
            } else if (event instanceof LocalCallback) {
                // The remaining logic is not important for this article; skipped.
            }
Once `process` is called, the logic we wrote ourselves is triggered.
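The dispatch pattern used by the event thread (a blocking queue, a sentinel "event of death", and per-callback exception isolation) can be sketched as follows. `MiniEventThread` and its `demo` helper are hypothetical names for illustration. Unlike the real `run` method, this simplified version stops as soon as it takes the sentinel instead of checking whether the queue is empty, so anything queued after the sentinel is dropped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified event dispatcher: one thread, callbacks run serially.
class MiniEventThread extends Thread {
    private static final Runnable EVENT_OF_DEATH = () -> { };
    private final LinkedBlockingQueue<Runnable> waitingEvents = new LinkedBlockingQueue<>();

    void queueEvent(Runnable callback) {
        waitingEvents.add(callback);
    }

    void queueEventOfDeath() {
        waitingEvents.add(EVENT_OF_DEATH);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Runnable event = waitingEvents.take(); // blocks until an event arrives
                if (event == EVENT_OF_DEATH) {
                    return;
                }
                try {
                    event.run();
                } catch (Throwable t) {
                    // A failing callback must not kill the dispatcher.
                    System.err.println("Error while calling watcher: " + t);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Small demo: returns the order in which the callbacks ran. */
    static List<Integer> demo() {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<Integer>());
        MiniEventThread t = new MiniEventThread();
        t.start();
        t.queueEvent(() -> seen.add(1));
        t.queueEvent(() -> { throw new IllegalStateException("boom"); });
        t.queueEvent(() -> seen.add(2));
        t.queueEventOfDeath();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

Since a single thread drains the queue, callbacks run one after another in arrival order, and a slow callback delays every callback queued behind it.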
So far we have covered the client side of watch. Next, let's walk through the server side.
Let's start where the Zk server handles requests, `processRequest` in `FinalRequestProcessor`:
    case OpCode.exists: {
        lastOp = "EXIS";
        // TODO we need to figure out the security requirement for this!
        ExistsRequest existsRequest = new ExistsRequest();
        ByteBufferInputStream.byteBuffer2Record(request.request,
                existsRequest);
        String path = existsRequest.getPath();
        if (path.indexOf('\0') != -1) {
            throw new KeeperException.BadArgumentsException();
        }
        Stat stat = zks.getZKDatabase().statNode(path, existsRequest
                .getWatch() ? cnxn : null);
        rsp = new ExistsResponse(stat);
        break;
    }
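As we can see, if the request asks for a watch, the `ServerCnxn` is passed down; it essentially represents the connection between the client and the server. This way, when a data event occurs, the client's watch can be triggered through that connection.
Jump to `DataTree.statNode`: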
    public Stat statNode(String path, Watcher watcher)
            throws KeeperException.NoNodeException {
        Stat stat = new Stat();
        DataNode n = nodes.get(path);
        if (watcher != null) {
            dataWatches.addWatch(path, watcher);
        }
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (n) {
            n.copyStat(stat);
            return stat;
        }
    }
When `watcher != null`, a watcher is added to the server's `dataWatches`. Next, let's look at the core server-side watch class, `WatchManager`:
    /**
     * This class manages watches. It allows watches to be associated with a string
     * and removes watchers and their watches in addition to managing triggers.
     */
    class WatchManager {
        private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);

        private final HashMap<String, HashSet<Watcher>> watchTable =
            new HashMap<String, HashSet<Watcher>>();

        private final HashMap<Watcher, HashSet<String>> watch2Paths =
            new HashMap<Watcher, HashSet<String>>();

        synchronized int size(){
            int result = 0;
            for(Set<Watcher> watches : watchTable.values()) {
                result += watches.size();
            }
            return result;
        }

        synchronized void addWatch(String path, Watcher watcher) {
            HashSet<Watcher> list = watchTable.get(path);
            if (list == null) {
                // don't waste memory if there are few watches on a node
                // rehash when the 4th entry is added, doubling size thereafter
                // seems like a good compromise
                list = new HashSet<Watcher>(4);
                watchTable.put(path, list);
            }
            list.add(watcher);

            HashSet<String> paths = watch2Paths.get(watcher);
            if (paths == null) {
                // cnxns typically have many watches, so use default cap here
                paths = new HashSet<String>();
                watch2Paths.put(watcher, paths);
            }
            paths.add(path);
        }

        synchronized void removeWatcher(Watcher watcher) {
            HashSet<String> paths = watch2Paths.remove(watcher);
            if (paths == null) {
                return;
            }
            for (String p : paths) {
                HashSet<Watcher> list = watchTable.get(p);
                if (list != null) {
                    list.remove(watcher);
                    if (list.size() == 0) {
                        watchTable.remove(p);
                    }
                }
            }
        }

        Set<Watcher> triggerWatch(String path, EventType type) {
            return triggerWatch(path, type, null);
        }

        Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
            WatchedEvent e = new WatchedEvent(type,
                    KeeperState.SyncConnected, path);
            HashSet<Watcher> watchers;
            synchronized (this) {
                watchers = watchTable.remove(path);
                if (watchers == null || watchers.isEmpty()) {
                    if (LOG.isTraceEnabled()) {
                        ZooTrace.logTraceMessage(LOG,
                                ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                "No watchers for " + path);
                    }
                    return null;
                }
                for (Watcher w : watchers) {
                    HashSet<String> paths = watch2Paths.get(w);
                    if (paths != null) {
                        paths.remove(path);
                    }
                }
            }
            for (Watcher w : watchers) {
                if (supress != null && supress.contains(w)) {
                    continue;
                }
                w.process(e);
            }
            return watchers;
        }

        /**
         * Brief description of this object.
         */
        @Override
        public synchronized String toString() {
            StringBuilder sb = new StringBuilder();

            sb.append(watch2Paths.size()).append(" connections watching ")
                .append(watchTable.size()).append(" paths\n");

            int total = 0;
            for (HashSet<String> paths : watch2Paths.values()) {
                total += paths.size();
            }
            sb.append("Total watches:").append(total);

            return sb.toString();
        }

        /**
         * String representation of watches. Warning, may be large!
         * @param byPath iff true output watches by paths, otw output
         * watches by connection
         * @return string representation of watches
         */
        synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {
            if (byPath) {
                for (Entry<String, HashSet<Watcher>> e : watchTable.entrySet()) {
                    pwriter.println(e.getKey());
                    for (Watcher w : e.getValue()) {
                        pwriter.print("\t0x");
                        pwriter.print(Long.toHexString(((ServerCnxn)w).getSessionId()));
                        pwriter.print("\n");
                    }
                }
            } else {
                for (Entry<Watcher, HashSet<String>> e : watch2Paths.entrySet()) {
                    pwriter.print("0x");
                    pwriter.println(Long.toHexString(((ServerCnxn)e.getKey()).getSessionId()));
                    for (String path : e.getValue()) {
                        pwriter.print("\t");
                        pwriter.println(path);
                    }
                }
            }
        }

        /**
         * Checks the specified watcher exists for the given path
         *
         * @param path
         *            znode path
         * @param watcher
         *            watcher object reference
         * @return true if the watcher exists, false otherwise
         */
        synchronized boolean containsWatcher(String path, Watcher watcher) {
            HashSet<String> paths = watch2Paths.get(watcher);
            if (paths == null || !paths.contains(path)) {
                return false;
            }
            return true;
        }

        /**
         * Removes the specified watcher for the given path
         *
         * @param path
         *            znode path
         * @param watcher
         *            watcher object reference
         * @return true if the watcher successfully removed, false otherwise
         */
        synchronized boolean removeWatcher(String path, Watcher watcher) {
            HashSet<String> paths = watch2Paths.get(watcher);
            if (paths == null || !paths.remove(path)) {
                return false;
            }

            HashSet<Watcher> list = watchTable.get(path);
            if (list == null || !list.remove(watcher)) {
                return false;
            }

            if (list.size() == 0) {
                watchTable.remove(path);
            }

            return true;
        }

        /**
         * Returns a watch report.
         *
         * @return watch report
         * @see WatchesReport
         */
        synchronized WatchesReport getWatches() {
            Map<Long, Set<String>> id2paths = new HashMap<Long, Set<String>>();
            for (Entry<Watcher, HashSet<String>> e: watch2Paths.entrySet()) {
                Long id = ((ServerCnxn) e.getKey()).getSessionId();
                HashSet<String> paths = new HashSet<String>(e.getValue());
                id2paths.put(id, paths);
            }
            return new WatchesReport(id2paths);
        }

        /**
         * Returns a watch report by path.
         *
         * @return watch report
         * @see WatchesPathReport
         */
        synchronized WatchesPathReport getWatchesByPath() {
            Map<String, Set<Long>> path2ids = new HashMap<String, Set<Long>>();
            for (Entry<String, HashSet<Watcher>> e : watchTable.entrySet()) {
                Set<Long> ids = new HashSet<Long>(e.getValue().size());
                path2ids.put(e.getKey(), ids);
                for (Watcher watcher : e.getValue()) {
                    ids.add(((ServerCnxn) watcher).getSessionId());
                }
            }
            return new WatchesPathReport(path2ids);
        }

        /**
         * Returns a watch summary.
         *
         * @return watch summary
         * @see WatchesSummary
         */
        synchronized WatchesSummary getWatchesSummary() {
            int totalWatches = 0;
            for (HashSet<String> paths : watch2Paths.values()) {
                totalWatches += paths.size();
            }
            return new WatchesSummary (watch2Paths.size(), watchTable.size(),
                                       totalWatches);
        }
    }
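The whole class is easy to understand. Start with its two core member variables: `watchTable` maps a path to the set of watchers registered on it, and `watch2Paths` maps a watcher to the set of paths it watches.
`addWatch` simply adds entries to both maps. Triggering removes the watchers registered on the given path and calls `process` on each of them, at which point the `ServerCnxn` sends a `Packet` to the client.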
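One way to see why both maps are kept is to model what happens when a connection goes away. The sketch below is a condensed, hypothetical version (`MiniWatchManager`, with plain strings instead of `EventType` and no suppression set), meant only to show the behaviour: the reverse index lets all watches of one connection be dropped without scanning every path.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Condensed, hypothetical model of a two-map watch table.
class MiniWatchManager {
    interface Watcher {
        void process(String path, String eventType);
    }

    private final Map<String, Set<Watcher>> watchTable = new HashMap<>();  // path -> watchers
    private final Map<Watcher, Set<String>> watch2Paths = new HashMap<>(); // watcher -> paths

    synchronized void addWatch(String path, Watcher w) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(w);
        watch2Paths.computeIfAbsent(w, k -> new HashSet<>()).add(path);
    }

    /** Drops every watch of one watcher, e.g. when its connection closes. */
    synchronized void removeWatcher(Watcher w) {
        Set<String> paths = watch2Paths.remove(w);
        if (paths == null) {
            return;
        }
        for (String p : paths) {
            Set<Watcher> ws = watchTable.get(p);
            if (ws != null) {
                ws.remove(w);
                if (ws.isEmpty()) {
                    watchTable.remove(p);
                }
            }
        }
    }

    /** Fires and forgets the watchers on a path; returns how many were notified. */
    int triggerWatch(String path, String eventType) {
        Set<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);
            if (watchers == null) {
                return 0;
            }
            for (Watcher w : watchers) {
                Set<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        // Callbacks run outside the lock, as in the class above.
        for (Watcher w : watchers) {
            w.process(path, eventType);
        }
        return watchers.size();
    }

    synchronized int watchedPathCount() {
        return watchTable.size();
    }
}
```

In this model, removing a watcher costs time proportional to the number of paths that watcher holds, not to the total number of watched paths.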
So when is it triggered? That is simple too: in the `DataTree` code, operating on the relevant data triggers the watchers. Take `DataTree.setData` as an example:
    public Stat setData(String path, byte data[], int version, long zxid,
            long time) throws KeeperException.NoNodeException {
        Stat s = new Stat();
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        byte lastdata[] = null;
        synchronized (n) {
            lastdata = n.data;
            n.data = data;
            n.stat.setMtime(time);
            n.stat.setMzxid(zxid);
            n.stat.setVersion(version);
            n.copyStat(s);
        }
        // now update if the path is in a quota subtree.
        String lastPrefix = getMaxPrefixWithQuota(path);
        if(lastPrefix != null) {
            this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
                    - (lastdata == null ? 0 : lastdata.length));
        }
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }
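At this point we have a clear picture of how watch works in Zk. From this analysis we can derive several characteristics of watchers:
Lightweight: whether a client request wants a watch is decided by a single boolean, and likewise the server's watch callback, `WatchedEvent`, has only three attributes: the notification state (`KeeperState`), the event type (`EventType`), and the node path.
This lightweight design keeps both network overhead and server-side memory overhead low.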
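A consequence of such a small event is that the notification carries no data: the client has to read the node again, and pass a watcher again if it wants further notifications. The sketch below illustrates that pattern against a hypothetical in-memory store; `MiniStore`, `ReRegisteringReader`, and their methods are invented for this example and are not ZooKeeper API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a server; not ZooKeeper API.
class MiniStore {
    /** Mirrors the three attributes of a watch notification: no data payload. */
    static final class Event {
        final String state;
        final String type;
        final String path;

        Event(String state, String type, String path) {
            this.state = state;
            this.type = type;
            this.path = path;
        }
    }

    private final Map<String, String> data = new HashMap<>();
    // One watcher per path keeps the sketch short.
    private final Map<String, Consumer<Event>> watches = new HashMap<>();

    String getData(String path, Consumer<Event> watcher) {
        if (watcher != null) {
            watches.put(path, watcher);
        }
        return data.get(path);
    }

    void setData(String path, String value) {
        data.put(path, value);
        Consumer<Event> w = watches.remove(path); // one-shot: removed before firing
        if (w != null) {
            w.accept(new Event("SyncConnected", "NodeDataChanged", path));
        }
    }
}

class ReRegisteringReader {
    final List<String> observed = new ArrayList<>();
    private final MiniStore store;

    ReRegisteringReader(MiniStore store) {
        this.store = store;
    }

    void start(String path) {
        observed.add(store.getData(path, this::onEvent));
    }

    private void onEvent(MiniStore.Event e) {
        // The event only says that something changed at e.path: read again,
        // and pass the watcher again to re-arm it.
        observed.add(store.getData(e.path, this::onEvent));
    }
}
```

In this synchronous model every change is observed. Against a real server, a change that lands between the notification and the re-read would presumably be seen only through the newer value, since the watch is not armed during that gap.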
In this article we explored how watch is implemented. A brief summary: