Time Series DBMS are designed to efficiently collect, store and query various time series with high transaction volumes
```go
// sample is the basic (timestamp, value) pair stored in the TSDB head.
type sample struct {
	t int64
	v float64
}

// Label / Labels identify a series.
type Label struct {
	Name, Value string
}

type Labels []Label

// PromQL result types: instant vector, scalar, and range matrix.
type Vector []Sample

type Scalar struct {
	T int64
	V float64
}

type Series struct {
	Metric labels.Labels `json:"metric"`
	Points []Point       `json:"values"`
}

type Matrix []Series
```
```
     Delta1        Delta2        Delta3
T1 --------> T2 --------> T3 --------> T4
```

|               | T1                      | T2 | T3        | T4  |
|---------------|-------------------------|----|-----------|-----|
| encoded value | March 24, 2015 02:00:00 | 62 | '10' : -2 | '0' |
| bit length    | 64                      | 14 | 9         | 1   |

Compression ratio: (64 + 14 + 9 + 1) / (64 × 4) = 0.34375
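To make the arithmetic concrete, here is a minimal sketch (not Prometheus' actual encoder) that derives the deltas and delta-of-deltas in the table. The concrete Unix timestamps are an assumption: they interpret "March 24, 2015 02:00:00" as UTC and reconstruct the later scrape times from the deltas 62s, 60s, 60s.

```go
package main

import "fmt"

func main() {
	// Hypothetical scrape timestamps (Unix seconds) matching the table above:
	// T1 = 2015-03-24 02:00:00 UTC, then +62s, +60s, +60s.
	ts := []int64{1427162400, 1427162462, 1427162522, 1427162582}

	prevDelta := int64(0)
	for i := 1; i < len(ts); i++ {
		delta := ts[i] - ts[i-1]
		if i == 1 {
			// The first delta is stored as-is (14 bits in this example).
			fmt.Printf("delta=%d\n", delta)
		} else {
			// Later timestamps only need to store the delta-of-delta.
			fmt.Printf("delta=%d, delta-of-delta=%d\n", delta, delta-prevDelta)
		}
		prevDelta = delta
	}
	// Output:
	// delta=62
	// delta=60, delta-of-delta=-2
	// delta=60, delta-of-delta=0
}
```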
```go
// memSeries is the in-memory representation of a series. None of its methods
// are goroutine safe and it is the caller's responsibility to lock it.
type memSeries struct {
	sync.RWMutex

	ref           uint64
	lset          labels.Labels
	mmappedChunks []*mmappedChunk
	headChunk     *memChunk
	chunkRange    int64
	firstChunkID  int

	nextAt        int64 // Timestamp at which to cut the next chunk.
	sampleBuf     [4]sample
	pendingCommit bool // Whether there are samples waiting to be committed to this series.

	app chunkenc.Appender // Current appender for the chunk.

	memChunkPool *sync.Pool

	txs *txRing
}
```
Of these, the three most important fields are ref (the unique ID of the series), lset (the label set that identifies it), and headChunk (the chunk samples are currently appended to).
```go
func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, error) {
	// Just using `getOrSet` below would be semantically sufficient, but we'd create
	// a new series on every sample inserted via Add(), which causes allocations
	// and makes our series IDs rather random and harder to compress in postings.
	s := h.series.getByHash(hash, lset)
	if s != nil {
		return s, false, nil
	}

	// Optimistically assume that we are the first one to create the series.
	id := atomic.AddUint64(&h.lastSeriesID, 1)

	return h.getOrCreateWithID(id, hash, lset)
}
```
```go
type memChunk struct {
	chunk            chunkenc.Chunk
	minTime, maxTime int64
}
```
How, then, do we quickly find the memSeries that corresponds to a given set of labels? A hash table is clearly the best choice: compute a hash over the labels and maintain a map from hash values to memSeries; if a hash collision occurs, fall back to comparing the labels directly. Prometheus therefore maintains the two hash maps shown below in memory, so that the matching memSeries can be found quickly from either its ref or its labels:
```go
{
	series map[uint64]*memSeries   // maps a ref to its memSeries
	hashes map[uint64][]*memSeries // maps a label hash to the memSeries with that hash
}
```
memSeries are added and removed concurrently, so simply guarding reads and writes of the structures above with one big lock clearly cannot meet the performance requirements. Prometheus therefore shards both maps and their locks into stripes:

```go
const (
	// DefaultStripeSize is the default number of entries to allocate in the stripeSeries hash map.
	DefaultStripeSize = 1 << 14
)

// stripeSeries locks modulo ranges of IDs and hashes to reduce lock contention.
// The locks are padded to not be on the same cache line. Filling the padded space
// with the maps was profiled to be slower – likely due to the additional pointer
// dereferences.
type stripeSeries struct {
	size                    int
	series                  []map[uint64]*memSeries
	hashes                  []seriesHashmap
	locks                   []stripeLock
	seriesLifecycleCallback SeriesLifecycleCallback
}

type stripeLock struct {
	sync.RWMutex
	// Padding to avoid multiple locks being on the same cache line.
	_ [40]byte
}
```
When the head is initialized, 16384 of these small hash maps are created. To find a memSeries by its ref, take ref modulo 16384 to locate the corresponding series[x]; only lock[x] then has to be acquired. This greatly reduces the cost of lock contention when reading and writing memSeries and improves read/write throughput.
```go
func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries {
	i := hash & uint64(s.size-1)

	s.locks[i].RLock()
	series := s.hashes[i].get(hash, lset)
	s.locks[i].RUnlock()

	return series
}
```
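The lookup by ref is analogous; a sketch of it, modeled on stripeSeries.getByID in the Prometheus source:

```go
func (s *stripeSeries) getByID(id uint64) *memSeries {
	i := id & uint64(s.size-1) // pick the stripe: id mod 16384

	s.locks[i].RLock()
	series := s.series[i][id]
	s.locks[i].RUnlock()

	return series
}
```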
Note that the "modulo" here is computed with & rather than %. A bitwise AND is a single cheap CPU instruction, while % involves an integer division, which is far more expensive; the substitution is only valid because the stripe count is a power of two:
a % b == a & (b - 1)   // holds only when b is a power of two, b = 2^n
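A tiny sketch demonstrating the equivalence; the stripe size matches DefaultStripeSize above, and the ref value is arbitrary:

```go
package main

import "fmt"

func main() {
	const size = 1 << 14 // 16384, a power of two
	ref := uint64(123456789)

	// Because size is a power of two, the bitwise AND selects the same
	// stripe as the modulo, but compiles to a single cheap instruction.
	fmt.Println(ref % size)
	fmt.Println(ref & (size - 1))
}
```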
Above we saw how to find the corresponding memSeries given the full set of labels of a metric, but the more common case is a query that matches only some labels. For example, for node_exporter's CPU metric node_cpu_seconds_total, cpu="1" denotes the second core, and there are also 7 modes; Prometheus even supports regular expressions in label matchers: node_cpu_seconds_total{mode=~"user|system"}. To handle queries this complex, an inverted index has to be introduced.
```go
// MemPostings holds postings list for series ID per label pair. They may be written
// to out of order.
// ensureOrder() must be called once before any reads are done. This allows for quick
// unordered batch fills on startup.
type MemPostings struct {
	mtx     sync.RWMutex
	m       map[string]map[string][]uint64
	ordered bool
}
```
Suppose Prometheus scrapes a new series node_cpu_seconds_total{mode="user",cpu="0",instance="1.1.1.1:9100"} and its ref is x. After the memSeries is initialized and the hash maps are updated, the inverted index also has to be updated:
```go
func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
	s := newMemSeries(lset, id, h.chunkRange, &h.memChunkPool)

	s, created, err := h.series.getOrSet(hash, s)
	if err != nil {
		return nil, false, err
	}
	if !created {
		return s, false, nil
	}

	h.metrics.seriesCreated.Inc()
	atomic.AddUint64(&h.numSeries, 1)

	h.postings.Add(id, lset)
	......
}
```
```
MemPostings.m["__name__"]["node_cpu_seconds_total"] = {..,x,..}
MemPostings.m["mode"]["user"]                       = {..,x,..}
MemPostings.m["cpu"]["0"]                           = {..,x,..}
MemPostings.m["instance"]["1.1.1.1:9100"]           = {..,x,..}
```
```go
func (p *MemPostings) addFor(id uint64, l labels.Label) {
	nm, ok := p.m[l.Name]
	if !ok {
		nm = map[string][]uint64{}
		p.m[l.Name] = nm
	}
	list := append(nm[l.Value], id)
	nm[l.Value] = list

	if !p.ordered {
		return
	}
	// There is no guarantee that no higher ID was inserted before as they may
	// be generated independently before adding them to postings.
	// We repair order violations on insert. The invariant is that the first n-1
	// items in the list are already sorted.
	for i := len(list) - 1; i >= 1; i-- {
		if list[i] >= list[i-1] {
			break
		}
		list[i], list[i-1] = list[i-1], list[i]
	}
}
```
Suppose:

```
MemPostings.m["__name__"]["node_cpu_seconds_total"] = {1,2,3,5,7,8}
MemPostings.m["mode"]["user"]                       = {10,2,3,4,6,8}
```

Then evaluating node_cpu_seconds_total{mode="user"} amounts to intersecting the two posting lists:

{1,2,3,5,7,8} & {10,2,3,4,6,8} = {2,3,8}
But if each label pair maps to enough series, intersecting the posting lists of several label pairs is still an expensive operation. Can this be optimized further? In fact, it suffices to keep the series IDs in every posting list sorted: the intersection can then be computed with a single linear merge (much like the merge step of merge sort), instead of repeatedly scanning one list for every element of the other.
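Here is a minimal sketch of such a linear merge over two sorted posting lists. Prometheus does this lazily through its Postings iterators rather than on materialized slices, but the idea is the same; the function name is mine:

```go
// intersect assumes both posting lists are sorted in ascending order and
// returns the IDs present in both, in a single pass over each list.
func intersect(a, b []uint64) []uint64 {
	var out []uint64
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i] == b[j]:
			out = append(out, a[i])
			i++
			j++
		case a[i] < b[j]:
			i++
		default:
			j++
		}
	}
	return out
}
```

With the lists from the example above (both sorted), intersect([]uint64{1, 2, 3, 5, 7, 8}, []uint64{2, 3, 4, 6, 8, 10}) returns {2, 3, 8}.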
Below is what a Prometheus data directory looks like:
```
drwxr-xr-x 3 root root   4096 Jul 14 23:01 01ED6XF97X5B5RKJMB6E6CRG85
drwxr-xr-x 3 root root   4096 Jul 15 05:00 01ED7J1R0E4BZ398TE48CC0WJQ
drwxr-xr-x 3 root root   4096 Jul 15 11:00 01ED86M8WZ4J66F4M842WDWC69
drwxr-xr-x 3 root root   4096 Jul 15 11:00 01ED86N02VPNDEPS694CEZY2DP
drwxr-xr-x 2 root root   4096 Jul 15 11:00 chunks_head
-rw-r--r-- 1 root root      0 May 20 18:29 lock
-rw-r--r-- 1 root root 100001 Jul 15 11:30 queries.active
drwxr-xr-x 3 root root   4096 Jul 15 11:30 wal
```
Before scraped data is actually written into memory, Prometheus first records it in the WAL (Write-Ahead Log). Because the WAL lives on disk, it effectively serves as a complete backup of the monitoring data held in memory, so that data is not lost even if Prometheus crashes. When Prometheus restarts, it first replays the WAL into memory, restoring the state it was in before the crash, and only then resumes scraping new data.
```
# wal directory
-rw-r--r-- 1 root root 128M Jul 15 13:59 00005153
-rw-r--r-- 1 root root 128M Jul 15 14:10 00005154
-rw-r--r-- 1 root root 128M Jul 15 14:20 00005155
-rw-r--r-- 1 root root 128M Jul 15 14:31 00005156
-rw-r--r-- 1 root root 128M Jul 15 14:41 00005157
-rw-r--r-- 1 root root 128M Jul 15 14:52 00005158
-rw-r--r-- 1 root root  96M Jul 15 15:00 00005159
-rw-r--r-- 1 root root 128M Jul 15 15:10 00005160
-rw-r--r-- 1 root root 128M Jul 15 15:21 00005161
-rw-r--r-- 1 root root 128M Jul 15 15:31 00005162
-rw-r--r-- 1 root root 128M Jul 15 15:42 00005163
-rw-r--r-- 1 root root 128M Jul 15 15:52 00005164
-rw-r--r-- 1 root root 128M Jul 15 16:03 00005165
-rw-r--r-- 1 root root  41M Jul 15 16:06 00005166
drwxr-xr-x 2 root root 4.0K Jul 15 15:00 checkpoint.00005152
```
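The following is a deliberately simplified, generic sketch of the write-ahead pattern described above, not Prometheus' actual wal package (which writes checksummed records into the 128MB segment files shown above): a sample is appended and fsynced to the log before it is applied to the in-memory state, and on restart the log is replayed to rebuild that state. All names and the record layout here are hypothetical.

```go
package main

import (
	"encoding/binary"
	"io"
	"os"
)

// walRecord is a toy record: (series ref, timestamp, value). All fields are
// fixed-size so encoding/binary can serialize the struct directly.
type walRecord struct {
	Ref uint64
	T   int64
	V   float64
}

// appendSample writes the record to the log and syncs it to disk
// *before* touching the in-memory state.
func appendSample(w *os.File, mem map[uint64][]walRecord, rec walRecord) error {
	if err := binary.Write(w, binary.LittleEndian, rec); err != nil {
		return err
	}
	if err := w.Sync(); err != nil {
		return err
	}
	mem[rec.Ref] = append(mem[rec.Ref], rec)
	return nil
}

// replay rebuilds the in-memory state from the log after a restart.
func replay(r io.Reader, mem map[uint64][]walRecord) error {
	for {
		var rec walRecord
		err := binary.Read(r, binary.LittleEndian, &rec)
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		mem[rec.Ref] = append(mem[rec.Ref], rec)
	}
}

func main() {
	f, err := os.OpenFile("toy.wal", os.O_CREATE|os.O_RDWR|os.O_APPEND, 0o644)
	if err != nil {
		panic(err)
	}
	defer f.Close()

	mem := map[uint64][]walRecord{}
	if err := appendSample(f, mem, walRecord{Ref: 1, T: 1594209600000, V: 0.72}); err != nil {
		panic(err)
	}

	// Simulate a restart: rebuild the state by replaying the log from the start.
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		panic(err)
	}
	recovered := map[uint64][]walRecord{}
	if err := replay(f, recovered); err != nil {
		panic(err)
	}
}
```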
In Prometheus, headAppender.Commit() first calls a.log(); if that fails, the append is rolled back and nothing is written into memory. This a.log() call is precisely the write to the WAL:

```go
func (a *headAppender) Commit() error {
	if err := a.log(); err != nil {
		//nolint: errcheck
		a.Rollback() // Most likely the same error will happen again.
		return errors.Wrap(err, "write to WAL")
	}

	defer a.head.metrics.activeAppenders.Dec()
	defer a.head.putAppendBuffer(a.samples)
	defer a.head.putSeriesBuffer(a.sampleSeries)
	defer a.head.iso.closeAppend(a.appendID)

	total := len(a.samples)
	var series *memSeries
	for i, s := range a.samples {
		series = a.sampleSeries[i]
		series.Lock()
		ok, chunkCreated := series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper)
		series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
		series.pendingCommit = false
		series.Unlock()

		if !ok {
			total--
			a.head.metrics.outOfOrderSamples.Inc()
		}
		if chunkCreated {
			a.head.metrics.chunks.Inc()
			a.head.metrics.chunksCreated.Inc()
		}
	}

	a.head.metrics.samplesAppended.Add(float64(total))
	a.head.updateMinMaxTime(a.mint, a.maxt)

	return nil
}
```
The data cannot be kept in memory forever, so it is reasonable to periodically persist the in-memory data to disk. Each Block stores all the data within its time window, including all of the series and samples as well as the related index structures. The detailed contents of a Block directory are as follows:
```
drwxr-xr-x 2 root root 4.0K Jul  9 17:00 chunks
-rw-r--r-- 1 root root  92M Jul  9 17:01 index
-rw-r--r-- 1 root root  906 Jul  9 17:01 meta.json
-rw-r--r-- 1 root root    9 Jul  9 17:01 tombstones
```
Here meta.json contains the metadata of this Block; its contents look like this:
```json
{
    "ulid": "01ECSCWB8KEP5BVX7R4NAVRV36",
    "minTime": 1594209600000,
    "maxTime": 1594274400000,
    "stats": {
        "numSamples": 1232542974,
        "numSeries": 287055,
        "numChunks": 10363468
    },
    "compaction": {
        "level": 3,
        "sources": [
            "01ECQF1KCYM6F6CH40XSPH24GY",
            "01ECQNXAMYR96DXBNSNTX0CMV2",
            "01ECQWS1WYZ3CNTMMHY92X65DZ",
            "01ECR3MS4ZAHZT7ZC63CST0NKA",
            "01ECRAGGCYXZWBMW1BKQB47MBK",
            "01ECRHC7MYVKWTBCB1Y9QMDH86",
            "01ECRR7YWZ0MTVYBGRYHE9N8ZQ",
            "01ECRZ3P64VF8BF4W4WCSNE05S",
            "01ECS5ZDCY3C05AMWEJE8N4A6J"
        ],
        "parents": [
            {
                "ulid": "01ECR3NCHN36VP7GYF4KSAVZPW",
                "minTime": 1594209600000,
                "maxTime": 1594231200000
            },
            {
                "ulid": "01ECRR8HW7BCVXWFDDNJN9PCBB",
                "minTime": 1594231200000,
                "maxTime": 1594252800000
            },
            {
                "ulid": "01ECSCVRGEAPH4VK58HC41E0WJ",
                "minTime": 1594252800000,
                "maxTime": 1594274400000
            }
        ]
    },
    "version": 1
}
```
- ulid: identifies this Block; it is identical to the Block's directory name.
- minTime and maxTime: the time window of the data stored in this Block.
- stats: the number of samples, series, and chunks the Block contains.
- compaction: compaction information for this Block. As time passes, multiple Blocks are compacted and merged into larger Blocks. The level field records how many times this has happened: a Block freshly persisted from memory has level 1, and each time Blocks are merged, the resulting child Block's level is one higher than that of its parents. The sources field lists the ulids of all the ancestor Blocks this Block was built from. In fact, for Blocks with level >= 2 there is also a parents field containing the ulids and time windows of the direct parent Blocks; in the meta.json above, the level-3 Block was merged from the three level-2 parents listed there, which were in turn built from the nine level-1 sources.
chunks is a subdirectory containing files numbered from 000001, each normally capped at 512MB. These files hold all of the samples within the time window [minTime, maxTime]; essentially they are the persisted form of the in-memory memChunks that fall into that window. The index file stores everything related to the index. Although persisted data is read with relatively low probability, it can still be read, so being able to read time series data out of a Block quickly matters a great deal, and quickly loading the index and then using it to look up series is key to overall query performance. To achieve this, the index file ends up with a fairly involved layout:
```
┌────────────────────────────┬─────────────────────┐
│ magic(0xBAAAD700) <4b>     │ version(1) <1 byte> │
├────────────────────────────┴─────────────────────┤
│ ┌──────────────────────────────────────────────┐ │
│ │                 Symbol Table                 │ │
│ ├──────────────────────────────────────────────┤ │
│ │                    Series                    │ │
│ ├──────────────────────────────────────────────┤ │
│ │                 Label Index 1                │ │
│ ├──────────────────────────────────────────────┤ │
│ │                      ...                     │ │
│ ├──────────────────────────────────────────────┤ │
│ │                 Label Index N                │ │
│ ├──────────────────────────────────────────────┤ │
│ │                  Postings 1                  │ │
│ ├──────────────────────────────────────────────┤ │
│ │                      ...                     │ │
│ ├──────────────────────────────────────────────┤ │
│ │                  Postings N                  │ │
│ ├──────────────────────────────────────────────┤ │
│ │              Label Index Table               │ │
│ ├──────────────────────────────────────────────┤ │
│ │                Postings Table                │ │
│ ├──────────────────────────────────────────────┤ │
│ │                     TOC                      │ │
│ └──────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────┘
```
For the full details, see the Prometheus index file format documentation.