private final MessageStoreConfig messageStoreConfig; //消息配置属性 private final CommitLog commitLog; //CommitLog文件存储的实现类 private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable; //消息队列存储缓存表,按照消息主题分组 private final FlushConsumeQueueService flushConsumeQueueService; //消息队列文件刷盘线程 private final CleanCommitLogService cleanCommitLogService; //清除CommitLog文件服务 private final CleanConsumeQueueService cleanConsumeQueueService; //清除ConsumerQueue队列文件服务 private final IndexService indexService; //索引实现类 private final AllocateMappedFileService allocateMappedFileService; //MappedFile分配服务 private final ReputMessageService reputMessageService;//CommitLog消息分发,根据CommitLog文件构建ConsumerQueue、IndexFile文件 private final HAService haService; //存储HA机制 private final ScheduleMessageService scheduleMessageService; //消息服务调度线程 private final StoreStatsService storeStatsService; //消息存储服务 private final TransientStorePool transientStorePool; //消息堆外内存缓存 private final BrokerStatsManager brokerStatsManager; //Broker状态管理器 private final MessageArrivingListener messageArrivingListener; //消息拉取长轮询模式消息达到监听器 private final BrokerConfig brokerConfig; //Broker配置类 private StoreCheckpoint storeCheckpoint; //文件刷盘监测点 private final LinkedList<CommitLogDispatcher> dispatcherList; //CommitLog文件转发请求
消息存储入口:DefaultMessageStore#putMessage
//判断Broker角色如果是从节点,则无需写入 if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("message store is slave mode, so putMessage is forbidden "); } return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); } //判断当前写入状态如果是正在写入,则不能继续 if (!this.runningFlags.isWriteable()) { long value = this.printTimes.getAndIncrement(); return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); } else { this.printTimes.set(0); } //判断消息主题长度是否超过最大限制 if (msg.getTopic().length() > Byte.MAX_VALUE) { log.warn("putMessage message topic length too long " + msg.getTopic().length()); return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); } //判断消息属性长度是否超过限制 if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); } //判断系统PageCache缓存去是否占用 if (this.isOSPageCacheBusy()) { return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); } //将消息写入CommitLog文件 PutMessageResult result = this.commitLog.putMessage(msg);
代码:CommitLog#putMessage
//记录消息存储时间 msg.setStoreTimestamp(beginLockTimestamp); //判断如果mappedFile如果为空或者已满,创建新的mappedFile文件 if (null == mappedFile || mappedFile.isFull()) { mappedFile = this.mappedFileQueue.getLastMappedFile(0); } //如果创建失败,直接返回 if (null == mappedFile) { log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); } //写入消息到mappedFile中 result = mappedFile.appendMessage(msg, this.appendMessageCallback);
代码:MappedFile#appendMessagesInner
//获得文件的写入指针 int currentPos = this.wrotePosition.get(); //如果指针大于文件大小则直接返回 if (currentPos < this.fileSize) { //通过writeBuffer.slice()创建一个与MappedFile共享的内存区,并设置position为当前指针 ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); AppendMessageResult result = null; if (messageExt instanceof MessageExtBrokerInner) { //通过回调方法写入 result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); } else if (messageExt instanceof MessageExtBatch) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt); } else { return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } this.wrotePosition.addAndGet(result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; }
代码:CommitLog#doAppend
//文件写入位置 long wroteOffset = fileFromOffset + byteBuffer.position(); //设置消息ID this.resetByteBuffer(hostHolder, 8); String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset); //获得该消息在消息队列中的偏移量 keyBuilder.setLength(0); keyBuilder.append(msgInner.getTopic()); keyBuilder.append('-'); keyBuilder.append(msgInner.getQueueId()); String key = keyBuilder.toString(); Long queueOffset = CommitLog.this.topicQueueTable.get(key); if (null == queueOffset) { queueOffset = 0L; CommitLog.this.topicQueueTable.put(key, queueOffset); } //获得消息属性长度 final byte[] propertiesData =msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; if (propertiesLength > Short.MAX_VALUE) { log.warn("putMessage message properties length too long. length={}", propertiesData.length); return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED); } //获得消息主题大小 final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); final int topicLength = topicData.length; //获得消息体大小 final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; //计算消息总长度 final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
代码:CommitLog#calMsgLength
protected static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) { final int msgLen = 4 //TOTALSIZE + 4 //MAGICCODE + 4 //BODYCRC + 4 //QUEUEID + 4 //FLAG + 8 //QUEUEOFFSET + 8 //PHYSICALOFFSET + 4 //SYSFLAG + 8 //BORNTIMESTAMP + 8 //BORNHOST + 8 //STORETIMESTAMP + 8 //STOREHOSTADDRESS + 4 //RECONSUMETIMES + 8 //Prepared Transaction Offset + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY + 1 + topicLength //TOPIC + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength + 0; return msgLen; }
代码:CommitLog#doAppend
//消息长度不能超过4M if (msgLen > this.maxMessageSize) { CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength + ", maxMessageSize: " + this.maxMessageSize); return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); } //消息是如果没有足够的存储空间则新创建CommitLog文件 if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { this.resetByteBuffer(this.msgStoreItemMemory, maxBlank); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(maxBlank); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 3 The remaining space may be any value // Here the length of the specially set maxBlank final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank); return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); } //将消息存储到ByteBuffer中,返回AppendMessageResult final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); // Write messages to the queue buffer byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() -beginTimeMills); switch (tranType) { case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break; case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: //更新消息队列偏移量 CommitLog.this.topicQueueTable.put(key, ++queueOffset); break; default: break; }
代码:CommitLog#putMessage
//释放锁 putMessageLock.unlock(); //刷盘 handleDiskFlush(result, putMessageResult, msg); //执行HA主从同步 handleHA(result, putMessageResult, msg);
RocketMQ通过使用内存映射文件提高IO访问性能,无论是CommitLog、ConsumerQueue还是IndexFile,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件,文件名就为该文件第一条消息对应的全局物理偏移量。
1)MappedFileQueue
String storePath; //存储目录 int mappedFileSize; // 单个文件大小 CopyOnWriteArrayList<MappedFile> mappedFiles; //MappedFile文件集合 AllocateMappedFileService allocateMappedFileService; //创建MapFile服务类 long flushedWhere = 0; //当前刷盘指针 long committedWhere = 0; //当前数据提交指针,内存中ByteBuffer当前的写指针,该值大于等于flushWhere
public MappedFile getMappedFileByTime(final long timestamp) { Object[] mfs = this.copyMappedFiles(0); if (null == mfs) return null; //遍历MappedFile文件数组 for (int i = 0; i < mfs.length; i++) { MappedFile mappedFile = (MappedFile) mfs[i]; //MappedFile文件的最后修改时间大于指定时间戳则返回该文件 if (mappedFile.getLastModifiedTimestamp() >= timestamp) { return mappedFile; } } return (MappedFile) mfs[mfs.length - 1]; }
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) { try { //获得第一个MappedFile文件 MappedFile firstMappedFile = this.getFirstMappedFile(); //获得最后一个MappedFile文件 MappedFile lastMappedFile = this.getLastMappedFile(); //第一个文件和最后一个文件均不为空,则进行处理 if (firstMappedFile != null && lastMappedFile != null) { if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) { } else { //获得文件索引 int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize)); MappedFile targetFile = null; try { //根据索引返回目标文件 targetFile = this.mappedFiles.get(index); } catch (Exception ignored) { } if (targetFile != null && offset >= targetFile.getFileFromOffset() && offset < targetFile.getFileFromOffset() + this.mappedFileSize) { return targetFile; } for (MappedFile tmpMappedFile : this.mappedFiles) { if (offset >= tmpMappedFile.getFileFromOffset() && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) { return tmpMappedFile; } } } if (returnFirstOnNotFound) { return firstMappedFile; } } } catch (Exception e) { log.error("findMappedFileByOffset Exception", e); } return null; }
public long getMinOffset() { if (!this.mappedFiles.isEmpty()) { try { return this.mappedFiles.get(0).getFileFromOffset(); } catch (IndexOutOfBoundsException e) { //continue; } catch (Exception e) { log.error("getMinOffset has exception.", e); } } return -1; }
public long getMaxOffset() { MappedFile mappedFile = getLastMappedFile(); if (mappedFile != null) { return mappedFile.getFileFromOffset() + mappedFile.getReadPosition(); } return 0; }
public long getMaxWrotePosition() { MappedFile mappedFile = getLastMappedFile(); if (mappedFile != null) { return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(); } return 0; }
2)MappedFile
int OS_PAGE_SIZE = 1024 * 4; //操作系统每页大小,默认4K AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0); //当前JVM实例中MappedFile虚拟内存 AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0); //当前JVM实例中MappedFile对象个数 AtomicInteger wrotePosition = new AtomicInteger(0); //当前文件的写指针 AtomicInteger committedPosition = new AtomicInteger(0); //当前文件的提交指针 AtomicInteger flushedPosition = new AtomicInteger(0); //刷写到磁盘指针 int fileSize; //文件大小 FileChannel fileChannel; //文件通道 ByteBuffer writeBuffer = null; //堆外内存ByteBuffer TransientStorePool transientStorePool = null; //堆外内存池 String fileName; //文件名称 long fileFromOffset; //该文件的处理偏移量 File file; //物理文件 MappedByteBuffer mappedByteBuffer; //物理文件对应的内存映射Buffer volatile long storeTimestamp = 0; //文件最后一次内容写入时间 boolean firstCreateInQueue = false; //是否是MappedFileQueue队列中第一个文件
MappedFile初始化
transientStorePoolEnable
。transientStorePoolEnable=true
为true
表示数据先存储到堆外内存,然后通过Commit
线程将数据提交到内存映射Buffer中,再通过Flush
线程将内存映射Buffer
中数据持久化磁盘。private void init(final String fileName, final int fileSize) throws IOException { this.fileName = fileName; this.fileSize = fileSize; this.file = new File(fileName); this.fileFromOffset = Long.parseLong(this.file.getName()); boolean ok = false; ensureDirOK(this.file.getParent()); try { this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize); TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize); TOTAL_MAPPED_FILES.incrementAndGet(); ok = true; } catch (FileNotFoundException e) { log.error("create file channel " + this.fileName + " Failed. ", e); throw e; } catch (IOException e) { log.error("map file " + this.fileName + " Failed. ", e); throw e; } finally { if (!ok && this.fileChannel != null) { this.fileChannel.close(); } } }
开启transientStorePoolEnable
public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException { init(fileName, fileSize); this.writeBuffer = transientStorePool.borrowBuffer(); //初始化writeBuffer this.transientStorePool = transientStorePool; }
MappedFile提交
提交数据到FileChannel,commitLeastPages为本次提交最小的页数,如果待提交数据不满commitLeastPages,则不执行本次提交操作。如果writeBuffer如果为空,直接返回writePosition指针,无需执行commit操作,表名commit操作主体是writeBuffer。
public int commit(final int commitLeastPages) { if (writeBuffer == null) { //no need to commit data to file channel, so just regard wrotePosition as committedPosition. return this.wrotePosition.get(); } //判断是否满足提交条件 if (this.isAbleToCommit(commitLeastPages)) { if (this.hold()) { commit0(commitLeastPages); this.release(); } else { log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); } } // 所有数据提交后,清空缓冲区 if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) { this.transientStorePool.returnBuffer(writeBuffer); this.writeBuffer = null; } return this.committedPosition.get(); }
MappedFile#isAbleToCommit
判断是否执行commit操作,如果文件已满返回true;如果commitLeastpages大于0,则比较writePosition与上一次提交的指针commitPosition的差值,除以OS_PAGE_SIZE得到当前脏页的数量,如果大于commitLeastPages则返回true,如果commitLeastpages小于0表示只要存在脏页就提交。
protected boolean isAbleToCommit(final int commitLeastPages) { //已经刷盘指针 int flush = this.committedPosition.get(); //文件写指针 int write = this.wrotePosition.get(); //写满刷盘 if (this.isFull()) { return true; } if (commitLeastPages > 0) { //文件内容达到commitLeastPages页数,则刷盘 return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages; } return write > flush; }
MappedFile#commit0
具体提交的实现,首先创建WriteBuffer区共享缓存区,然后将新创建的position回退到上一次提交的位置(commitPosition),设置limit为wrotePosition(当前最大有效数据指针),然后把commitPosition到wrotePosition的数据写入到FileChannel中,然后更新committedPosition指针为wrotePosition。commit的作用就是将MappedFile的writeBuffer中数据提交到文件通道FileChannel中。
protected void commit0(final int commitLeastPages) { //写指针 int writePos = this.wrotePosition.get(); //上次提交指针 int lastCommittedPosition = this.committedPosition.get(); if (writePos - this.committedPosition.get() > 0) { try { //复制共享内存区域 ByteBuffer byteBuffer = writeBuffer.slice(); //设置提交位置是上次提交位置 byteBuffer.position(lastCommittedPosition); //最大提交数量 byteBuffer.limit(writePos); //设置fileChannel位置为上次提交位置 this.fileChannel.position(lastCommittedPosition); //将lastCommittedPosition到writePos的数据复制到FileChannel中 this.fileChannel.write(byteBuffer); //重置提交位置 this.committedPosition.set(writePos); } catch (Throwable e) { log.error("Error occurred when commit data to FileChannel.", e); } } }
MappedFile#flush
刷写磁盘,直接调用MappedByteBuffer或fileChannel的force方法将内存中的数据持久化到磁盘,那么flushedPosition应该等于MappedByteBuffer中的写指针;如果writeBuffer不为空,则flushPosition应该等于上一次的commit指针;因为上一次提交的数据就是进入到MappedByteBuffer中的数据;如果writeBuffer为空,数据时直接进入到MappedByteBuffer,wrotePosition代表的是MappedByteBuffer中的指针,故设置flushPosition为wrotePosition。
public int flush(final int flushLeastPages) { //数据达到刷盘条件 if (this.isAbleToFlush(flushLeastPages)) { //加锁,同步刷盘 if (this.hold()) { //获得读指针 int value = getReadPosition(); try { //数据从writeBuffer提交数据到fileChannel再刷新到磁盘 if (writeBuffer != null || this.fileChannel.position() != 0) { this.fileChannel.force(false); } else { //从mmap刷新数据到磁盘 this.mappedByteBuffer.force(); } } catch (Throwable e) { log.error("Error occurred when force data to disk.", e); } //更新刷盘位置 this.flushedPosition.set(value); this.release(); } else { log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); this.flushedPosition.set(getReadPosition()); } } return this.getFlushedPosition(); }
MappedFile#getReadPosition
获取当前文件最大可读指针。如果writeBuffer为空,则直接返回当前的写指针;如果writeBuffer不为空,则返回上一次提交的指针。在MappedFile设置中,只有提交了的数据(写入到MappedByteBuffer或FileChannel中的数据)才是安全的数据
public int getReadPosition() { //如果writeBuffer为空,刷盘的位置就是应该等于上次commit的位置,如果为空则为mmap的写指针 return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get(); }
MappedFile#selectMappedBuffer
查找pos到当前最大可读之间的数据,由于在整个写入期间都未曾改MappedByteBuffer的指针,如果mappedByteBuffer.slice()方法返回的共享缓存区空间为整个MappedFile,然后通过设置ByteBuffer的position为待查找的值,读取字节长度当前可读最大长度,最终返回的ByteBuffer的limit为size。整个共享缓存区的容量为(MappedFile#fileSize-pos)。故在操作SelectMappedBufferResult不能对包含在里面的ByteBuffer调用filp方法。
public SelectMappedBufferResult selectMappedBuffer(int pos) { //获得最大可读指针 int readPosition = getReadPosition(); //pos小于最大可读指针,并且大于0 if (pos < readPosition && pos >= 0) { if (this.hold()) { //复制mappedByteBuffer读共享区 ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); //设置读指针位置 byteBuffer.position(pos); //获得可读范围 int size = readPosition - pos; //设置最大刻度范围 ByteBuffer byteBufferNew = byteBuffer.slice(); byteBufferNew.limit(size); return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this); } } return null; }
MappedFile#shutdown
MappedFile文件销毁的实现方法为public boolean destory(long intervalForcibly),intervalForcibly表示拒绝被销毁的最大存活时间。
public void shutdown(final long intervalForcibly) { if (this.available) { //关闭MapedFile this.available = false; //设置当前关闭时间戳 this.firstShutdownTimestamp = System.currentTimeMillis(); //释放资源 this.release(); } else if (this.getRefCount() > 0) { if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) { this.refCount.set(-1000 - this.getRefCount()); this.release(); } } }
3)TransientStorePool
短暂的存储池。RocketMQ单独创建一个MappedByteBuffer内存缓存池,用来临时存储数据,数据先写入该内存映射中,然后由commit线程定时将数据从该内存复制到与目标物理文件对应的内存映射中。RocketMQ引入该机制主要的原因是提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁盘。
private final int poolSize; //availableBuffers个数 private final int fileSize; //每隔ByteBuffer大小 private final Deque<ByteBuffer> availableBuffers; //ByteBuffer容器。双端队列
初始化
public void init() { //创建poolSize个堆外内存 for (int i = 0; i < poolSize; i++) { ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize); final long address = ((DirectBuffer) byteBuffer).address(); Pointer pointer = new Pointer(address); //使用com.sun.jna.Library类库将该批内存锁定,避免被置换到交换区,提高存储性能 LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize)); availableBuffers.offer(byteBuffer); } }
消息消费队文件、消息属性索引文件都是基于CommitLog文件构建的,当消息生产者提交的消息存储在CommitLog文件中,ConsumerQueue、IndexFile需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟。RocketMQ通过开启一个线程ReputMessageService来准实时转发CommitLog文件更新事件,相应的任务处理器根据转发的消息及时更新ConsumerQueue、IndexFile文件。
代码:DefaultMessageStore:start
//设置CommitLog内存中最大偏移量 this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue); //启动 this.reputMessageService.start();
代码:DefaultMessageStore:run
public void run() { DefaultMessageStore.log.info(this.getServiceName() + " service started"); //每隔1毫秒就继续尝试推送消息到消息消费队列和索引文件 while (!this.isStopped()) { try { Thread.sleep(1); this.doReput(); } catch (Exception e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } } DefaultMessageStore.log.info(this.getServiceName() + " service end"); }
代码:DefaultMessageStore:deReput
//从result中循环遍历消息,一次读一条,创建DispatherRequest对象。 for (int readSize = 0; readSize < result.getSize() && doNext; ) { DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize(); if (dispatchRequest.isSuccess()) { if (size > 0) { DefaultMessageStore.this.doDispatch(dispatchRequest); } } }
DispatchRequest
String topic; //消息主题名称 int queueId; //消息队列ID long commitLogOffset; //消息物理偏移量 int msgSize; //消息长度 long tagsCode; //消息过滤tag hashCode long storeTimestamp; //消息存储时间戳 long consumeQueueOffset; //消息队列偏移量 String keys; //消息索引key boolean success; //是否成功解析到完整的消息 String uniqKey; //消息唯一键 int sysFlag; //消息系统标记 long preparedTransactionOffset; //消息预处理事务偏移量 Map<String, String> propertiesMap; //消息属性 byte[] bitMap; //位图
1)转发到ConsumerQueue
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { @Override public void dispatch(DispatchRequest request) { final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag()); switch (tranType) { case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: //消息分发 DefaultMessageStore.this.putMessagePositionInfo(request); break; case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break; } } }
代码:DefaultMessageStore#putMessagePositionInfo
public void putMessagePositionInfo(DispatchRequest dispatchRequest) { //获得消费队列 ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId()); //消费队列分发消息 cq.putMessagePositionInfoWrapper(dispatchRequest); }
代码:DefaultMessageStore#putMessagePositionInfo
//依次将消息偏移量、消息长度、tag写入到ByteBuffer中 this.byteBufferIndex.flip(); this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); this.byteBufferIndex.putLong(offset); this.byteBufferIndex.putInt(size); this.byteBufferIndex.putLong(tagsCode); //获得内存映射文件 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset); if (mappedFile != null) { //将消息追加到内存映射文件,异步输盘 return mappedFile.appendMessage(this.byteBufferIndex.array()); }
2)转发到Index
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher { @Override public void dispatch(DispatchRequest request) { if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) { DefaultMessageStore.this.indexService.buildIndex(request); } } }
代码:DefaultMessageStore#buildIndex
public void buildIndex(DispatchRequest req) { //获得索引文件 IndexFile indexFile = retryGetAndCreateIndexFile(); if (indexFile != null) { //获得文件最大物理偏移量 long endPhyOffset = indexFile.getEndPhyOffset(); DispatchRequest msg = req; String topic = msg.getTopic(); String keys = msg.getKeys(); //如果该消息的物理偏移量小于索引文件中的最大物理偏移量,则说明是重复数据,忽略本次索引构建 if (msg.getCommitLogOffset() < endPhyOffset) { return; } final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); switch (tranType) { case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: break; case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: return; } //如果消息ID不为空,则添加到Hash索引中 if (req.getUniqKey() != null) { indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey())); if (indexFile == null) { return; } } //构建索引key,RocketMQ支持为同一个消息建立多个索引,多个索引键空格隔开. if (keys != null && keys.length() > 0) { String[] keyset = keys.split(MessageConst.KEY_SEPARATOR); for (int i = 0; i < keyset.length; i++) { String key = keyset[i]; if (key.length() > 0) { indexFile = putKey(indexFile, msg, buildKey(topic, key)); if (indexFile == null) { return; } } } } } else { log.error("build index error, stop building index"); } }
由于RocketMQ存储首先将消息全量存储在CommitLog文件中,然后异步生成转发任务更新ConsumerQueue和Index文件。如果消息成功存储到CommitLog文件中,转发任务未成功执行,此时消息服务器Broker由于某个原因宕机,导致CommitLog、ConsumerQueue、IndexFile文件数据不一致。如果不加以人工修复的话,会有一部分消息即便在CommitLog中文件中存在,但由于没有转发到ConsumerQueue,这部分消息将永远复发被消费者消费。
1)存储文件加载
代码:DefaultMessageStore#load
判断上一次是否异常退出。实现机制是Broker在启动时创建abort文件,在退出时通过JVM钩子函数删除abort文件。如果下次启动时存在abort文件。说明Broker时异常退出的,CommitLog与ConsumerQueue数据有可能不一致,需要进行修复。
//判断临时文件是否存在 boolean lastExitOK = !this.isTempFileExist(); //根据临时文件判断当前Broker是否异常退出 private boolean isTempFileExist() { String fileName = StorePathConfigHelper .getAbortFile(this.messageStoreConfig.getStorePathRootDir()); File file = new File(fileName); return file.exists(); }
代码:DefaultMessageStore#load
//加载延时队列 if (null != scheduleMessageService) { result = result && this.scheduleMessageService.load(); } // 加载CommitLog文件 result = result && this.commitLog.load(); // 加载消费队列文件 result = result && this.loadConsumeQueue(); if (result) { //加载存储监测点,监测点主要记录CommitLog文件、ConsumerQueue文件、Index索引文件的刷盘点 this.storeCheckpoint =new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); //加载index文件 this.indexService.load(lastExitOK); //根据Broker是否异常退出,执行不同的恢复策略 this.recover(lastExitOK); }
代码:MappedFileQueue#load
加载CommitLog到映射文件
//指向CommitLog文件目录 File dir = new File(this.storePath); //获得文件数组 File[] files = dir.listFiles(); if (files != null) { // 文件排序 Arrays.sort(files); //遍历文件 for (File file : files) { //如果文件大小和配置文件不一致,退出 if (file.length() != this.mappedFileSize) { return false; } try { //创建映射文件 MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); mappedFile.setWrotePosition(this.mappedFileSize); mappedFile.setFlushedPosition(this.mappedFileSize); mappedFile.setCommittedPosition(this.mappedFileSize); //将映射文件添加到队列 this.mappedFiles.add(mappedFile); log.info("load " + file.getPath() + " OK"); } catch (IOException e) { log.error("load file " + file + " error", e); return false; } } } return true;
代码:DefaultMessageStore#loadConsumeQueue
加载消息消费队列
//执行消费队列目录 File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir())); //遍历消费队列目录 File[] fileTopicList = dirLogic.listFiles(); if (fileTopicList != null) { for (File fileTopic : fileTopicList) { //获得子目录名称,即topic名称 String topic = fileTopic.getName(); //遍历子目录下的消费队列文件 File[] fileQueueIdList = fileTopic.listFiles(); if (fileQueueIdList != null) { //遍历文件 for (File fileQueueId : fileQueueIdList) { //文件名称即队列ID int queueId; try { queueId = Integer.parseInt(fileQueueId.getName()); } catch (NumberFormatException e) { continue; } //创建消费队列并加载到内存 ConsumeQueue logic = new ConsumeQueue( topic, queueId, StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), this); this.putConsumeQueue(topic, queueId, logic); if (!logic.load()) { return false; } } } } } log.info("load logics queue all over, OK"); return true;
代码:IndexService#load
加载索引文件
public boolean load(final boolean lastExitOK) { //索引文件目录 File dir = new File(this.storePath); //遍历索引文件 File[] files = dir.listFiles(); if (files != null) { //文件排序 Arrays.sort(files); //遍历文件 for (File file : files) { try { //加载索引文件 IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0); f.load(); if (!lastExitOK) { //索引文件上次的刷盘时间小于该索引文件的消息时间戳,该文件将立即删除 if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint() .getIndexMsgTimestamp()) { f.destroy(0); continue; } } //将索引文件添加到队列 log.info("load index file OK, " + f.getFileName()); this.indexFileList.add(f); } catch (IOException e) { log.error("load file {} error", file, e); return false; } catch (NumberFormatException e) { log.error("load file {} error", file, e); } } } return true; }
代码:DefaultMessageStore#recover
文件恢复,根据Broker是否正常退出执行不同的恢复策略
private void recover(final boolean lastExitOK) { //获得最大的物理便宜消费队列 long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue(); if (lastExitOK) { //正常恢复 this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue); } else { //异常恢复 this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue); } //在CommitLog中保存每个消息消费队列当前的存储逻辑偏移量 this.recoverTopicQueueTable(); }
代码:DefaultMessageStore#recoverTopicQueueTable
恢复ConsumerQueue后,将在CommitLog实例中保存每隔消息队列当前的存储逻辑偏移量,这也是消息中不仅存储主题、消息队列ID、还存储了消息队列的关键所在。
public void recoverTopicQueueTable() { HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024); //CommitLog最小偏移量 long minPhyOffset = this.commitLog.getMinOffset(); //遍历消费队列,将消费队列保存在CommitLog中 for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { String key = logic.getTopic() + "-" + logic.getQueueId(); table.put(key, logic.getMaxOffsetInQueue()); logic.correctMinOffset(minPhyOffset); } } this.commitLog.setTopicQueueTable(table); }
2)正常恢复
代码:CommitLog#recoverNormally
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); if (!mappedFiles.isEmpty()) { //Broker正常停止再重启时,从倒数第三个开始恢复,如果不足3个文件,则从第一个文件开始恢复。 int index = mappedFiles.size() - 3; if (index < 0) index = 0; MappedFile mappedFile = mappedFiles.get(index); ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); long processOffset = mappedFile.getFileFromOffset(); //代表当前已校验通过的offset long mappedFileOffset = 0; while (true) { //查找消息 DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); //消息长度 int size = dispatchRequest.getMsgSize(); //查找结果为true,并且消息长度大于0,表示消息正确.mappedFileOffset向前移动本消息长度 if (dispatchRequest.isSuccess() && size > 0) { mappedFileOffset += size; } //如果查找结果为true且消息长度等于0,表示已到该文件末尾,如果还有下一个文件,则重置processOffset和MappedFileOffset重复查找下一个文件,否则跳出循环。 else if (dispatchRequest.isSuccess() && size == 0) { index++; if (index >= mappedFiles.size()) { // Current branch can not happen break; } else { //取出每个文件 mappedFile = mappedFiles.get(index); byteBuffer = mappedFile.sliceByteBuffer(); processOffset = mappedFile.getFileFromOffset(); mappedFileOffset = 0; } } // 查找结果为false,表明该文件未填满所有消息,跳出循环,结束循环 else if (!dispatchRequest.isSuccess()) { log.info("recover physics file end, " + mappedFile.getFileName()); break; } } //更新MappedFileQueue的flushedWhere和committedWhere指针 processOffset += mappedFileOffset; this.mappedFileQueue.setFlushedWhere(processOffset); this.mappedFileQueue.setCommittedWhere(processOffset); //删除offset之后的所有文件 this.mappedFileQueue.truncateDirtyFiles(processOffset); if (maxPhyOffsetOfConsumeQueue >= processOffset) { this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); } } else { this.mappedFileQueue.setFlushedWhere(0); this.mappedFileQueue.setCommittedWhere(0); this.defaultMessageStore.destroyLogics(); } }
代码:MappedFileQueue#truncateDirtyFiles
public void truncateDirtyFiles(long offset) { List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>(); //遍历目录下文件 for (MappedFile file : this.mappedFiles) { //文件尾部的偏移量 long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize; //文件尾部的偏移量大于offset if (fileTailOffset > offset) { //offset大于文件的起始偏移量 if (offset >= file.getFileFromOffset()) { //更新wrotePosition、committedPosition、flushedPosistion file.setWrotePosition((int) (offset % this.mappedFileSize)); file.setCommittedPosition((int) (offset % this.mappedFileSize)); file.setFlushedPosition((int) (offset % this.mappedFileSize)); } else { //offset小于文件的起始偏移量,说明该文件是有效文件后面创建的,释放mappedFile占用内存,删除文件 file.destroy(1000); willRemoveFiles.add(file); } } } this.deleteExpiredFile(willRemoveFiles); }
3)异常恢复
Broker异常停止文件恢复的实现为CommitLog#recoverAbnormally。异常文件恢复步骤与正常停止文件恢复流程基本相同,其主要差别有两个。首先,正常停止默认从倒数第三个文件开始进行恢复,而异常停止则需要从最后一个文件往前走,找到第一个消息存储正常的文件。其次,如果CommitLog目录没有消息文件,如果消息消费队列目录下存在文件,则需要销毁。
代码:CommitLog#recoverAbnormally
if (!mappedFiles.isEmpty()) { // Looking beginning to recover from which file int index = mappedFiles.size() - 1; MappedFile mappedFile = null; for (; index >= 0; index--) { mappedFile = mappedFiles.get(index); //判断消息文件是否是一个正确的文件 if (this.isMappedFileMatchedRecover(mappedFile)) { log.info("recover from this mapped file " + mappedFile.getFileName()); break; } } //根据索引取出mappedFile文件 if (index < 0) { index = 0; mappedFile = mappedFiles.get(index); } //...验证消息的合法性,并将消息转发到消息消费队列和索引文件 }else{ //未找到mappedFile,重置flushWhere、committedWhere都为0,销毁消息队列文件 this.mappedFileQueue.setFlushedWhere(0); this.mappedFileQueue.setCommittedWhere(0); this.defaultMessageStore.destroyLogics(); }
RocketMQ的存储是基于JDK NIO的内存映射机制(MappedByteBuffer)的,消息存储首先将消息追加到内存,再根据配置的刷盘策略在不同时间进行刷写磁盘。
同步刷盘
消息追加到内存后,立即将数据刷写到磁盘文件
代码:CommitLog#handleDiskFlush
//刷盘服务 final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (messageExt.isWaitStoreMsgOK()) { //封装刷盘请求 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); //提交刷盘请求 service.putRequest(request); //线程阻塞5秒,等待刷盘结束 boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); }
GroupCommitRequest
long nextOffset; //刷盘点偏移量 CountDownLatch countDownLatch = new CountDownLatch(1); //倒计树锁存器 volatile boolean flushOK = false; //刷盘结果;默认为false
代码:GroupCommitService#run
public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { //线程等待10ms this.waitForRunning(10); //执行提交 this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } ... }
代码:GroupCommitService#doCommit
private void doCommit() { //加锁 synchronized (this.requestsRead) { if (!this.requestsRead.isEmpty()) { //遍历requestsRead for (GroupCommitRequest req : this.requestsRead) { // There may be a message in the next file, so a maximum of // two times the flush boolean flushOK = false; for (int i = 0; i < 2 && !flushOK; i++) { flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); //刷盘 if (!flushOK) { CommitLog.this.mappedFileQueue.flush(0); } } //唤醒发送消息客户端 req.wakeupCustomer(flushOK); } //更新刷盘监测点 long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mappedFileQueue.flush(0); } } }
异步刷盘
在消息追加到内存后,立即返回给消息发送端。如果开启transientStorePoolEnable,RocketMQ会单独申请一个与目标物理文件(commitLog)同样大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到物理文件的内存映射中,然后刷写到磁盘。如果未开启transientStorePoolEnable,消息直接追加到物理文件直接映射文件中,然后刷写到磁盘中。
开启transientStorePoolEnable后异步刷盘步骤:
代码:CommitLog$CommitRealTimeService#run
提交线程工作机制
//间隔时间,默认200ms int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); //一次提交的至少页数 int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); //两次真实提交的最大间隔,默认200ms int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); //上次提交间隔超过commitDataThoroughInterval,则忽略提交commitDataThoroughInterval参数,直接提交 long begin = System.currentTimeMillis(); if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { this.lastCommitTimestamp = begin; commitDataLeastPages = 0; } //执行提交操作,将待提交数据提交到物理文件的内存映射区 boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); long end = System.currentTimeMillis(); if (!result) { this.lastCommitTimestamp = end; // result = false means some data committed. //now wake up flush thread. //唤醒刷盘线程 flushCommitLogService.wakeup(); } if (end - begin > 500) { log.info("Commit data to file costs {} ms", end - begin); } this.waitForRunning(interval);
代码:CommitLog$FlushRealTimeService#run
刷盘线程工作机制
//表示await方法等待,默认false boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed(); //线程执行时间间隔 int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); //一次刷写任务至少包含页数 int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); //两次真实刷写任务最大间隔 int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); ... //距离上次提交间隔超过flushPhysicQueueThoroughInterval,则本次刷盘任务将忽略flushPhysicQueueLeastPages,直接提交 long currentTimeMillis = System.currentTimeMillis(); if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) { this.lastFlushTimestamp = currentTimeMillis; flushPhysicQueueLeastPages = 0; printFlushProgress = (printTimes++ % 10) == 0; } ... //执行一次刷盘前,先等待指定时间间隔 if (flushCommitLogTimed) { Thread.sleep(interval); } else { this.waitForRunning(interval); } ... long begin = System.currentTimeMillis(); //刷写磁盘 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages); long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { //更新存储监测点文件的时间戳 CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
由于RocketMQ操作CommitLog、ConsumerQueue文件是基于内存映射机制并在启动的时候回加载CommitLog、ConsumerQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以要引入一种机制来删除已过期的文件。RocketMQ顺序写CommitLog、ConsumerQueue文件,所有写操作全部落在最后一个CommitLog或者ConsumerQueue文件上,之前的文件在下一个文件创建后将不会再被更新。RocketMQ清除过期文件的方法时:如果当前文件在在一定时间间隔内没有再次被消费,则认为是过期文件,可以被删除,RocketMQ不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为72小时,通过在Broker配置文件中设置fileReservedTime来改变过期时间,单位为小时。
代码:DefaultMessageStore#addScheduleTask
private void addScheduleTask() { //每隔10s调度一次清除文件 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { DefaultMessageStore.this.cleanFilesPeriodically(); } }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS); ... }
代码:DefaultMessageStore#cleanFilesPeriodically
private void cleanFilesPeriodically() { //清除存储文件 this.cleanCommitLogService.run(); //清除消息消费队列文件 this.cleanConsumeQueueService.run(); }
代码:DefaultMessageStore#deleteExpiredFiles
private void deleteExpiredFiles() { //删除的数量 int deleteCount = 0; //文件保留的时间 long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime(); //删除物理文件的间隔 int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval(); //线程被占用,第一次拒绝删除后能保留的最大时间,超过该时间,文件将被强制删除 int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); boolean timeup = this.isTimeToDelete(); boolean spacefull = this.isSpaceToDelete(); boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; if (timeup || spacefull || manualDelete) { ...执行删除逻辑 }else{ ...无作为 }
删除文件操作的条件
代码:CleanCommitLogService#isSpaceToDelete
当磁盘空间不足时执行删除过期文件
private boolean isSpaceToDelete() { //磁盘分区的最大使用量 double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; //是否需要立即执行删除过期文件操作 cleanImmediately = false; { String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); //当前CommitLog目录所在的磁盘分区的磁盘使用率 double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); //diskSpaceWarningLevelRatio:磁盘使用率警告阈值,默认0.90 if (physicRatio > diskSpaceWarningLevelRatio) { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); if (diskok) { DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full"); } //diskSpaceCleanForciblyRatio:强制清除阈值,默认0.85 cleanImmediately = true; } else if (physicRatio > diskSpaceCleanForciblyRatio) { cleanImmediately = true; } else { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); if (!diskok) { DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok"); } } if (physicRatio < 0 || physicRatio > ratio) { DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio); return true; } }
代码:MappedFileQueue#deleteExpiredFileByTime
执行文件销毁和删除
for (int i = 0; i < mfsLength; i++) { //遍历每隔文件 MappedFile mappedFile = (MappedFile) mfs[i]; //计算文件存活时间 long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime; //如果超过72小时,执行文件删除 if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) { if (mappedFile.destroy(intervalForcibly)) { files.add(mappedFile); deleteCount++; if (files.size() >= DELETE_FILES_BATCH_MAX) { break; } if (deleteFilesInterval > 0 && (i + 1) < mfsLength) { try { Thread.sleep(deleteFilesInterval); } catch (InterruptedException e) { } } } else { break; } } else { //avoid deleting files in the middle break; } }
RocketMQ的存储文件包括消息文件(Commitlog)、消息消费队列文件(ConsumerQueue)、Hash索引文件(IndexFile)、监测点文件(checkPoint)、abort(关闭异常文件)。单个消息存储文件、消息消费队列文件、Hash索引文件长度固定以便使用内存映射机制进行文件的读写操作。RocketMQ组织文件以文件的起始偏移量来命令文件,这样根据偏移量能快速定位到真实的物理文件。RocketMQ基于内存映射文件机制提供了同步刷盘和异步刷盘两种机制,异步刷盘是指在消息存储时先追加到内存映射文件,然后启动专门的刷盘线程定时将内存中的文件数据刷写到磁盘。
CommitLog,消息存储文件,RocketMQ为了保证消息发送的高吞吐量,采用单一文件存储所有主题消息,保证消息存储是完全的顺序写,但这样给文件读取带来了不便,为此RocketMQ为了方便消息消费构建了消息消费队列文件,基于主题与队列进行组织,同时RocketMQ为消息实现了Hash索引,可以为消息设置索引键,根据所以能够快速从CommitLog文件中检索消息。
当消息达到CommitLog后,会通过ReputMessageService线程接近实时地将消息转发给消息消费队列文件与索引文件。为了安全起见,RocketMQ引入abort文件,记录Broker的停机是否是正常关闭还是异常关闭,在重启Broker时为了保证CommitLog文件,消息消费队列文件与Hash索引文件的正确性,分别采用不同策略来恢复文件。
RocketMQ不会永久存储消息文件、消息消费队列文件,而是启动文件过期机制并在磁盘空间不足或者默认凌晨4点删除过期文件,文件保存72小时并且在删除文件时并不会判断该消息文件上的消息是否被消费。