前段时间爆改Codis的Java客户端Jodis,它的测试类中用到了指数退避算法。这是大学计算机网络课程会讲到的知识,本文权当复习,并且看看它的思想是如何应用在大数据组件中的。
所谓指数退避(exponential backoff),是一种根据系统反馈来成倍地削减操作的速率(比如数据流的速率)的算法,直到系统可以稳定地进行处理为止。在计算机网络的世界里,它一般用来控制数据帧/包的重传,避免密集的冲突与网络拥塞。
以以太网中使用的数据链路层协议CSMA/CD(载波监听多路访问/冲突检测)为例,其处理冲突的方式就是截断二进制指数退避(truncated binary exponential backoff),具体逻辑如下:
确定退避时间的初始值。一般是用端到端的往返时间2τ,该时间也称为冲突窗口(collision window)或争用期,以太网习惯取值51.2μs。
冲突发生时,设冲突次数为c,定K=min(c, 10)。从集合[0, 1, 2, 3, ..., 2K - 1]中随机取一个整数k,等待冲突窗口时长的k倍,然后再尝试重新发送帧。
当c > 16时,认定此帧发送失败,向高层报告错误。
可见,该方法名为“二进制”是因为冲突窗口倍数的可取值有2K个,名为“截断”是因为最多重试16次就失败,不会无限重试下去。随着重试次数增多,退避时间的期望值也就越大,从而在竞争激烈时减少碰撞发生的概率。
下图是CSMA/CD的流程图,蓝框中就是指数退避流程。
指数退避的思想非常简单而有效,在除网络之外的其他方面也有应用。作为大数据工程师,挑两个大数据组件稍微讲解一下吧。
Flume是一个高效的日志数据采集与聚合框架,它由数据源Source、数据通道Channel、数据汇集Sink三大部分组成。其中,数据源有一个经典且常用的实现SpoolDirectorySource,它负责读取特定目录下的日志文件,其中用到了指数退避算法。它的主要逻辑在SpoolDirectoryRunnable这个线程中,下面来看其run()方法。(Flume版本为我们在用的1.7.0)
@Override public void run() { int backoffInterval = 250; try { while (!Thread.interrupted()) { List<Event> events = reader.readEvents(batchSize); if (events.isEmpty()) { break; } sourceCounter.addToEventReceivedCount(events.size()); sourceCounter.incrementAppendBatchReceivedCount(); try { getChannelProcessor().processEventBatch(events); reader.commit(); } catch (ChannelFullException ex) { logger.warn("The channel is full, and cannot write data now. The " + "source will try again after " + backoffInterval + " milliseconds"); hitChannelFullException = true; backoffInterval = waitAndGetNewBackoffInterval(backoffInterval); continue; } catch (ChannelException ex) { logger.warn("The channel threw an exception, and cannot write data now. The " + "source will try again after " + backoffInterval + " milliseconds"); hitChannelException = true; backoffInterval = waitAndGetNewBackoffInterval(backoffInterval); continue; } backoffInterval = 250; sourceCounter.addToEventAcceptedCount(events.size()); sourceCounter.incrementAppendBatchAcceptedCount(); } } catch (Throwable t) { logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " + "Uncaught exception in SpoolDirectorySource thread. " + "Restart or reconfigure Flume to continue processing.", t); hasFatalError = true; Throwables.propagate(t); } } private int waitAndGetNewBackoffInterval(int backoffInterval) throws InterruptedException { if (backoff) { TimeUnit.MILLISECONDS.sleep(backoffInterval); backoffInterval = backoffInterval << 1; backoffInterval = backoffInterval >= maxBackoff ? maxBackoff : backoffInterval; } return backoffInterval; }
该方法先通过ReliableSpoolingFileEventReader.readEvents()方法获取事件,再调用ChannelProcessor.processEventBatch()方法将事件批次放入对应的Channel中并提交。如果Channel已满或者写入发生异常,就以250ms为起始值进行退避,每次退避后等待时长都会翻倍,直到变量maxBackoff设定的最大值(默认为4000ms)。一旦提交成功,等待时长会重设回250ms,多次提交不成功的话也不会截断。
可见,Flume的指数退避方法比CSMA/CD的方法来得更加简单直接。
本来想用ZK客户端Curator举例子的,但是它比较默默无闻,还是用Hadoop吧。
hadoop-common项目里的RetryPolicies类中提供了非常多种重试策略,其中就有指数退避。
public static final RetryPolicy exponentialBackoffRetry( int maxRetries, long sleepTime, TimeUnit timeUnit){ return new ExponentialBackoffRetry(maxRetries, sleepTime, timeUnit); } static class ExponentialBackoffRetry extends RetryLimited { public ExponentialBackoffRetry( int maxRetries, long sleepTime, TimeUnit timeUnit){ super(maxRetries, sleepTime, timeUnit); if (maxRetries < 0) { throw new IllegalArgumentException("maxRetries = " + maxRetries + " < 0"); } else if (maxRetries >= Long.SIZE - 1) { throw new IllegalArgumentException("maxRetries = " + maxRetries + " >= " + (Long.SIZE - 1)); } } @Override protected long calculateSleepTime(int retries) { return calculateExponentialTime(sleepTime, retries + 1); } }
可见,ExponentialBackoffRetry类强制规定了最大重试次数maxRetries,初始等待时间为sleepTime,实际等待时间则由calculateExponentialTime()方法来计算。
private static long calculateExponentialTime(long time, int retries, long cap){ long baseTime = Math.min(time * (1L << retries), cap); return (long) (baseTime * (RANDOM.get().nextDouble() + 0.5)); } private static long calculateExponentialTime(long time, int retries) { return calculateExponentialTime(time, retries, Long.MAX_VALUE); }
该方法使用cap参数来限制等待时间的最大值,默认是不限制的。除了在初始时间的基础上乘2的重试次数次幂之外,还会用0.5~1.5区间内的随机数加权,比较“聪明”一点。
— THE END —
◤半年文章精选系列◥
Flink从入门到放弃之源码解析系列
《Flink组件和逻辑计划》
《Flink执行计划生成》
《JobManager中的基本组件(1)》
《JobManager中的基本组件(2)》
《JobManager中的基本组件(3)》
《TaskManager》
《算子》
《网络》
《水印WaterMark》
《CheckPoint》
《任务调度及负载均衡》
《异常处理》
大数据成神之路-基础篇
《HashSet》
《HashMap》
《LinkedList》
《ArrayList/Vector》
《ConcurrentSkipListMap》
《ConcurrentHashMap1.7》
《ConcurrentHashMap1.8 Part1》
《ConcurrentHashMap1.8 Part2》
《CopyOnWriteArrayList》
《CopyOnWriteArraySet》
《ConcurrentLinkedQueue》
《LinkedBlockingDeque》
《LinkedBlockingQueue》
《ArrayBlockingQueue》
《ConcurrentSkipListSet》
大数据成神之路-进阶篇
《JVM&NIO基础入门》
《分布式理论基础和原理》
《分布式中的常见问题解决方案(分布式锁/事务/ID)》
《Zookeeper》
《RPC》
《Netty入门篇》
《Netty源码篇》
《Linux基础》
Flink入门系列
《Flink入门》
《Flink DataSet&DataSteam API》
《Flink集群部署》
《Flink重启策略》
《Flink分布式缓存》
《Flink广播变量》
《Flink中的Time》
《Flink中的窗口》
《时间戳和水印》
《Broadcast广播变量》
《Flink-Kafka-Connector》
《Flink之Table-&-SQL》
《Flink实战项目之实时热销排行》
《Flink-Redis-Sink》
《Flink消费Kafka写入Mysql》
Flink高级进阶
《FaultTolerance》
《流表对偶(duality)性》
《持续查询(ContinuousQueries)》
《DataStream-Connectors之Kafka》
《SQL概览》
《JOIN 算子》
《TableAPI》
《JOIN-LATERAL》
《JOIN-LATERAL-Time Interval(Time-windowed)》
《Temporal-Table-JOIN》
《State》
《FlinkSQL中的回退更新-Retraction》
《Apache Flink结合Apache Kafka实现端到端的一致性语义》
《Flink1.8.0发布!新功能抢先看》
《Flink1.8.0重大更新-Flink中State的自动清除详解》
《Flink在滴滴出行的应用与实践》
《批流统一计算引擎的动力源泉—Flink Shuffle机制的重构与优化》
《HBase分享 | Flink+HBase场景化解决方案》
《腾讯基于Flink的实时流计算平台演进之路》
《Flink进阶-Flink CEP(复杂事件处理)》
《Flink基于EventTime和WaterMark处理乱序事件和晚到的数据》
《Flink 最锋利的武器:Flink SQL 入门和实战》
《Flink Back Pressure》
《使用Flink读取Kafka中的消息》
《Flink on YARN部署快速入门指南》
《Apache Flink状态管理和容错机制介绍》
Hadoop生态圈系列
《Hadoop极简入门》
《MapReduce编程模型和计算框架架构原理》
《分布式文件系统-HDFS》
《YARN》
《Hadoop机架感知》
《HDFS的一个重要知识点-HDFS的数据流》
《Hadoop分布式缓存(DistributedCache)》
《如何从根源上解决 HDFS 小文件问题》(https://dwz.cn/FqDPpRUc)
《Hadoop解决小文件存储思路》(https://dwz.cn/2oCdmCkw)
《Hadoop所支持的几种压缩格式》
《MapReduce Join》
《YARN Capacity Scheduler(容量调度器)》
《hadoop上搭建hive》
《基于Hadoop的数据仓库Hive基础知识》
《Hive使用必知必会系列》
《一个小知识点-Hive行转列实现Pivot》
《面试必备技能-HiveSQL优化》
《HBase和Hive的区别和各自适用的场景》
《一篇文章入门Hbase》
《敲黑板:HBase的RowKey设计》
《HBase读写优化》
《HBase在滴滴出行的应用场景和最佳实践》
《Phoenix=HBase+SQL,让HBase插上了翅膀》
《一个知识点将你拒之门外之Hbase的二级索引》(https://dwz.cn/umfBOZ5l)
《Phoenix重磅 | Phoenix核心功能原理及应用场景介绍》
《DB、DW、DM、ODS、OLAP、OLTP和BI的概念理解》
《Hive/HiveSQL常用优化方法全面总结》
实时计算系列(spark、kafka等)
《Spark Streaming消费Kafka数据的两种方案》
《Apache Kafka简单入门》
《你不得不知道的知识-零拷贝》
《Kafka在字节跳动的实践和灾备方案》
《万字长文干货 | Kafka 事务性之幂等性实现》
《Kafka最佳实践》
《Kafka Exactly-Once 之事务性实现》
《Kafka连接器深度解读之错误处理和死信队列》
《Spark之数据倾斜调优》
《Structured Streaming 实现思路与实现概述》
《Spark内存调优》
《广告点击数实时统计:Spark StructuredStreaming + Redis Streams》
《Spark Shuffle在网易的优化》
《SparkSQL极简入门》
《下一代分布式消息队列Apache Pulsar》
《Pulsar与Kafka消费模型对比》
《Spark SQL重点知识总结》
《Structured Streaming 之状态存储解析》
《周期性清除Spark Streaming流状态的方法》
《Spark Structured Streaming特性介绍》
《Spark Streaming 反压(Back Pressure)机制介绍》
《Spark 从 Kafka 读数设置子并发度问题》
规范和系统设计
《阿里云10 PB+/天的日志系统设计和实现》
《阿里云Redis开发规范》
《Java中多个ifelse语句的替代设计》
《面试系列:十个海量数据处理方法大总结》
杂谈
《作为面试官的一点点感悟,谈谈技术人的成长之路》
《成年人的世界没有容易二字》
《我最近在关注的事》
《真香》
《简单说说学习这件事》
《20多岁做什么,将来才不会后悔》
《2019-05-12最近的总结》
《我军新闻联播气势+9999》
《周末分享 | 高手的战略》
《周末分享 | 快速定位自己的缺点》
《周末分享 | 我见过最高级的聪明是靠谱》