Bitcask是一种“基于日志结构的哈希表”(A Log-Structured Hash Table for Fast Key/Value Data)
Bitcask 最初作为分布式数据库 Riak 的后端出现,Riak 中的每个节点都运行一个 Bitcask 实例,各自存储其负责的数据。
抛开论文,我们先通过一篇博客 # Bitcask — a log-structured fast KV store 来了解bitcask的细节信息,下面是简要的译文。
Bitcask 借鉴了大量来自日志结构文件系统和涉及日志文件合并的设计,例如 LSM 树中的合并。它本质上是一个目录,包含固定结构的追加日志文件和一个内存索引。内存索引以哈希表的形式存储所有键及其对应的值所在数据文件中的偏移量和其他必要信息,用于快速查找到对应的条目。
数据文件是追加日志文件,存储键值对和一些元信息。一个 Bitcask 实例可以拥有多个数据文件,其中只有一个处于活动状态,用于写入,其他文件为只读文件。
数据文件中的每个条目都有固定的结构,我们可以用类似下面的数据结构来描述:
struct log_entry { uint32_t crc; uint32_t timestamp; uint32_t key_size; uint32_t value_size; char key[key_size]; char value[value_size]; };
键目录是一个内存哈希表,存储 Bitcask 实例中所有键及其对应的值所在数据文件中的偏移量和一些元信息,例如时间戳,可以用类似下面的数据结构来描述:
struct key_entry { uint32_t file_id; uint32_t offset; uint32_t timestamp; };
将新的键值对存储到 Bitcask 时,引擎首先将其追加到活动数据文件中,然后在键目录中创建一个新条目,指定值的存储位置。这两个动作都是原子性的,意味着条目要么同时写入两个结构,要么都不写入。
Bitcask 直接支持完全替换值,但不支持部分更新。因此,更新操作与存储新键值对非常相似,唯一的区别是不会在键目录中创建新条目,而是更新现有条目的信息,可能指向新的数据文件中的新位置。
与旧值对应的条目现在处于“游离状态”,将在合并和压缩过程中显式地进行垃圾回收。
删除键是一个特殊的操作,引擎会原子性地将一个新的条目追加到活动数据文件中,其中值等于一个标志删除的特殊值,然后从内存键目录中删除该键的条目。该标志值非常独特,不会与现有值空间冲突。
从存储中读取键值对需要引擎首先使用键目录找到该键对应的数据文件和偏移量。然后,引擎从相应的偏移量处执行一次磁盘读取,检索日志条目。检索到的值与存储的校验码进行正确性检查,然后将值返回给客户端。
该操作本身非常快速,只涉及一次磁盘读取和几次内存访问,但可以使用文件系统预读缓存进一步提高速度。
正如我们在更新和删除操作中看到的,与键关联的旧条目保持原样,处于“游离状态”。这会导致 Bitcask 消耗大量磁盘空间。为了提高磁盘利用率,引擎会定期将较旧的已关闭数据文件压缩成一个或多个新数据文件,其结构与现有数据文件相同。
合并过程遍历 Bitcask 中所有只读文件,生成一组数据文件,只包含每个存在的键的“最新”版本。
如果 Bitcask 发生故障并需要重启,它必须读取所有的数据文件并构建一个新的键目录(KeyDir),如果没有专门存储,需要读取所有文件重建。
其实上面的合并和压缩操作可以部分缓解这个问题,一方面它们减少了需要读取的最终会被废弃的数据量,在合并的同事,可以生成一个hint
提示文件,hint记录了key和key指向的meta信息。 这样读取hint文件就可以快速重建键目录(KeyDir)。
我们线上的搜索系统,检索到match的doc后,需要通过id获取doc的详情,考虑到数据量级很大,redis首先排除,我们最初的选型是mongodb,在十亿级别的数据量时,整体问题不大,但是面向未来更大的数据量级,我们需要考虑更容易维护的方案。
当前mongodb的问题:
为了解决上面的问题,我们考虑一种数据分版本的方案。
具体来说,对于KV场景,将每个版本的数据,根据特定的hash规则将数据分成多片,每片离线按照Bitcask的思路,生成好hint文件和数据文件,上接一个分布式服务提供查询即可。对于增量的数据,只需要按同样的hash规则,先生成好数据,将数据文件加载即可,这样可以确保数据的一致。同一组的slot文件,如果一台机器加载不下,可以多台机器加载,分布式服务做好控制即可。查询时,像对key做hash,然后并发去查询对应slot的服务即可。
实际系统中,数据的key都是int64数据,value是json string,我们来设计hint和data文件格式。在不考虑校验的情况下,我们可以用最简单的文件格式来存储。
hint格式,按照 key,value length,offset 依次写入。
| int64 key | int32 value length | int64 value offset | ... | int64 key | int32 value length | int64 value offset |
data 格式,直接append 压缩后的数据。
| compressed value bytes | ... | compressed value bytes |
简单的java代码实现:
FileOutputStream dataFileWriter = new FileOutputStream (outputFile); FileOutputStream indexFileWriter = new FileOutputStream(outPutIndex); long offset = 0; int count = 0; while (dataList.hasNext()) { Document doc = dataList.next(); Long id = doc.getLong("_id"); byte[] value = compress(doc.toJson(),"utf-8"); byte[] length = intToByteLittle(value.length); dataFileWriter.write(value); offset += value.length; indexFileWriter.write(longToBytesLittle(id)); indexFileWriter.write(length); indexFileWriter.write(longToBytesLittle(offset)); count++; if (count % 10000 == 0) { log.info("already load {} items", count); indexFileWriter.flush(); dataFileWriter.flush(); } } indexFileWriter.close(); dataFileWriter.close();
读取,需要先读取hint索引文件,加载到内存。
// read index FileInputStream indexReader = new FileInputStream(indexFile); // id 8字节, offset 4字节 byte[] entry = new byte[8+4+8]; byte[] idBytes = new byte[8]; byte[] lengthBytes = new byte[4]; byte[] offsetBytes = new byte[8]; Map<Long, Pair<Long, Integer>> id2Offset = new HashMap<>(); // 这里直接加载到map里,可以优化 while(indexReader.read(entry, 0, entry.length) > 0) { System.arraycopy(entry,0, idBytes, 0, 8); System.arraycopy(entry,8, lengthBytes, 0, 4); System.arraycopy(entry,12, offsetBytes, 0, 8); long id = EncodingUtils.bytesToLongLittle(idBytes); int dataLength = EncodingUtils.bytes2IntLittle(lengthBytes); long offset = EncodingUtils.bytesToLongLittle(offsetBytes); id2Offset.put(id, Pair.of(offset, dataLength)); }
从数据文件读取数据也会比较简单,先从hint数据获取到key对应的offset和dataLength,然后读取数据解压即可。
RandomAccessFile dataFileReader = new RandomAccessFile(dataFile, "r"); while(true) { System.out.print("请输入查询key:"); String key = System.console().readLine(); long id = Long.parseLong(key); if (id2Offset.containsKey(id)) { long offset = id2Offset.get(id).getFirst(); int dataSize = id2Offset.get(id).getSecond(); dataFileReader.seek(offset); byte[] data = new byte[dataSize]; dataFileReader.read(data, 0, data.length); byte[] decodeData = uncompress(data); System.out.println(new String(decodeData)); } }
上面仅仅是demo性质的代码,实际过程中还要考虑数据的完整性检验,以及LRU缓存等。
没有最好的K-V系统,只有最适合应用业务实际场景的系统,做任何的方案选择,要结合业务当前的实际情况综合权衡,有所取有所舍。