本文主要研究一下loki的Query
loki/pkg/logql/engine.go
// Query is a LogQL query to be executed. type Query interface { // Exec processes the query. Exec(ctx context.Context) (Result, error) } // Result is the result of a query execution. type Result struct { Data promql_parser.Value Statistics stats.Result }
Query接口定义了Exec方法,返回Result;Result定义了Data、Statistics属性
loki/pkg/logql/engine.go
// Exec Implements `Query`. It handles instrumentation & defers to Eval. func (q *query) Exec(ctx context.Context) (Result, error) { log, ctx := spanlogger.New(ctx, "query.Exec") defer log.Finish() rangeType := GetRangeType(q.params) timer := prometheus.NewTimer(queryTime.WithLabelValues(string(rangeType))) defer timer.ObserveDuration() // records query statistics var statResult stats.Result start := time.Now() ctx = stats.NewContext(ctx) data, err := q.Eval(ctx) statResult = stats.Snapshot(ctx, time.Since(start)) statResult.Log(level.Debug(log)) status := "200" if err != nil { status = "500" if errors.Is(err, ErrParse) || errors.Is(err, ErrPipeline) || errors.Is(err, ErrLimit) { status = "400" } } if q.record { RecordMetrics(ctx, q.params, status, statResult) } return Result{ Data: data, Statistics: statResult, }, err }
Exec方法执行q.Eval(ctx)及stats.Snapshot
loki/pkg/logql/engine.go
func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) { ctx, cancel := context.WithTimeout(ctx, q.timeout) defer cancel() expr, err := q.parse(ctx, q.params.Query()) if err != nil { return nil, err } switch e := expr.(type) { case SampleExpr: value, err := q.evalSample(ctx, e) return value, err case LogSelectorExpr: iter, err := q.evaluator.Iterator(ctx, e, q.params) if err != nil { return nil, err } defer helpers.LogErrorWithContext(ctx, "closing iterator", iter.Close) streams, err := readStreams(iter, q.params.Limit(), q.params.Direction(), q.params.Interval()) return streams, err default: return nil, errors.New("Unexpected type (%T): cannot evaluate") } }
Eval方法执行q.parse解析为Expr,之后根据Expr的类型做不同处理,如果是SampleExpr类型执行q.evalSample;如果是LogSelectorExpr类型则执行q.evaluator.Iterator
loki/pkg/logql/stats/context.go
func Snapshot(ctx context.Context, execTime time.Duration) Result { // ingester data is decoded from grpc trailers. res := decodeTrailers(ctx) // collect data from store. s, ok := ctx.Value(storeKey).(*StoreData) if ok { res.Store.TotalChunksRef = s.TotalChunksRef res.Store.TotalChunksDownloaded = s.TotalChunksDownloaded res.Store.ChunksDownloadTime = s.ChunksDownloadTime.Seconds() } // collect data from chunks iteration. c, ok := ctx.Value(chunksKey).(*ChunkData) if ok { res.Store.HeadChunkBytes = c.HeadChunkBytes res.Store.HeadChunkLines = c.HeadChunkLines res.Store.DecompressedBytes = c.DecompressedBytes res.Store.DecompressedLines = c.DecompressedLines res.Store.CompressedBytes = c.CompressedBytes res.Store.TotalDuplicates = c.TotalDuplicates } existing, err := GetResult(ctx) if err != nil { res.ComputeSummary(execTime) return res } existing.Merge(res) existing.ComputeSummary(execTime) return *existing }
Snapshot方法从ctx.Value取出StoreData及ChunkData计算res,然后再取出Result,进行Merge及ComputeSummary
loki的Query接口定义了Exec方法,返回Result;Result定义了Data、Statistics属性;query实现了Query接口,其Exec方法执行q.Eval(ctx)及stats.Snapshot。