This article takes a look at Cortex's Distributor.
cortex/pkg/distributor/distributor.go
// Distributor is a storage.SampleAppender and a client.Querier which
// forwards appends and queries to individual ingesters.
type Distributor struct {
    services.Service

    cfg           Config
    ingestersRing ring.ReadRing
    ingesterPool  *ring_client.Pool
    limits        *validation.Overrides

    // The global rate limiter requires a distributors ring to count
    // the number of healthy instances
    distributorsRing *ring.Lifecycler

    // For handling HA replicas.
    HATracker *haTracker

    // Per-user rate limiter.
    ingestionRateLimiter *limiter.RateLimiter

    // Manager for subservices (HA Tracker, distributor ring and client pool)
    subservices        *services.Manager
    subservicesWatcher *services.FailureWatcher
}
Distributor is a storage.SampleAppender and a client.Querier that forwards appends and queries to the individual ingesters.
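As a rough mental model (not the actual Cortex types), a distributor fronts a set of ingesters, fanning writes out to them and reading results back from them. The Sample, Ingester and toyDistributor types below are made up purely for illustration:

package main

import "fmt"

// Sample and Ingester are hypothetical stand-ins for the Cortex types,
// only meant to show the fan-out/fan-in role of the Distributor.
type Sample struct {
    Labels string
    Value  float64
}

type Ingester struct {
    Addr  string
    store []Sample
}

func (i *Ingester) Push(s Sample) { i.store = append(i.store, s) }

func (i *Ingester) Query(labels string) []Sample {
    var out []Sample
    for _, s := range i.store {
        if s.Labels == labels {
            out = append(out, s)
        }
    }
    return out
}

// toyDistributor forwards appends and queries to its ingesters,
// mirroring the role of cortex's Distributor in a much simplified way.
type toyDistributor struct {
    ingesters []*Ingester
}

func (d *toyDistributor) Push(s Sample) {
    // Real Cortex hashes the series labels and writes only to the replicas
    // owning that token; here we simply write to every ingester.
    for _, ing := range d.ingesters {
        ing.Push(s)
    }
}

func (d *toyDistributor) Query(labels string) []Sample {
    // Real Cortex queries a replication set and merges the results; here we
    // just read from the first ingester.
    return d.ingesters[0].Query(labels)
}

func main() {
    d := &toyDistributor{ingesters: []*Ingester{{Addr: "a"}, {Addr: "b"}}}
    d.Push(Sample{Labels: `up{job="node"}`, Value: 1})
    fmt.Println(d.Query(`up{job="node"}`))
}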
cortex/pkg/distributor/distributor.go
// Push implements client.IngesterServer
func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
    userID, err := tenant.TenantID(ctx)
    if err != nil {
        return nil, err
    }

    source := util.GetSourceIPsFromOutgoingCtx(ctx)

    var firstPartialErr error
    removeReplica := false

    numSamples := 0
    for _, ts := range req.Timeseries {
        numSamples += len(ts.Samples)
    }
    // Count the total samples in, prior to validation or deduplication, for comparison with other metrics.
    incomingSamples.WithLabelValues(userID).Add(float64(numSamples))
    // Count the total number of metadata in.
    incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))

    // A WriteRequest can only contain series or metadata but not both. This might change in the future.
    // For each timeseries or samples, we compute a hash to distribute across ingesters;
    // check each sample/metadata and discard if outside limits.
    validatedTimeseries := make([]client.PreallocTimeseries, 0, len(req.Timeseries))
    validatedMetadata := make([]*client.MetricMetadata, 0, len(req.Metadata))
    metadataKeys := make([]uint32, 0, len(req.Metadata))
    seriesKeys := make([]uint32, 0, len(req.Timeseries))
    validatedSamples := 0

    if d.limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 {
        cluster, replica := findHALabels(d.limits.HAReplicaLabel(userID), d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
        removeReplica, err = d.checkSample(ctx, userID, cluster, replica)
        if err != nil {
            // Ensure the request slice is reused if the series get deduped.
            client.ReuseSlice(req.Timeseries)

            if errors.Is(err, replicasNotMatchError{}) {
                // These samples have been deduped.
                dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples))
                return nil, httpgrpc.Errorf(http.StatusAccepted, err.Error())
            }

            if errors.Is(err, tooManyClustersError{}) {
                validation.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numSamples))
                return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
            }

            return nil, err
        }
        // If there wasn't an error but removeReplica is false that means we didn't find both HA labels.
        if !removeReplica {
            nonHASamples.WithLabelValues(userID).Add(float64(numSamples))
        }
    }

    latestSampleTimestampMs := int64(0)
    defer func() {
        // Update this metric even in case of errors.
        if latestSampleTimestampMs > 0 {
            latestSeenSampleTimestampPerUser.WithLabelValues(userID).Set(float64(latestSampleTimestampMs) / 1000)
        }
    }()

    // For each timeseries, compute a hash to distribute across ingesters;
    // check each sample and discard if outside limits.
    for _, ts := range req.Timeseries {
        // Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong.
        if len(ts.Samples) > 0 {
            latestSampleTimestampMs = util.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
        }

        if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 {
            l := relabel.Process(client.FromLabelAdaptersToLabels(ts.Labels), mrc...)
            ts.Labels = client.FromLabelsToLabelAdapters(l)
        }

        // If we found both the cluster and replica labels, we only want to include the cluster label when
        // storing series in Cortex. If we kept the replica label we would end up with another series for the same
        // series we're trying to dedupe when HA tracking moves over to a different replica.
        if removeReplica {
            removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels)
        }

        for _, labelName := range d.limits.DropLabels(userID) {
            removeLabel(labelName, &ts.Labels)
        }

        if len(ts.Labels) == 0 {
            continue
        }

        // We rely on sorted labels in different places:
        // 1) When computing token for labels, and sharding by all labels. Here different order of labels returns
        // different tokens, which is bad.
        // 2) In validation code, when checking for duplicate label names. As duplicate label names are rejected
        // later in the validation phase, we ignore them here.
        sortLabelsIfNeeded(ts.Labels)

        // Generate the sharding token based on the series labels without the HA replica
        // label and dropped labels (if any)
        key, err := d.tokenForLabels(userID, ts.Labels)
        if err != nil {
            return nil, err
        }

        validatedSeries, err := d.validateSeries(ts, userID)

        // Errors in validation are considered non-fatal, as one series in a request may contain
        // invalid data but all the remaining series could be perfectly valid.
        if err != nil && firstPartialErr == nil {
            firstPartialErr = err
        }

        // validateSeries would have returned an emptyPreallocSeries if there were no valid samples.
        if validatedSeries == emptyPreallocSeries {
            continue
        }

        seriesKeys = append(seriesKeys, key)
        validatedTimeseries = append(validatedTimeseries, validatedSeries)
        validatedSamples += len(ts.Samples)
    }

    for _, m := range req.Metadata {
        err := validation.ValidateMetadata(d.limits, userID, m)

        if err != nil {
            if firstPartialErr == nil {
                firstPartialErr = err
            }

            continue
        }

        metadataKeys = append(metadataKeys, d.tokenForMetadata(userID, m.MetricFamilyName))
        validatedMetadata = append(validatedMetadata, m)
    }

    receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples))
    receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))

    if len(seriesKeys) == 0 && len(metadataKeys) == 0 {
        // Ensure the request slice is reused if there's no series or metadata passing the validation.
        client.ReuseSlice(req.Timeseries)

        return &client.WriteResponse{}, firstPartialErr
    }

    now := time.Now()
    totalN := validatedSamples + len(validatedMetadata)
    if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
        // Ensure the request slice is reused if the request is rate limited.
        client.ReuseSlice(req.Timeseries)

        // Return a 4xx here to have the client discard the data and not retry. If a client
        // is sending too much data consistently we will unlikely ever catch up otherwise.
        validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
        validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
        return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
    }

    subRing := d.ingestersRing

    // Obtain a subring if required.
    if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
        subRing = d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID))
    }

    keys := append(seriesKeys, metadataKeys...)
    initialMetadataIndex := len(seriesKeys)

    op := ring.WriteNoExtend
    if d.cfg.ExtendWrites {
        op = ring.Write
    }

    err = ring.DoBatch(ctx, op, subRing, keys, func(ingester ring.IngesterDesc, indexes []int) error {
        timeseries := make([]client.PreallocTimeseries, 0, len(indexes))
        var metadata []*client.MetricMetadata

        for _, i := range indexes {
            if i >= initialMetadataIndex {
                metadata = append(metadata, validatedMetadata[i-initialMetadataIndex])
            } else {
                timeseries = append(timeseries, validatedTimeseries[i])
            }
        }

        // Use a background context to make sure all ingesters get samples even if we return early
        localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
        defer cancel()
        localCtx = user.InjectOrgID(localCtx, userID)
        if sp := opentracing.SpanFromContext(ctx); sp != nil {
            localCtx = opentracing.ContextWithSpan(localCtx, sp)
        }

        // Get clientIP(s) from Context and add it to localCtx
        localCtx = util.AddSourceIPsToOutgoingContext(localCtx, source)

        return d.send(localCtx, ingester, timeseries, metadata, req.Source)
    }, func() { client.ReuseSlice(req.Timeseries) })
    if err != nil {
        return nil, err
    }
    return &client.WriteResponse{}, firstPartialErr
}
When d.cfg.ShardingStrategy is util.ShardingStrategyShuffle, the Push method uses d.ingestersRing.ShuffleShard to obtain a per-tenant subRing; it then submits the sharding keys via ring.DoBatch, whose per-ingester callback calls d.send(localCtx, ingester, timeseries, metadata, req.Source).
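To make the sharding key concrete: the tenant plus the sorted series labels are hashed to a uint32 token, and that token is looked up on a sorted token ring to find the owning ingesters. The sketch below is a simplified illustration with hypothetical tokenFor/ringDesc/ownersFor names, not the real ring code:

package main

import (
    "fmt"
    "hash/fnv"
    "sort"
)

// tokenFor mimics the idea behind d.tokenForLabels: the tenant and the
// (sorted) series labels are hashed to a uint32 sharding key.
func tokenFor(userID, labels string) uint32 {
    h := fnv.New32a()
    h.Write([]byte(userID))
    h.Write([]byte(labels))
    return h.Sum32()
}

// ringDesc is a hypothetical, much simplified token ring: each ingester
// owns one token, and a key is owned by the first replicationFactor
// ingesters found walking clockwise from the key.
type ringDesc struct {
    tokens []uint32          // sorted
    owner  map[uint32]string // token -> ingester address
}

func (r ringDesc) ownersFor(key uint32, replicationFactor int) []string {
    start := sort.Search(len(r.tokens), func(i int) bool { return r.tokens[i] >= key })
    var owners []string
    for i := 0; i < replicationFactor && i < len(r.tokens); i++ {
        tok := r.tokens[(start+i)%len(r.tokens)]
        owners = append(owners, r.owner[tok])
    }
    return owners
}

func main() {
    r := ringDesc{
        tokens: []uint32{1 << 30, 2 << 30, 3 << 30},
        owner:  map[uint32]string{1 << 30: "ingester-1", 2 << 30: "ingester-2", 3 << 30: "ingester-3"},
    }
    key := tokenFor("tenant-a", `{__name__="up",job="node"}`)
    fmt.Printf("key=%d owners=%v\n", key, r.ownersFor(key, 2))
}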
cortex/pkg/ring/batch.go
// DoBatch request against a set of keys in the ring, handling replication and
// failures. For example if we want to write N items where they may all
// hit different ingesters, and we want them all replicated R ways with
// quorum writes, we track the relationship between batch RPCs and the items
// within them.
//
// Callback is passed the ingester to target, and the indexes of the keys
// to send to that ingester.
//
// Not implemented as a method on Ring so we can test separately.
func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error, cleanup func()) error {
    if r.IngesterCount() <= 0 {
        return fmt.Errorf("DoBatch: IngesterCount <= 0")
    }
    expectedTrackers := len(keys) * (r.ReplicationFactor() + 1) / r.IngesterCount()
    itemTrackers := make([]itemTracker, len(keys))
    ingesters := make(map[string]ingester, r.IngesterCount())

    var (
        bufDescs [GetBufferSize]IngesterDesc
        bufHosts [GetBufferSize]string
        bufZones [GetBufferSize]string
    )
    for i, key := range keys {
        replicationSet, err := r.Get(key, op, bufDescs[:0], bufHosts[:0], bufZones[:0])
        if err != nil {
            return err
        }
        itemTrackers[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors
        itemTrackers[i].maxFailures = replicationSet.MaxErrors

        for _, desc := range replicationSet.Ingesters {
            curr, found := ingesters[desc.Addr]
            if !found {
                curr.itemTrackers = make([]*itemTracker, 0, expectedTrackers)
                curr.indexes = make([]int, 0, expectedTrackers)
            }
            ingesters[desc.Addr] = ingester{
                desc:         desc,
                itemTrackers: append(curr.itemTrackers, &itemTrackers[i]),
                indexes:      append(curr.indexes, i),
            }
        }
    }

    tracker := batchTracker{
        done: make(chan struct{}, 1),
        err:  make(chan error, 1),
    }
    tracker.rpcsPending.Store(int32(len(itemTrackers)))

    var wg sync.WaitGroup

    wg.Add(len(ingesters))
    for _, i := range ingesters {
        go func(i ingester) {
            err := callback(i.desc, i.indexes)
            tracker.record(i.itemTrackers, err)
            wg.Done()
        }(i)
    }

    // Perform cleanup at the end.
    go func() {
        wg.Wait()
        cleanup()
    }()

    select {
    case err := <-tracker.err:
        return err
    case <-tracker.done:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}
DoBatch resolves the replica set for each key via r.Get, groups the key indexes by ingester, and invokes the supplied callback once per ingester (in its own goroutine) with that ingester's indexes; a batchTracker records each callback's result against the affected items' minSuccess/maxFailures counters to decide when the whole batch is done or has failed.
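The per-item bookkeeping can be illustrated with a stripped-down version of the itemTracker idea: each key needs minSuccess acks and tolerates maxFailures errors, and the callback result for one ingester is recorded against every key routed to it. This is a sketch of that idea (itemState and record are hypothetical names, not the real batchTracker code):

package main

import (
    "fmt"
    "sync/atomic"
)

// itemState is a simplified stand-in for ring's itemTracker: an item is
// written to several ingesters, succeeds once minSuccess of those writes
// succeed, and fails once more than maxFailures of them fail.
type itemState struct {
    minSuccess  int32
    maxFailures int32
    succeeded   atomic.Int32
    failed      atomic.Int32
}

// record applies the result of one per-ingester RPC to the items it carried,
// returning how many items just reached quorum and how many just exceeded
// their failure budget.
func record(items []*itemState, rpcErr error) (completed, failed int) {
    for _, it := range items {
        if rpcErr == nil {
            if it.succeeded.Add(1) == it.minSuccess {
                completed++
            }
        } else {
            if it.failed.Add(1) == it.maxFailures+1 {
                failed++
            }
        }
    }
    return completed, failed
}

func main() {
    // Replication factor 3 with quorum writes: minSuccess=2, maxFailures=1.
    item := &itemState{minSuccess: 2, maxFailures: 1}
    fmt.Println(record([]*itemState{item}, nil))                   // first ack
    fmt.Println(record([]*itemState{item}, fmt.Errorf("timeout"))) // one failure tolerated
    fmt.Println(record([]*itemState{item}, nil))                   // quorum reached
}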
cortex/pkg/distributor/query.go
// Query multiple ingesters and returns a Matrix of samples.
func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
    var matrix model.Matrix
    err := instrument.CollectedRequest(ctx, "Distributor.Query", queryDuration, instrument.ErrorCode, func(ctx context.Context) error {
        req, err := ingester_client.ToQueryRequest(from, to, matchers)
        if err != nil {
            return err
        }

        replicationSet, err := d.GetIngestersForQuery(ctx, matchers...)
        if err != nil {
            return err
        }

        matrix, err = d.queryIngesters(ctx, replicationSet, req)
        if err != nil {
            return err
        }

        if s := opentracing.SpanFromContext(ctx); s != nil {
            s.LogKV("series", len(matrix))
        }
        return nil
    })
    return matrix, err
}
The Query method obtains a replicationSet via d.GetIngestersForQuery, then fetches the matrix via d.queryIngesters.
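Conceptually, the read path fans the same query out to every ingester in the replication set and merges the per-ingester responses into one matrix. Below is a simplified, self-contained sketch of that fan-out/merge; the matrix type and the queryOne parameter are made up for illustration, and the MaxErrors handling of the real replication set is omitted:

package main

import (
    "fmt"
    "sync"
)

// matrix is a crude stand-in for model.Matrix: series -> samples.
type matrix map[string][]float64

// queryIngesters fans the same query out to all ingesters in the
// replication set concurrently and merges the responses, keeping the
// first replica's samples for each series as a naive deduplication.
func queryIngesters(addrs []string, queryOne func(addr string) matrix) matrix {
    var (
        mu     sync.Mutex
        merged = matrix{}
        wg     sync.WaitGroup
    )
    for _, addr := range addrs {
        wg.Add(1)
        go func(addr string) {
            defer wg.Done()
            resp := queryOne(addr)
            mu.Lock()
            defer mu.Unlock()
            for s, samples := range resp {
                if _, ok := merged[s]; !ok { // first replica wins
                    merged[s] = samples
                }
            }
        }(addr)
    }
    wg.Wait()
    return merged
}

func main() {
    fake := func(addr string) matrix {
        return matrix{`up{job="node"}`: {1, 1, 0}}
    }
    fmt.Println(queryIngesters([]string{"ingester-1", "ingester-2"}, fake))
}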
Cortex's Distributor exposes the Push and Query methods. Push obtains a subRing via d.ingestersRing.ShuffleShard when d.cfg.ShardingStrategy is util.ShardingStrategyShuffle and then submits the keys via ring.DoBatch; Query obtains a replicationSet via d.GetIngestersForQuery and then retrieves the matrix via d.queryIngesters.