在使用Flume时,有时遇到如下错误信息:Space for commit to queue couldn't be acquired。 究其原因,是在memory channel的使用中出现了问题。 本文就以此为切入点,带大家一起剖析下 Flume 中 MemoryChannel 的实现
目录
在使用Flume时,有时遇到如下错误信息:Space for commit to queue couldn't be acquired。
究其原因,是在memory channel的使用中出现了问题。
本文就以此为切入点,带大家一起剖析下 Flume 中 MemoryChannel 的实现
Flume的用途:高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。
这里我们介绍与本文相关的特点:
这里就要介绍channel的概念。channel是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉,它在source和sink间起着桥梁的作用,channel是一个完整的事务,这一点保证了数据在收发的时候的一致性。并且它可以和任意数量的source和sink链接。
支持的类型主要有: JDBC channel , File System channel , Memory channel等,大致区别如下:
本文主要涉及Memory Channel,所以看看其特性。
由此,我们可以总结出来 Flume 的一些重点功能:
因为MemoryChannel属于Flume的重要模块,所以,我们本文就看看是MemoryChannel是如何确保Flume以上特点的,这也是本文的学习思路。
如何回滚,使用锁,信号量 ,动态扩容,如何解决生产者消费者不一致问题。
MemoryChannel还是比较简单的,主要是通过MemoryTransaction中的putList、takeList与MemoryChannel中的queue进行数据流转和事务控制,这里的queue相当于持久化层,只不过放到了内存中,如果是FileChannel的话,会把这个queue放到本地文件中。
MemoryChannel受内存空间的影响,如果数据产生的过快,同时获取信号量超时容易造成数据的丢失。而且Flume进程挂掉,数据也会丢失。
具体是:
下面表示了Event在一个使用了MemoryChannel的agent中数据流向:
source ---> putList ---> queue ---> takeList ---> sink
为了大家更好的理解,我们提前把最终图例发到这里。
具体如下图:
+----------+ +-------+ | Source | +----------------------------------------------------------------+ | Sink | +-----+----+ | [MemoryChannel] | +---+---+ | | +--------------------------------------------------------+ | ^ | | | [MemoryTransaction] | | | | | | | | | | | | | | | | | | channelCounter | | | | | | | | | | | | putByteCounter takeByteCounter | | | | | | | | | | | | +-----------+ +------------+ | |doTake | +----------------> | putList | | takeList +----------------+ doPut | | +----+--+---+ +----+---+---+ | | | | | ^ | ^ | | | | | | | | | | | +--------------------------------------------------------+ | | | | | | poll | | | | | | | | | | rollback rollback | | | | | +--------------+ +-------------+ | | | | | | | | | | | v | | | | doCommit +--+--+---+ doCommit | | | +------------> | queue | +-----------+ | | +---------+ | +----------------------------------------------------------------+
手机上如图:
我们要看看MemoryChannel重要变量的定义,这里我们没有按照代码顺序来,而是重新整理。
MemorChannel中最重要的部分主要是Channel、Transaction 和Configurable三个接口。
Channel接口 主要声明了Channel中的三个方法,就是队列基本功能:
public void put(Event event) throws ChannelException; //从指定的Source中获得Event放入指定的Channel中 public Event take() throws ChannelException; //从Channel中取出event放入Sink中 public Transaction getTransaction(); //获得当前Channel的事务实例
Transaction接口 主要声明了flume中事务机制的四个方法,就是事务功能:
enum TransactionState { Started, Committed, RolledBack, Closed } //枚举类型,指定了事务的四种状态,事务开始、提交、失败回滚、关闭 void begin(); void commit(); void rollback(); void close();
Configurable接口 主要是和flume配置组件相关的,需要从flume配置系统获取配置信息的任何组件,都必须实现该接口。该接口中只声明了一个context方法,用于获取配置信息。
大体逻辑如下:
+-----------+ +--------------+ +---------------+ | | | | | | | Channel | | Transaction | | Configurable | | | | | | | +-----------+ +--------------+ +---------------+ ^ ^ ^ | | | | | | | | | | +-------------+--------------+ | | | | | | | MemorChannel +---------+ +-------+ | | | | | | | | | | | | | | +----------------------------+
下面我们具体讲讲成员变量。
首先是一系列业务配置参数。
//定义队列中一次允许的事件总数 private static final Integer defaultCapacity = 100; //定义一个事务中允许的事件总数 private static final Integer defaultTransCapacity = 100; //将物理内存转换成槽(slot)数,默认是100 private static final double byteCapacitySlotSize = 100; //定义队列中事件所使用空间的最大字节数(默认是JVM最大可用内存的0.8) private static final Long defaultByteCapacity = (long)(Runtime.getRuntime().maxMemory() * .80); //定义byteCapacity和预估Event大小之间的缓冲区百分比: private static final Integer defaultByteCapacityBufferPercentage = 20; //添加或者删除一个event的超时时间,单位秒: private static final Integer defaultKeepAlive = 3; // maximum items in a transaction queue private volatile Integer transCapacity; private volatile int keepAlive; private volatile int byteCapacity; private volatile int lastByteCapacity; private volatile int byteCapacityBufferPercentage; private ChannelCounter channelCounter;
这些参数基本都在configure(Context context)中设置,基本逻辑如下:
设置 capacity:MemroyChannel的容量,默认是100。
设置 transCapacity:每个事务最大的容量,也就是每个事务能够获取的最大Event数量。默认也是100。事务容量必须小于等于Channel Queue容量。
设置 byteCapacityBufferPercentage:用来确定byteCapacity的一个百分比参数,即我们定义的字节容量和实际事件容量的百分比,因为我们定义的字节容量主要考虑Event body,而忽略Event header,因此需要减去Event header部分的内存占用,可以认为该参数定义了Event header占了实际字节容量的百分比,默认20%;
设置 byteCapacity:byteCapacity等于设置的byteCapacity值或堆的80%乘以1减去byteCapacityBufferPercentage的百分比,然后除以100。具体是首先读取配置文件定义的byteCapacity,如果没有定义,则使用默认defaultByteCapacity,而defaultByteCapacity默认是JVM物理内存的80%(Runtime.getRuntime().maxMemory() * .80);那么实际byteCapacity=定义的byteCapacity * (1- Event header百分比)/ byteCapacitySlotSize;byteCapacitySlotSize默认100,即计算百分比的一个系数。
设置 keep-alive:增加和删除一个Event的超时时间(单位:秒)。
设置初始化 LinkedBlockingDeque对象,大小为capacity。以及各种信号量对象。
最后初始化计数器。
配置代码摘要如下:
public void configure(Context context) { capacity = context.getInteger("capacity", defaultCapacity); transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity); byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage", defaultByteCapacityBufferPercentage); byteCapacity = (int) ((context.getLong("byteCapacity", defaultByteCapacity).longValue() *(1 - byteCapacityBufferPercentage * .01)) / byteCapacitySlotSize); if (byteCapacity < 1) { byteCapacity = Integer.MAX_VALUE; } keepAlive = context.getInteger("keep-alive", defaultKeepAlive); resizeQueue(capacity); if (channelCounter == null) { channelCounter = new ChannelCounter(getName()); } }
ChannelCounter 需要单独说一下。其就是把channel的一些属性封装了一下,初始化了一个ChannelCounter,是一个计数器,记录如当前队列放入Event数、取出Event数、成功数等。
private ChannelCounter channelCounter;
定义如下:
public class ChannelCounter extends MonitoredCounterGroup implements ChannelCounterMBean { private static final String COUNTER_CHANNEL_SIZE = "channel.current.size"; private static final String COUNTER_EVENT_PUT_ATTEMPT ="channel.event.put.attempt"; private static final String COUNTER_EVENT_TAKE_ATTEMPT = "channel.event.take.attempt"; private static final String COUNTER_EVENT_PUT_SUCCESS = "channel.event.put.success"; private static final String COUNTER_EVENT_TAKE_SUCCESS = "channel.event.take.success"; private static final String COUNTER_CHANNEL_CAPACITY = "channel.capacity"; }
其次是Semaphore和Queue。主要就是用来协助制事务。
MemoryChannel有三个信号量用来控制事务,防止容量越界:queueStored,queueRemaining,bytesRemaining。
private Object queueLock = new Object(); @GuardedBy(value = "queueLock") private LinkedBlockingDequequeue; private Semaphore queueRemaining; private Semaphore queueStored; private Semaphore bytesRemaining;// 表示可以使用的内存大小。该大小就是计算后的byteCapacity值。
内部类MemoryTransaction
是整个事务保证最重要的类。
MemoryTransaction用来接收数据和事务控制。该类继承BasicTransactionSemantics类。
MemoryTransaction维护了两个队列,一个用于Source的put,一个用于Sink的take,容量大小为事务的容量(transCapacity)。
private class MemoryTransaction extends BasicTransactionSemantics { private LinkedBlockingDequetakeList; private LinkedBlockingDequeputList; private final ChannelCounter channelCounter; private int putByteCounter = 0; private int takeByteCounter = 0; }
无论是Sink,还是Source都会调用getTransaction()方法,获取当前Channel的事务实例。
接口与成员变量大致逻辑可以理解如下,其中 Channel 的 API 表示这里是 MemorChannel 的对外 API:
+-----------+ +--------------+ +---------------+ | | | | | | | Channel | | Transaction | | Configurable | | | | | | | +---+-------+ +--------------+ +---------------+ ^ | ^ ^ | | | | | | | +--------------------------------------------------------+ | | | | | | | | MemoryChannel | | | | | + | | | | | | | | MemoryTransaction | | | | | | | | Semaphore / Queue | | | | | | +--------+ | | API | | | | | | | Config Parameters +------------+ | | | | +--------------------------------------------------------+
看了上面讲的,估计大家还是会晕,因为成员变量和概念实在是太多了,所以我们从使用入手分析。
前面提到,memory channel内部有三个队列,分别是putList,queue,takeList。其中putList,takeList在MemoryTransaction之中。
channel之上有一把锁,当source主动向channel放数据或者sink主动从channel取数据时,会抢锁,谁取到锁,谁就可以操作channel。
每次使用时会首先调用tx.begin()开始事务,也就是获取锁。然后调用tx.commit()提交数据或者调用tx.rollback()取消操作。
这里需要注意的是:Source, Sink 都是死循环,抢同一个锁。所以就会有消费者,生产者速度不一致的情况,所以就需要有 一个内部的 buffer,就是我们的Queue。
这是一个死循环,source一直试图获取channel锁,然后从kafka获取数据,放入channel中,那每次放入多少个数据呢?在KafkaSource.java中,代码是这样的:
while (eventList.size() < batchUpperLimit && System.currentTimeMillis() < maxBatchEndTime) { }
含义就是:每次最多放batchUpperLimit或最多等待maxBatchEndTime的时间,就结束向channel放数据。
当获取了足够的数据,首先放入putList中,然后就会调用tx.commit()将putList的全部数据放入queue中。
也是一个死循环,sink一直试图获取channel锁,然后从channel取一批数据,放入sink和takeList(仅仅用于回滚,在调用rollback时takeList的数据会回滚到queue中)。每次取多少个event呢?以HDFSEventSink为例,代码如下:
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) { Event event = channel.take(); if (event == null) break; }
batchSize的大小默认是100,由hdfs.batchSize控制。
具体如下:
+---------------> ^ | | | while(1) | v +-----------+ | +----+----+ | Source | | take | Sink | | | | | | +-----+-----+ | +---------+ | | | +-------------+--+ | | Channel | | | | While(1) | | | | | buffer | | +----------------+ | | ^ | | | | put v ----------------^
此处回答了前面提到的两个重点:
其实就是用事务保证整个流程的高可靠,其核心就在从source抽取数据到channel,从channel抽取到sink,当sink被消费后channel数据删除的这三个环节。而这些环节在flume中被统一的用事务管理起来。可以说,这是flume高可靠的关键一点。
具体涉及到的几个点如下:
我们下面具体走一下这个流程。
此事务发生在在Source到Channel之间,是从指定的Source中获得Event放入指定的Channel中,具体包括:
如下调用。
try { tx.begin(); //底层就是调用的doPut方法 // Source写事件调用put方法 reqChannel.put(event); tx.commit(); } catch (Throwable t) { // 发生异常则回滚事务 tx.rollback(); if (t instanceof Error) { throw (Error) t; } else if (t instanceof ChannelException) { throw (ChannelException) t; } else { throw new ChannelException("Unable to put event on required " + "channel: " + reqChannel, t); } } finally { if (tx != null) { tx.close(); } }
下面分析doPut方法。
doPut逻辑如下:
具体代码如下:
protected void doPut(Event event) throws InterruptedException { //增加放入事件计数器 channelCounter.incrementEventPutAttemptCount(); //estimateEventSize计算当前Event body大小 int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize); /* * offer若立即可行且不违反容量限制,则将指定的元素插入putList阻塞双端队列中(队尾), * 并在成功时返回,如果当前没有空间可用,则抛异常回滚事务 * */ if (!putList.offer(event)) { throw new ChannelException( "Put queue for MemoryTransaction of capacity " + putList.size() + " full, consider committing more frequently, " + "increasing capacity or increasing thread count"); } //记录Event的byte值 putByteCounter += eventByteSize; }
具体如下图,我们暂时忽略commit与rollback:
+----------+ | Source | +---------------------------+ +-----+----+ | [MemoryChannel] | | | +---------------------+ | | | | [MemoryTransaction] | | | | | | | | | | | | | | | channelCounter | | | | | | | | | | putByteCounter | | | | | | | | | | +-----------+ | | +----------------> | putList | | | doPut | | +-----------+ | | | +---------------------+ | +---------------------------+
此事务发生在Channel到Sink之间,主要是从Channel中取出event放入Sink中,具体包括。
如下调用:
transaction = channel.getTransaction(); transaction.begin(); ...... event = channel.take(); ...... transaction.commit();
逻辑如下:
doTake具体代码如下:
protected Event doTake() throws InterruptedException { channelCounter.incrementEventTakeAttemptCount();//将正在从channel中取出的event计数器原子的加一,即增加取出事件计数器 //如果takeList队列没有剩余容量,即当前事务已经消费了最大容量的Event,抛异常 if (takeList.remainingCapacity() == 0) {//takeList队列剩余容量为0 throw new ChannelException("Take list for MemoryTransaction, capacity " + takeList.size() + " full, consider committing more frequently, " + "increasing capacity, or increasing thread count"); } //尝试获取一个信号量获取许可,如果可以获取到许可的话,证明queue队列有空间,超时直接返回null if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) { return null; } Event event; synchronized (queueLock) { event = queue.poll(); //获取并移除MemoryChannel双端队列表示的队列的头部(也就是队列的第一个元素),队列为空返回null,同一时间只能有一个线程访问,加锁同步 } //因为信号量的保证,Channel Queue不应该返回null,出现了就不正常了 Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " + "signalling existence of entry"); takeList.put(event); //将取出的event暂存到事务的takeList队列 //计算当前Event body大小并增加取出队列字节数计数器 /* 计算event的byte大小 */ int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize); //更新takeByteCounter大小 takeByteCounter += eventByteSize; return event; }
于是我们把take事务加入,我们暂时忽略commit与rollback。具体如下图,目前两个事务是没有联系的:
+----------+ +-------+ | Source | +---------------------------------------------------------+ | Sink | +-----+----+ | [MemoryChannel] | +---+---+ | | +--------------------------------------------------+ | ^ | | | [MemoryTransaction] | | | | | | | | | | | | | | | | | | channelCounter | | | | | | | | | | | | putByteCounter takeByteCounter | | | | | | | | | | | | +-----------+ +------------+ | | doTake | +----------------> | putList | | takeList +----------------+ doPut | | +-----------+ +------+-----+ | | | | ^ | | | | | | | | +--------------------------------------------------+ | | | | | | | | | | | +---------+ poll | | | | queue | +---------+ | | +---------+ | +---------------------------------------------------------+
commit阶段主要做的事情是提交事务,此代码繁杂在于其包括了两个方面的操作:
commit其逻辑如下:
int remainingChange = takeList.size() - putList.size();
具体如下:
protected void doCommit() throws InterruptedException { //计算改变的Event数量,即取出数量-放入数量;如果放入的多,那么改变的Event数量将是负数 //如果takeList更小,说明该MemoryChannel放的数据比取的数据要多,所以需要判断该MemoryChannel是否有空间来放 int remainingChange = takeList.size() - putList.size(); //takeList.size()可以看成source,putList.size()看成sink //如果remainingChange小于0,则需要获取Channel Queue剩余容量的信号量 if (remainingChange < 0) { //sink的消费速度慢于source的产生速度 //利用bytesRemaining信号量判断是否有足够空间接收putList中的events所占的空间 //putByteCounter是需要推到channel中的数据大小,bytesRemainingchannel是容量剩余 //获取putByteCounter个字节容量信号量,如果失败说明超过字节容量限制了,回滚事务 if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) { //channel 数据大小容量不足,事物不能提交 throw new ChannelException("Cannot commit transaction. Byte capacity " + "allocated to store event body " + byteCapacity * byteCapacitySlotSize + "reached. Please increase heap space/byte capacity allocated to " + "the channel as the sinks may not be keeping up with the sources"); } //获取Channel Queue的-remainingChange个信号量用于放入-remainingChange个Event,如果获取不到,则释放putByteCounter个字节容量信号量,并抛出异常回滚事务 //因为source速度快于sink速度,需判断queue是否还有空间接收event if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) { //remainingChange如果是负数的话,说明source的生产速度,大于sink的消费速度,且这个速度大于channel所能承载的值 bytesRemaining.release(putByteCounter); throw new ChannelFullException("Space for commit to queue couldn't be acquired." + " Sinks are likely not keeping up with sources, or the buffer size is too tight"); } } int puts = putList.size(); //事务期间生产的event int takes = takeList.size(); //事务期间等待消费的event //如果上述两个信号量都有空间的话,那么把putList中的Event放到该MemoryChannel中的queue中。 //锁住队列开始,进行数据的流转 synchronized (queueLock) {//操作Channel Queue时一定要锁定queueLock if (puts > 0) { while (!putList.isEmpty()) { //如果有Event,则循环放入Channel Queue if (!queue.offer(putList.removeFirst())) { //如果放入Channel Queue失败了,说明信号量控制出问题了,这种情况不应该发生 throw new RuntimeException("Queue add failed, this shouldn't be able to happen"); } } } //以上步骤执行成功,清空事务的putList和takeList putList.clear(); takeList.clear(); } //更新queue大小控制的信号量bytesRemaining //释放takeByteCounter个字节容量信号量 bytesRemaining.release(takeByteCounter); //重置字节计数器 takeByteCounter = 0; putByteCounter = 0; //释放puts个queueStored信号量,这样doTake方法就可以获取数据了 queueStored.release(puts); //从queueStored释放puts个信号量 //释放remainingChange个queueRemaining信号量 if (remainingChange > 0) { queueRemaining.release(remainingChange); } //ChannelCounter一些数据计数 if (puts > 0) { //更新成功放入Channel中的events监控指标数据 channelCounter.addToEventPutSuccessCount(puts); } if (takes > 0) { //更新成功从Channel中取出的events的数量 channelCounter.addToEventTakeSuccessCount(takes); } channelCounter.setChannelSize(queue.size()); }
此处涉及到两个信号量:
queueStored表示Channel Queue已存储事件容量(已存储的事件数量),队列取出事件时-1,放入事件成功时+N,取出失败时-N,即Channel Queue存储了多少事件。
queueRemaining表示Channel Queue可存储事件容量(可存储的事件数量),取出事件成功时+N,放入事件成功时-N。
而bytesRemaining是字节容量信号量,超出容量则回滚事务。
具体如下图,现在整体业务已经走通:
+----------+ +-------+ | Source | +---------------------------------------------------------------+ | Sink | +-----+----+ | [MemoryChannel] | +---+---+ | | +--------------------------------------------------------+ | ^ | | | [MemoryTransaction] | | | | | | | | | | | | | | | | | | channelCounter | | | | | | | | | | | | putByteCounter takeByteCounter | | | | | | | | | | | | +-----------+ +------------+ | | doTake | +----------------> | putList | | takeList +----------------+ doPut | | +----+------+ +------+-----+ | | | | | ^ | | | | | | | | | +--------------------------------------------------------+ | | | | poll | | | | | | | | | | | doCommit +---------+ doCommit | | | +------------> | queue | +---------+ | | +---------+ | +---------------------------------------------------------------+
手机如下图:
当一个事务失败时,会进行回滚,即调用本方法。在回滚时,需要把takeList中暂存的事件回滚到Channel Queue,并回滚queueStored信号量。具体逻辑如下:
具体代码如下:
protected void doRollback() { //获取takeList的大小,然后bytesRemaining中释放 int takes = takeList.size(); //将takeList中的Event重新放回到queue队列中。 synchronized (queueLock) { //操作Channel Queue时一定锁住queueLock //前置条件判断,检查是否有足够容量回滚事务 Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " + "queue to rollback takes. This should never happen, please report"); //回滚事务的takeList队列到Channel Queue while (!takeList.isEmpty()) { //takeList不为空,将其events全部放回queue //removeLast()获取并移除此双端队列的最后一个元素 queue.addFirst(takeList.removeLast()); } //最后清空putList putList.clear(); } //清空了putList,所以需要把putList占用的空间添加到bytesRemaining中 //即,释放putByteCounter个bytesRemaining信号量 bytesRemaining.release(putByteCounter); //计数器重置 putByteCounter = 0; takeByteCounter = 0; //释放takeList队列大小个已存储事件容量 queueStored.release(takes); channelCounter.setChannelSize(queue.size()); }
具体如下图:
+----------+ +-------+ | Source | +----------------------------------------------------------------+ | Sink | +-----+----+ | [MemoryChannel] | +---+---+ | | +--------------------------------------------------------+ | ^ | | | [MemoryTransaction] | | | | | | | | | | | | | | | | | | channelCounter | | | | | | | | | | | | putByteCounter takeByteCounter | | | | | | | | | | | | +-----------+ +------------+ | |doTake | +----------------> | putList | | takeList +----------------+ doPut | | +----+--+---+ +----+---+---+ | | | | | ^ | ^ | | | | | | | | | | | +--------------------------------------------------------+ | | | | | | poll | | | | | | | | | | rollback rollback | | | | | +--------------+ +-------------+ | | | | | | | | | | | v | | | | doCommit +--+--+---+ doCommit | | | +------------> | queue | +-----------+ | | +---------+ | +----------------------------------------------------------------+
手机上如图:
此小节回答了如下问题:
MemoryChannel 中使用锁配合信号实现动态增减容量。
MemoryChannel会通过configure方法获取配置文件系统,初始化MemoryChannel,其中对于配置信息的读取有两种方法,只在启动时读取一次或者动态的加载配置文件,动态读取配置文件时若修改了Channel 的容量大小,则会调用 resizeQueue 方法进行调整,如下:
if (queue != null) { //queue不为null,则为动态修改配置文件时,重新指定了capacity try { resizeQueue(capacity); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } else { //初始化queue,根据指定的capacity申请双向阻塞队列,并初始化信号量 synchronized (queueLock) { queue = new LinkedBlockingDeque(capacity); queueRemaining = new Semaphore(capacity); queueStored = new Semaphore(0); } }
动态调整 Channel 容量主要分为三种情况:
新老容量相同,则直接返回;
老容量大于新容量,缩容,需先给未被占用的空间加锁,防止在缩容时有线程再往其写数据,然后创建新容量的队列,将原本队列加入中所有的 event 添加至新队列中;
老容量小于新容量,扩容,然后创建新容量的队列,将原本队列加入中所有的 event 添加至新队列中。
具体代码如下:
private void resizeQueue(int capacity) throws InterruptedException { int oldCapacity; //首先计算扩容前的Channel Queue的容量 //计算原本的Channel Queue的容量 synchronized (queueLock) { //老的容量=队列现有余额+在事务被处理了但是是未被提交的容量 oldCapacity = queue.size() + queue.remainingCapacity(); } //新容量和老容量相等,不需要调整返回 if (oldCapacity == capacity) {//如果老容量大于新容量,缩容 return; } else if (oldCapacity > capacity) { //缩容 //首先要预占老容量-新容量的大小,以便缩容容量 //首先要预占用未被占用的容量,防止其他线程进行操作 //尝试占用即将缩减的空间,以防被他人占用 if (!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) { //如果获取失败,默认是记录日志然后忽略 LOGGER.warn("Couldn't acquire permits to downsize the queue, resizing has been aborted"); } else { //直接缩容量 //锁定queueLock进行缩容,先创建新capacity的双端阻塞队列,然后复制老Queue数据。线程安全 //否则,直接缩容,然后复制老Queue的数据,缩容时需要锁定queueLock,因为这一系列操作要线程安全 synchronized (queueLock) { LinkedBlockingDequenewQueue = new LinkedBlockingDeque(capacity); newQueue.addAll(queue); queue = newQueue; } } } else { //扩容,加锁,创建新newQueue,复制老queue数据 //扩容 synchronized (queueLock) { LinkedBlockingDequenewQueue = new LinkedBlockingDeque(capacity); newQueue.addAll(queue); queue = newQueue; } //增加/减少Channel Queue的新的容量 //释放capacity - oldCapacity个许可,即就是增加这么多可用许可 queueRemaining.release(capacity - oldCapacity); } }
回到本文最初的错误信息:Space for commit to queue couldn't be acquired。
这说明Flume是会出现数据相关问题的。我们首先分析此问题。
因为“source往putList放数据,然后提交到queue中”与“sink从channel中取数据到sink和takeList,然后再从putList取数据到queue中”这两部分是分开来,任他们自由抢锁,所以,当前者多次抢到锁,后者没有抢到锁,同时queue的大小又太小,撑不住多次往里放数据,就会导致触发这个异常。
正常情况下,如果遇到此问题,flume会暂停source向channel放数据,等待几秒钟,这期间sink应该会消费channel中的数据,当source再次开始想channel放数据时channel就有足够的空间了。
但是如果一直出现异常,就需要启用解决方案。
解决这个问题最直接的办法就是增大queue的大小,增大capacity和transacCapacity之间的差距,queue能撑住多次往里面放数据即可。
下面我们看看Flume使用中,丢失数据的可能。
根据Flume的架构原理,采用FileChannel的Flume是不可能丢失数据的,因为其内部有完善的事务机制(ACID)。
这两个环节都不可能丢失数据。
一旦管道中所有Flume Agent的容量之和被使用完,Flume 将不再接受来自客户端的数据。此时,客户端需要缓冲数据,否则数据可能会丢失。因此,配置管道能够处理最大预期的停机时间是非常重要的。
Channel采用MemoryChannel时候,会出现丢失。
所以如果想要不丢失数据,需要采用File channel。
Memory Channel 是一个内存缓冲区,因此如果Java23 虚拟机(JVM)或机器重新启动,任何缓冲区中的数据将丢失。另一方面,File Channel是在磁盘上的。即使JVM 或机器重新启动,File Channel 也不丢失数据,只要磁盘上存储的数据仍然是起作用的和可访问的。机器和Agent 一旦开始运行,任何存储在FileChannel 中的数据将最终被访问。
在Channel发送到Sink这阶段,容易出现数据重复问题。
比如:如果flush到HDFS的时候,数据flush了一半之后出问题了,这意味着已经有一半的数据已经发送到HDFS上面了,现在出了问题,同样需要调用doRollback方法来进行回滚。
回滚并没有“一半”之说,它只会把整个takeList中的数据返回给channel,然后继续进行数据的读写。这样开启下一个事务的时候就容易造成数据重复的问题。
所以,在某种程度上,flume对数据进行采集传输的时候,它有可能会造成数据的重复,但是其数据不丢失。
Flume 保证事件至少一次被送到它们的目的地,只有一次倾力写数据,且不存在任何类型的故障事件只被写一次。但是像网络超时或部分写入存储系统的错误,可能导致事件不止被写一次,因为Flume 将重试写操作直到它们完全成功。网络超时可能表示写操作的失败,或者只是机器运行缓慢。如果是机器运行缓慢,当Flume 重试这将导致重复。因此,确保每个事件都有某种形式的唯一标识符通常是一个好主意,如果需要,最终可以用来删除事件数据。
基于Flume的美团日志收集系统(一)架构和设计
基于Flume的美团日志收集系统(二)改进和优化
事件序列化器 Flume 的无数据丢失保证,Channel 和事务
flume MemoryChannel分析
Flume 1.7 源码分析(一)源码编译
Flume 1.7 源码分析(二)整体架构
Flume 1.7 源码分析(三)程序入口
Flume 1.7 源码分析(四)从Source写数据到Channel
Flume 1.7 源码分析(五)从Channel获取数据写入Sink
Flume - MemoryChannel源码解析
flume到底会丢数据吗?其可靠性如何?——轻松搞懂Flume事务机制
Flume会不会丢失数据?
flume MemoryChannel分析
Flume架构与源码分析-MemoryChannel事务实现
flume“Space for commit to queue couldn't be acquired”异常产生分析
源码趣事-flume-队列动态扩容及容量使用
并发性标注 @GuardedBy @NotThreadSafe @ThreadSafe
秒懂,Java 注解 (Annotation)你可以这样学
Flume之MemoryChannel源码解读
Flume MemoryChannel源码分析
搞懂分布式技术17,18:分布式事务总结