本文主要研究一下cortex的Ingester
cortex/pkg/api/api.go
// Ingester is defined as an interface to allow for alternative implementations // of ingesters to be passed into the API.RegisterIngester() method. type Ingester interface { client.IngesterServer FlushHandler(http.ResponseWriter, *http.Request) ShutdownHandler(http.ResponseWriter, *http.Request) Push(context.Context, *client.WriteRequest) (*client.WriteResponse, error) }
Ingester接口内嵌了client.IngesterServer,定义了FlushHandler、ShutdownHandler、Push方法
cortex/pkg/ingester/client/cortex.pb.go
// IngesterServer is the server API for Ingester service.
//
// NOTE(review): this lives in cortex.pb.go, which looks like generated code;
// presumably changes belong in the service definition rather than here.
type IngesterServer interface {
	// Request/response methods: each takes a context plus a request message
	// and returns a response message or an error.
	Push(context.Context, *WriteRequest) (*WriteResponse, error)
	Query(context.Context, *QueryRequest) (*QueryResponse, error)
	// QueryStream takes the request together with a stream handle and only
	// returns an error.
	QueryStream(*QueryRequest, Ingester_QueryStreamServer) error
	LabelValues(context.Context, *LabelValuesRequest) (*LabelValuesResponse, error)
	LabelNames(context.Context, *LabelNamesRequest) (*LabelNamesResponse, error)
	UserStats(context.Context, *UserStatsRequest) (*UserStatsResponse, error)
	// AllUserStats accepts the same request type as UserStats but returns a
	// UsersStatsResponse.
	AllUserStats(context.Context, *UserStatsRequest) (*UsersStatsResponse, error)
	MetricsForLabelMatchers(context.Context, *MetricsForLabelMatchersRequest) (*MetricsForLabelMatchersResponse, error)
	MetricsMetadata(context.Context, *MetricsMetadataRequest) (*MetricsMetadataResponse, error)
	// TransferChunks allows leaving ingester (client) to stream chunks directly to joining ingesters (server).
	TransferChunks(Ingester_TransferChunksServer) error
}
client.IngesterServer接口定义了Push、Query、QueryStream、LabelValues、LabelNames、UserStats、AllUserStats、MetricsForLabelMatchers、MetricsMetadata、TransferChunks方法
cortex/pkg/ingester/ingester.go
// Push implements client.IngesterServer.
//
// It rejects the request when checkRunningOrStopping reports an error and
// hands the whole request to v2Push when blocks storage is enabled.
// Otherwise it pushes the request metadata, appends every sample of every
// series, optionally writes a WAL record, and returns.
//
// Error handling for samples:
//   - a *validationError does not stop ingestion; only the first one is
//     remembered and returned at the end together with an empty response;
//   - any other error aborts the request immediately with
//     http.StatusInternalServerError.
func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
	if err := i.checkRunningOrStopping(); err != nil {
		return nil, err
	}

	if i.cfg.BlocksStorageEnabled {
		return i.v2Push(ctx, req)
	}

	// NOTE: because we use `unsafe` in deserialisation, we must not
	// retain anything from `req` past the call to ReuseSlice. The call is
	// deferred, so it runs on every return path below.
	defer client.ReuseSlice(req.Timeseries)

	userID, err := tenant.TenantID(ctx)
	if err != nil {
		// NOTE(review): the error from TenantID is replaced by a fixed
		// message here, so its detail is not reported to the caller.
		return nil, fmt.Errorf("no user id")
	}

	// Metadata is pushed before the samples and its outcome is not checked
	// here: it is best-effort, and doing it first means a sample error
	// cannot cause it to be skipped.
	i.pushMetadata(ctx, userID, req.GetMetadata())

	// First validation error seen, if any; reported after all samples have
	// been attempted.
	var firstPartialErr *validationError

	// record stays nil when the WAL is disabled.
	var record *WALRecord
	if i.cfg.WALConfig.WALEnabled {
		// Reuse a pooled record and reset the fields set by this request.
		record = recordPool.Get().(*WALRecord)
		record.UserID = userID
		// Assuming there is not much churn in most cases, there is no use
		// keeping the record.Series slice hanging around.
		record.Series = nil
		// Keep the pooled Samples backing array when it is large enough,
		// otherwise allocate one sized by the number of series.
		if cap(record.Samples) < len(req.Timeseries) {
			record.Samples = make([]tsdb_record.RefSample, 0, len(req.Timeseries))
		} else {
			record.Samples = record.Samples[:0]
		}
	}

	for _, ts := range req.Timeseries {
		// Number of samples of this series that were appended successfully.
		seriesSamplesIngested := 0
		for _, s := range ts.Samples {
			// append() copies the memory in `ts.Labels` except on the error path
			err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source, record)
			if err == nil {
				seriesSamplesIngested++
				continue
			}

			i.metrics.ingestedSamplesFail.Inc()
			if ve, ok := err.(*validationError); ok {
				// Validation failure: remember the first one and keep going.
				if firstPartialErr == nil {
					firstPartialErr = ve
				}
				continue
			}

			// non-validation error: abandon this request
			// NOTE(review): on this path a record taken from recordPool is
			// neither logged nor put back — confirm this is intended.
			return nil, grpcForwardableError(userID, http.StatusInternalServerError, err)
		}

		if i.cfg.ActiveSeriesMetricsEnabled && seriesSamplesIngested > 0 {
			// updateActiveSeries will copy labels if necessary.
			i.updateActiveSeries(userID, time.Now(), ts.Labels)
		}
	}

	if record != nil {
		// Reaching this point means no non-validation error aborted the
		// request (those return early above). Validation errors recorded in
		// firstPartialErr do not prevent the WAL write.
		if err := i.wal.Log(record); err != nil {
			// NOTE(review): the record is not returned to recordPool when
			// logging fails — confirm this is intended.
			return nil, err
		}
		recordPool.Put(record)
	}

	if firstPartialErr != nil {
		// grpcForwardableError turns the error into a string so it no longer references `req`
		return &client.WriteResponse{}, grpcForwardableError(userID, firstPartialErr.code, firstPartialErr)
	}

	return &client.WriteResponse{}, nil
}
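Push方法首先执行checkRunningOrStopping,若i.cfg.BlocksStorageEnabled则执行i.v2Push(ctx, req);否则先执行i.pushMetadata,再遍历req.Timeseries对每个sample执行i.append(遇到validationError只记录第一个并继续,其他错误则直接返回),若启用了WAL则最后通过i.wal.Log(record)写入记录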
cortex/pkg/ingester/flush.go
// FlushHandler triggers a flush of all in memory chunks. Mainly used for // local testing. func (i *Ingester) FlushHandler(w http.ResponseWriter, r *http.Request) { if i.cfg.BlocksStorageEnabled { i.v2FlushHandler(w, r) return } level.Info(util.Logger).Log("msg", "starting to flush all the chunks") i.sweepUsers(true) level.Info(util.Logger).Log("msg", "chunks queued for flushing") w.WriteHeader(http.StatusNoContent) }
FlushHandler方法在i.cfg.BlocksStorageEnabled为true时执行i.v2FlushHandler(w, r)并返回;否则执行i.sweepUsers(true)将chunks加入flush队列,并返回http.StatusNoContent
cortex/pkg/ingester/ingester.go
// ShutdownHandler triggers the following set of operations in order: // * Change the state of ring to stop accepting writes. // * Flush all the chunks. func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) { originalFlush := i.lifecycler.FlushOnShutdown() // We want to flush the chunks if transfer fails irrespective of original flag. i.lifecycler.SetFlushOnShutdown(true) // In the case of an HTTP shutdown, we want to unregister no matter what. originalUnregister := i.lifecycler.ShouldUnregisterOnShutdown() i.lifecycler.SetUnregisterOnShutdown(true) _ = services.StopAndAwaitTerminated(context.Background(), i) // Set state back to original. i.lifecycler.SetFlushOnShutdown(originalFlush) i.lifecycler.SetUnregisterOnShutdown(originalUnregister) w.WriteHeader(http.StatusNoContent) }
ShutdownHandler方法先通过i.lifecycler.FlushOnShutdown()、i.lifecycler.ShouldUnregisterOnShutdown()保存原有设置,并将二者都设置为true,然后执行services.StopAndAwaitTerminated停止Ingester,最后恢复原有设置并返回http.StatusNoContent
cortex/pkg/ingester/ingester.go
// Query implements service.IngesterServer func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) { if err := i.checkRunningOrStopping(); err != nil { return nil, err } if i.cfg.BlocksStorageEnabled { return i.v2Query(ctx, req) } userID, err := tenant.TenantID(ctx) if err != nil { return nil, err } from, through, matchers, err := client.FromQueryRequest(req) if err != nil { return nil, err } i.metrics.queries.Inc() i.userStatesMtx.RLock() state, ok, err := i.userStates.getViaContext(ctx) i.userStatesMtx.RUnlock() if err != nil { return nil, err } else if !ok { return &client.QueryResponse{}, nil } result := &client.QueryResponse{} numSeries, numSamples := 0, 0 maxSamplesPerQuery := i.limits.MaxSamplesPerQuery(userID) err = state.forSeriesMatching(ctx, matchers, func(ctx context.Context, _ model.Fingerprint, series *memorySeries) error { values, err := series.samplesForRange(from, through) if err != nil { return err } if len(values) == 0 { return nil } numSeries++ numSamples += len(values) if numSamples > maxSamplesPerQuery { return httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "exceeded maximum number of samples in a query (%d)", maxSamplesPerQuery) } ts := client.TimeSeries{ Labels: client.FromLabelsToLabelAdapters(series.metric), Samples: make([]client.Sample, 0, len(values)), } for _, s := range values { ts.Samples = append(ts.Samples, client.Sample{ Value: float64(s.Value), TimestampMs: int64(s.Timestamp), }) } result.Timeseries = append(result.Timeseries, ts) return nil }, nil, 0) i.metrics.queriedSeries.Observe(float64(numSeries)) i.metrics.queriedSamples.Observe(float64(numSamples)) return result, err }
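Query方法先判断i.checkRunningOrStopping();若i.cfg.BlocksStorageEnabled则执行i.v2Query(ctx, req);否则通过state.forSeriesMatching遍历匹配的series,用series.samplesForRange(from, through)获取数据,当累计的sample数量超过MaxSamplesPerQuery时返回http.StatusRequestEntityTooLarge错误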
cortex的api.Ingester接口内嵌了client.IngesterServer,定义了FlushHandler、ShutdownHandler、Push方法;ingester包中的Ingester结构体实现了该接口。