MVCC模块ETCD的存储模块,是ETCD核心模块。
作为一个开源项目,其代码的封装是值得我们学习的。MVCC作为底层模块,对上层提供统一的方法,而这些方法都定义在kv.go这个文件中,很像一个头文件(.h)。我们可以只看kv.go以及配合kv_test.go就可以知道mvcc包是怎么用的。
type KV interface { ReadView WriteView // Read creates a read transaction. Read(trace *traceutil.Trace) TxnRead // Write creates a write transaction. Write(trace *traceutil.Trace) TxnWrite // Hash computes the hash of the KV's backend. Hash() (hash uint32, revision int64, err error) // HashByRev computes the hash of all MVCC revisions up to a given revision. HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) // Compact frees all superseded keys with revisions less than rev. Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) // Commit commits outstanding txns into the underlying backend. Commit() // Restore restores the KV store from a backend. Restore(b backend.Backend) error Close() error }
(我只复制了最重要的Interface,请结合kv.go文件来看)
我们是这样来使用KV的
func test() { kv := mvcc.New(...) kv.Put(key, value, ... kv.Range(key, end, ...) // Range就是Get方法 ... }
mvcc有一个New
方法,返回ConsistentWatchableKV
,它继承自KV,用来实现ETCD的Watch机制。我们现在讨论KV
。
作为一个Key-Value存储,存储模块至少要支持增删改查
。KV
首先定义了
Put(key, value []byte, lease lease.LeaseID) (rev int64) // “增”、“改”,WriteView中定义,KV继承ReadView Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) // “查”,ReadView中定义 DeleteRange(key, end []byte) (n, rev int64) // “删”,WriteView中定义,n是删除的个数
这三个方法实现了数据的基本操作。
参数很好理解
同时KV
提供了事务操作
Read(trace *traceutil.Trace) TxnRead Write(trace *traceutil.Trace) TxnWrite Commit() // 提交未提交的事务
顾名思义,一个读事务、一个写事务
参数
除了基本数据操作,KV
提供了一些不容易理解的方法
Hash() (hash uint32, revision int64, err error) HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) Restore(b backend.Backend) error
这些方法从名字上我们或许能够知道是在干什么,但却不知道为什么要这么干。
(建议读者阅读kv_test.go中所有的测试用例,便可以更深刻了解 KV
)
如此,我们便有了这样一个轮廓:
MVCC为其他模块提供了这些功能。接下来,我们就详细看下这些功能是怎么实现的。
在讨论实现过程的时候,我不会仔细介绍每一行代码,这样难以理解并且也没有必要。但我希望读者能够自己阅读并跑一遍测试用例。
在看代码之前,请先试想,如果让你来做底层的KV实现,你会怎么做?
你可能会这么做:
既然是KV存储,用一个Map来存储。如果需要持久化存储,将Map中的数据拷贝一份到DB,Map中保留最新数据。
如果你是这么想的,那你已经实现了底层功能。你做到了:
或许通过封装很容易做到:
但你很难做到:
并且你这样做有些不足:
看完这些你有其他的想法实现上述难以做到的功能吗?
你可能会放弃使用Map做内存存储,而使用一种数据结构(B树或B+树)来代替。给每一kvpair附带一个版本号。
我们试想了这么多,其实已经猜出了KV
底层实现的80%
具体看下
mvcc包中,store是KV
的具体实现,store支持Put、Range等操作。首先需要关注store中的一个变量:currentRev
。它是一个“严格”递增的版本号。“严格”是指currentRev的每次递增都会有锁。
store的每次写操作(Put、Delete等)都会使currentRev递增。currentRev就是Revision
理解这句话很重要。如下图
紧接着,我们需要知道value存在哪?value是以键值对的形式保存,键值对是以KeyValue
结构体的方式存储在store中,我们需要知道KeyValue
是怎么定义的。
// 直观的想,KeyValue可能是这样 type KeyValue struct{ key string value string } // 但它其实包含更多的东西 type KeyValue struct { Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` // 创建时的store版本号 CreateRevision int64 `protobuf:"varint,2,opt,name=create_revision,json=createRevision,proto3" json:"create_revision,omitempty"` // 最后一次修改的store版本号 ModRevision int64 `protobuf:"varint,3,opt,name=mod_revision,json=modRevision,proto3" json:"mod_revision,omitempty"` // KeyValue自身维护的版本号 Version int64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"` Value []byte `protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"` // 租约ID Lease int64 `protobuf:"varint,6,opt,name=lease,proto3" json:"lease,omitempty"` }
这个时候,我们可以理解,每一个KeyValue都对应一个Revision。
如果我们想查询一个KeyValue,可以用Revision做索引
如上图:
foo=bar创建时Revision=1。那我就可以用1
来找到foo=bar
同理:
这么做的意义在哪?在于它保存了KeyValue的所有变化,即使最后foo=bar5。我依然可以找到foo之前的值。这就是mvcc多版本控制的根本所在。
这个时候我们得出了用Revision找到KeyValue。但实际情况我们是用key去找到KeyValue。
那么接下来的问题是怎么用key找到Revision。
store中使用BTree
实现key快速找到Revision。
(BTree具体实现可以阅读github.com/google/btree)
接下来我们就能大概绘出这样一幅轮廓图:
其中Backend是KeyValue的存储,也是我们之后要讨论的。
对应关系如下
当我们想要查key为foo对应的值时,我们可以指定Revision开始查询(如果不指定,默认从最新开始查询)
我们找几处代码来论证以上所述。
首先找到store的Put具体实现。
// store -> storeTxnWrite -> Put -> put func put(...) { ... tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) // Backend 添加 Revision->KeyValue tw.s.kvindex.Put(key, idxRev) // BTree Index 添加key->Revision索引 ... }
再看store的Range的具体实现
// store -> storeTxnRead -> Range -> rangeKeys func rangeKeys(...) { ... revpairs := tr.s.kvindex.Revisions(key, end, rev) // 在Index中,用key查找Revision ... _, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0) // 用Revision在Backend中查找KeyValue ... }
从这两处就能简单证明上面所述。
接下来讨论Backend是怎么实现的。
我们都了解,ETCD的持久化存储是基于BlotDB的,Backend就是对BlotDB封装了一层。Backend在BlotDB上做了一层缓存,缓存最近的数据。
backend结构体是Backend接口的具体实现,我们首先关注Range、Put是怎么实现的
// backend -> ReadTx -> UnsafeRange func UnsafeRange(...){ ... keys, vals := rt.buf.Range(bucketName, key, endKey, limit) // 从缓存中查找 if int64(len(keys)) == limit { return keys, vals // 如果缓存中有全部数据,直接返回,不走DB } ... k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys))) // 从DB中查找 } // backend -> batchTxBuffered -> UnsafePut func UnsafePut(...) { t.batchTx.UnsafePut(bucketName, key, value) // 数据保存到DB t.buf.put(bucketName, key, value) // 数据保存到缓存中 }
那么查询一个Key的流程就是这样的:
这里有一个细节,Backend的缓存中是怎么查找的呢?缓存的结构如下
type bucketBuffer struct { buf []kv used int }
数据是保存在一个数组中,每次查找都需要遍历数组吗?不是的
每次写事务提交后都会将本次写操作的缓存merge到读缓存上
func merge() { ... sort.Stable(bb) // remove duplicates, using only newest update widx := 0 for ridx := 1; ridx < bb.used; ridx++ { if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) { widx++ } bb.buf[widx] = bb.buf[ridx] } bb.used = widx + 1 }
merge会将所有key排序,并且去重。也就是说缓存中的key始终是有序的。
所以查找的时候就可以用二分法了。
func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) { f := func(i int) bool { return bytes.Compare(bb.buf[i].key, key) >= 0 } idx := sort.Search(bb.used, f) if idx < 0 { return nil, nil } if len(endKey) == 0 { if bytes.Equal(key, bb.buf[idx].key) { keys = append(keys, bb.buf[idx].key) vals = append(vals, bb.buf[idx].val) } return keys, vals } if bytes.Compare(endKey, bb.buf[idx].key) <= 0 { return nil, nil } for i := idx; i < bb.used && int64(len(keys)) < limit; i++ { if bytes.Compare(endKey, bb.buf[i].key) <= 0 { break } keys = append(keys, bb.buf[i].key) vals = append(vals, bb.buf[i].val) } return keys, vals }
查找先用二分法找到key所在的id,然后从id开始遍历,找到key~endKey之间的数据。
现在已经介绍了store中的存储用的数据结构。大致结构如下
Range时
Put时