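The diagram shows the order of the StatisticNode call stack and how several classes relate to one another. Today we start analyzing the structure and functionality of these classes.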
Node is the statistics interface. Most of its methods return aggregated data:
```java
/**
 * Get incoming request per minute ({@code pass + block}).
 *
 * @return total request count per minute
 */
long totalRequest();

/**
 * Get pass count per minute.
 *
 * @return total passed request count per minute
 * @since 1.5.0
 */
long totalPass();

/**
 * Get {@link Entry#exit()} count per minute.
 *
 * @return total completed request count per minute
 */
long totalSuccess();

/**
 * Get blocked request count per minute (totalBlockRequest).
 *
 * @return total blocked request count per minute
 */
long blockRequest();

/**
 * Get exception count per minute.
 *
 * @return total business exception count per minute
 */
long totalException();

/**
 * Get pass request per second.
 *
 * @return QPS of passed requests
 */
double passQps();

/**
 * Get block request per second.
 *
 * @return QPS of blocked requests
 */
double blockQps();

/**
 * Get {@link #passQps()} + {@link #blockQps()} request per second.
 *
 * @return QPS of passed and blocked requests
 */
double totalQps();

// ... and many more
```
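The Javadoc above defines two derived quantities: `totalRequest = pass + block` per minute, and `totalQps = passQps + blockQps` per second. The sketch below makes those relationships concrete under stated assumptions. `SimpleNode` and its constructor arguments are hypothetical, and it assumes a QPS value is a window total divided by the window length in seconds. It is not Sentinel's implementation.

```java
// Hypothetical illustration of the Node aggregation contracts (not Sentinel code).
class SimpleNode {
    // Totals over the last minute (assumed to come from a 60 s window).
    private final long minutePass;
    private final long minuteBlock;
    // Totals over the second-level window, plus its length in seconds (assumed).
    private final long secondWindowPass;
    private final long secondWindowBlock;
    private final double secondWindowIntervalSec;

    SimpleNode(long minutePass, long minuteBlock,
               long secondWindowPass, long secondWindowBlock, double secondWindowIntervalSec) {
        this.minutePass = minutePass;
        this.minuteBlock = minuteBlock;
        this.secondWindowPass = secondWindowPass;
        this.secondWindowBlock = secondWindowBlock;
        this.secondWindowIntervalSec = secondWindowIntervalSec;
    }

    long totalPass() { return minutePass; }

    long blockRequest() { return minuteBlock; }

    // Incoming requests per minute = pass + block.
    long totalRequest() { return totalPass() + blockRequest(); }

    // QPS = window total / window length in seconds (assumption of this sketch).
    double passQps() { return secondWindowPass / secondWindowIntervalSec; }

    double blockQps() { return secondWindowBlock / secondWindowIntervalSec; }

    // Total QPS = passQps + blockQps.
    double totalQps() { return passQps() + blockQps(); }
}
```

For example, `new SimpleNode(120, 30, 8, 2, 1.0)` reports `totalRequest() == 150` and `totalQps() == 10.0`.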
StatisticNode implements Node and keeps its statistics in two sliding windows:

```java
/**
 * Second-level sliding window: sampleCount is 2, so each bucket spans 1000 ms / 2 = 500 ms.
 *
 * Holds statistics of the recent {@code INTERVAL} milliseconds. The {@code INTERVAL} is divided into time spans
 * by given {@code sampleCount}.
 */
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
    IntervalProperty.INTERVAL);

/**
 * Statistics for the last minute; each bucket spans 1 s.
 *
 * Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
 * meaning each bucket per second, in this way we can get accurate statistics of each second.
 */
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

/**
 * The counter for thread count (threads currently accessing the resource).
 */
private LongAdder curThreadNum = new LongAdder();

/**
 * The last timestamp when metrics were fetched.
 */
private long lastFetchTime = -1;

@Override
public Map<Long, MetricNode> metrics() {
    // The fetch operation is thread-safe under a single-thread scheduler pool.
    long currentTime = TimeUtil.currentTimeMillis();
    // Truncate the current timestamp to the start of the current second.
    currentTime = currentTime - currentTime % 1000;
    Map<Long, MetricNode> metrics = new ConcurrentHashMap<>();
    // Get the per-second MetricNode list from the minute-level window.
    List<MetricNode> nodesOfEverySecond = rollingCounterInMinute.details();
    long newLastFetchTime = lastFetchTime;
    // Iterate metrics of all resources, filter valid metrics (not-empty and up-to-date).
    for (MetricNode node : nodesOfEverySecond) {
        if (isNodeInTime(node, currentTime) && isValidMetricNode(node)) {
            metrics.put(node.getTimestamp(), node);
            newLastFetchTime = Math.max(newLastFetchTime, node.getTimestamp());
        }
    }
    // Remember the newest timestamp returned.
    lastFetchTime = newLastFetchTime;
    // Return the collected per-second metrics.
    return metrics;
}

// The remaining methods implement the Node interface; all of their calculations
// are derived from the two sliding windows above.
```
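The incremental fetch in `metrics()` can be tried in isolation. The bodies of `isNodeInTime` and `isValidMetricNode` are not shown above, so this sketch assumes two things. First, a node is "in time" when its timestamp is newer than `lastFetchTime` and strictly older than the current, still-filling second. Second, a node is "valid" when it has at least one non-zero counter. `SecondSample` is a hypothetical stand-in for `MetricNode`, and `IncrementalFetcher` is likewise hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for MetricNode: one per-second sample.
final class SecondSample {
    final long timestamp;
    final long pass;
    final long block;

    SecondSample(long timestamp, long pass, long block) {
        this.timestamp = timestamp;
        this.pass = pass;
        this.block = block;
    }
}

final class IncrementalFetcher {
    // Newest timestamp already returned; -1 means nothing has been fetched yet.
    private long lastFetchTime = -1;

    Map<Long, SecondSample> fetch(List<SecondSample> samples, long nowMillis) {
        // Truncate to the start of the current second, as metrics() does.
        long currentTime = nowMillis - nowMillis % 1000;
        Map<Long, SecondSample> result = new TreeMap<>();
        long newLastFetchTime = lastFetchTime;
        for (SecondSample s : samples) {
            // Assumed filter: not yet fetched, and not the in-progress second...
            boolean inTime = s.timestamp > lastFetchTime && s.timestamp < currentTime;
            // ...and carrying at least one non-zero counter.
            boolean valid = s.pass > 0 || s.block > 0;
            if (inTime && valid) {
                result.put(s.timestamp, s);
                newLastFetchTime = Math.max(newLastFetchTime, s.timestamp);
            }
        }
        lastFetchTime = newLastFetchTime;
        return result;
    }
}
```

Because `lastFetchTime` advances only to the newest returned timestamp, each completed second is reported once. A second that is still filling up is picked up by a later call.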
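DefaultNode extends StatisticNode:

```java
public class DefaultNode extends StatisticNode {

    /**
     * The resource of this node. Within one context, each resource is bound to exactly one DefaultNode.
     */
    private ResourceWrapper id;

    /**
     * Child nodes invoked under this node.
     */
    private volatile Set<Node> childList = new HashSet<>();

    /**
     * If this is null, it is initialized in ClusterBuilderSlot.entry.
     */
    private ClusterNode clusterNode;
}
```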
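The comments above describe two rules. Within one context, a resource maps to exactly one DefaultNode. The `clusterNode` field is filled in lazily. The toy model below adds one assumption: a resource's DefaultNodes in different contexts share a single ClusterNode. All class names and the `nodeFor` helper are hypothetical simplifications, not Sentinel's API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the node tree (not Sentinel's classes).
final class ToyClusterNode {
    final String resource;

    ToyClusterNode(String resource) { this.resource = resource; }
}

final class ToyDefaultNode {
    final String resource;
    // Child nodes invoked under this node (kept for parity with DefaultNode).
    final Set<ToyDefaultNode> childList = new HashSet<>();
    // Filled in lazily, mirroring the "initialized in ClusterBuilderSlot.entry" comment.
    ToyClusterNode clusterNode;

    ToyDefaultNode(String resource) { this.resource = resource; }
}

final class ToyNodeRegistry {
    // One DefaultNode per (context, resource) pair.
    private final Map<String, Map<String, ToyDefaultNode>> byContext = new HashMap<>();
    // Assumption: one ClusterNode per resource, shared across contexts.
    private final Map<String, ToyClusterNode> clusterNodes = new HashMap<>();

    ToyDefaultNode nodeFor(String context, String resource) {
        ToyDefaultNode node = byContext
                .computeIfAbsent(context, c -> new HashMap<>())
                .computeIfAbsent(resource, ToyDefaultNode::new);
        if (node.clusterNode == null) {
            node.clusterNode = clusterNodes.computeIfAbsent(resource, ToyClusterNode::new);
        }
        return node;
    }
}
```

Under these assumptions, the same resource entered from two contexts yields two different DefaultNodes that point to the same ClusterNode. Per-context and per-resource statistics can therefore be kept apart.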
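ArrayMetric implements Metric on top of a LeapArray of MetricBuckets:

```java
public class ArrayMetric implements Metric {

    // Array of metric buckets
    private final LeapArray<MetricBucket> data;

    // For example:
    @Override
    public long maxSuccess() {
        // Make sure the bucket for the current timestamp exists (created or reset if needed).
        data.currentWindow();
        long success = 0;
        // Iterate over all current buckets.
        List<MetricBucket> list = data.values();
        for (MetricBucket window : list) {
            // Keep the largest per-bucket success count.
            // The counters inside each bucket are indexed by the enum
            // com.alibaba.csp.sentinel.slots.statistic.MetricEvent.
            if (window.success() > success) {
                success = window.success();
            }
        }
        return Math.max(success, 1);
    }

    // Everything else is likewise computed from this array ...
}
```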
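Stripped of the LeapArray plumbing, `maxSuccess()` reduces to a max-with-floor over per-bucket counts. The sketch replaces `List<MetricBucket>` with a plain `long[]` of success counts, one per bucket. `MaxSuccess` is a hypothetical name.

```java
// Simplified maxSuccess(): each element is one bucket's success count.
final class MaxSuccess {
    private MaxSuccess() {}

    static long maxSuccess(long[] bucketSuccessCounts) {
        long success = 0;
        for (long s : bucketSuccessCounts) {
            // Keep the largest per-bucket success count.
            if (s > success) {
                success = s;
            }
        }
        // Same floor as ArrayMetric.maxSuccess(): never report less than 1.
        return Math.max(success, 1);
    }
}
```

Note the floor: even an idle window (`new long[]{}`) reports 1, while `{3, 7, 5}` reports 7.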
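Two methods here are particularly important:
1. getting the bucket item for a given timestamp;
2. working out the success count from the buckets.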
Let's look at how they are implemented in the code.
Getting the bucket item for a given timestamp
```java
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }

    // Bucket index: divide the timestamp by the bucket length (windowLengthInMs),
    // then take the result modulo the array length.
    int idx = calculateTimeIdx(timeMillis);
    // Calculate current bucket start time.
    // timestamp % windowLengthInMs is the offset into the current bucket;
    // subtracting it from the timestamp gives the bucket's start.
    long windowStart = calculateWindowStart(timeMillis);

    /*
     * Get bucket item at given time from the array.
     *
     * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
     * (2) Bucket is up-to-date, then just return the bucket.
     * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
     */
    while (true) {
        // Has the bucket at this index been initialized yet?
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            /*
             *     B0       B1      B2    NULL      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            bucket is empty, so create new and update
             *
             * Create the bucket at the start of the current window; the CAS guarantees
             * that only one thread succeeds.
             * If the old bucket is absent, then we create a new bucket at {@code windowStart},
             * then try to update circular array via a CAS operation. Only one thread can
             * succeed to update, while other threads yield its time slice.
             */
            // Create a new bucket whose start time is windowStart.
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            // CAS it into the array.
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Lost the CAS race: yield, then retry in the next loop iteration.
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            // The existing bucket starts at windowStart, so it is the current bucket: return it.
            /*
             *     B0       B1      B2     B3      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            startTime of Bucket 3: 800, so it's up-to-date
             *
             * If current {@code windowStart} is equal to the start timestamp of old bucket,
             * that means the time is within the bucket, so directly return the bucket.
             */
            return old;
        } else if (windowStart > old.windowStart()) {
            // windowStart is later than the existing bucket's start, so that bucket is stale and must be reset.
            /*
             *   (old)
             *             B0       B1      B2    NULL      B4
             * |_______||_______|_______|_______|_______|_______||___
             * ...    1200     1400    1600    1800    2000    2200  timestamp
             *                              ^
             *                           time=1676
             *          startTime of Bucket 2: 400, deprecated, should be reset
             *
             * If the start timestamp of old bucket is behind provided time, that means
             * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
             * Note that the reset and clean-up operations are hard to be atomic,
             * so we need a update lock to guarantee the correctness of bucket update.
             *
             * The update lock is conditional (tiny scope) and will take effect only when
             * bucket is deprecated, so in most cases it won't lead to performance loss.
             */
            // updateLock (non-fair by default) is only taken when the bucket is stale,
            // so in most cases there is no performance cost.
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    // The second-level and minute-level windows implement the reset differently.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // Should not go through here, as the provided time is already behind.
            // Just return a new bucket.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}
```
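The sketch below runs the three cases end to end. It is a simplification, not Sentinel's `LeapArray`. Each bucket holds a single `LongAdder`, time is passed in explicitly, and a stale bucket is replaced by a fresh object. The original instead hands the old bucket to `resetWindowTo`. `MiniLeapArray` and `Bucket` are hypothetical names. The index and start-time arithmetic follow the comments above: `idx = (t / windowLengthInMs) % sampleCount` and `windowStart = t - t % windowLengthInMs`.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified leap array (not Sentinel's LeapArray): one LongAdder per bucket.
final class MiniLeapArray {

    static final class Bucket {
        final long windowStart;
        final LongAdder count = new LongAdder();

        Bucket(long windowStart) {
            this.windowStart = windowStart;
        }
    }

    private final int windowLengthInMs;
    private final int sampleCount;
    private final AtomicReferenceArray<Bucket> array;
    private final ReentrantLock updateLock = new ReentrantLock();

    MiniLeapArray(int sampleCount, int intervalInMs) {
        this.sampleCount = sampleCount;
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    int calculateTimeIdx(long timeMillis) {
        // Divide by the bucket length, then wrap around the array.
        return (int) ((timeMillis / windowLengthInMs) % sampleCount);
    }

    long calculateWindowStart(long timeMillis) {
        // Subtract the offset inside the current bucket.
        return timeMillis - timeMillis % windowLengthInMs;
    }

    Bucket currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        int idx = calculateTimeIdx(timeMillis);
        long windowStart = calculateWindowStart(timeMillis);
        while (true) {
            Bucket old = array.get(idx);
            if (old == null) {
                // Case 1: empty slot, try to install a new bucket with CAS.
                Bucket created = new Bucket(windowStart);
                if (array.compareAndSet(idx, null, created)) {
                    return created;
                }
                Thread.yield();
            } else if (windowStart == old.windowStart) {
                // Case 2: the slot already holds the current bucket.
                return old;
            } else if (windowStart > old.windowStart) {
                // Case 3: stale bucket. Swap in a fresh one under the update lock;
                // the CAS avoids clobbering a bucket another thread just installed.
                if (updateLock.tryLock()) {
                    try {
                        array.compareAndSet(idx, old, new Bucket(windowStart));
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    Thread.yield();
                }
                // Loop again: the next read normally hits case 2.
            } else {
                // Clock moved backwards: return a detached bucket.
                return new Bucket(windowStart);
            }
        }
    }
}
```

With `new MiniLeapArray(2, 1000)` (two 500 ms buckets), `t = 1234` maps to index 0 with start 1000. `t = 1750` maps to index 1 with start 1500. `t = 2234` lands on index 0 again with start 2000, so the old bucket is replaced. This sketch swaps in a new object rather than resetting the old one in place, so a CAS alone would be enough. The lock is kept only to mirror the structure of the original.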
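Getting the total success count of the current sliding window essentially means adding up the counts of every bucket in the window. The call chain is as follows: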
```java
/**
 * This enum determines the size of the LongAdder array inside MetricBucket.
 */
public enum MetricEvent {

    /**
     * Normal pass.
     */
    PASS,
    /**
     * Normal block.
     */
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,

    /**
     * Passed in future quota (pre-occupied, since 1.5.0).
     */
    OCCUPIED_PASS
}
```
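The sketch below combines two ideas from above: each bucket keeps one LongAdder per MetricEvent, and a window total is the sum over all buckets. It assumes counters are indexed by `ordinal()` and that the caller passes in only the buckets still inside the window. `ToyMetricBucket` and `totalSuccess` are hypothetical names, and the enum is copied locally so the example compiles on its own.

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

// Local copy of the MetricEvent constants listed above.
enum MetricEvent { PASS, BLOCK, EXCEPTION, SUCCESS, RT, OCCUPIED_PASS }

// Hypothetical bucket: one LongAdder per event type, indexed by ordinal().
final class ToyMetricBucket {
    private final LongAdder[] counters = new LongAdder[MetricEvent.values().length];

    ToyMetricBucket() {
        for (int i = 0; i < counters.length; i++) {
            counters[i] = new LongAdder();
        }
    }

    void add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
    }

    long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }

    int counterCount() {
        return counters.length;
    }

    // Window-level success: add up the SUCCESS counter of every bucket passed in.
    static long totalSuccess(List<ToyMetricBucket> window) {
        long total = 0;
        for (ToyMetricBucket bucket : window) {
            total += bucket.get(MetricEvent.SUCCESS);
        }
        return total;
    }
}
```

Adding an event type to the enum automatically grows every bucket's counter array by one. Unlike `maxSuccess()`, which keeps the largest per-bucket value, this total sums all buckets.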