本文主要研究一下cortex的ReadRing
cortex/pkg/ring/ring.go
// ReadRing represents the read interface to the ring. type ReadRing interface { prometheus.Collector // Get returns n (or more) ingesters which form the replicas for the given key. // bufDescs, bufHosts and bufZones are slices to be overwritten for the return value // to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet(). Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, bufZones []string) (ReplicationSet, error) // GetAllHealthy returns all healthy instances in the ring, for the given operation. // This function doesn't check if the quorum is honored, so doesn't fail if the number // of unhealthy ingesters is greater than the tolerated max unavailable. GetAllHealthy(op Operation) (ReplicationSet, error) // GetReplicationSetForOperation returns all instances where the input operation should be executed. // The resulting ReplicationSet doesn't necessarily contains all healthy instances // in the ring, but could contain the minimum set of instances required to execute // the input operation. GetReplicationSetForOperation(op Operation) (ReplicationSet, error) ReplicationFactor() int IngesterCount() int // ShuffleShard returns a subring for the provided identifier (eg. a tenant ID) // and size (number of instances). ShuffleShard(identifier string, size int) ReadRing // ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes // all instances that have been part of the identifier's shard since "now - lookbackPeriod". ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing // HasInstance returns whether the ring contains an instance matching the provided instanceID. HasInstance(instanceID string) bool }
ReadRing内嵌了prometheus.Collector,定义了Get、GetAllHealthy、GetReplicationSetForOperation、ReplicationFactor、IngesterCount、ShuffleShard、ShuffleShardWithLookback、HasInstance方法
cortex/pkg/ring/ring.go
// Get returns n (or more) ingesters which form the replicas for the given key. func (r *Ring) Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, bufZones []string) (ReplicationSet, error) { r.mtx.RLock() defer r.mtx.RUnlock() if r.ringDesc == nil || len(r.ringTokens) == 0 { return ReplicationSet{}, ErrEmptyRing } var ( n = r.cfg.ReplicationFactor ingesters = bufDescs[:0] start = searchToken(r.ringTokens, key) iterations = 0 // We use a slice instead of a map because it's faster to search within a // slice than lookup a map for a very low number of items. distinctHosts = bufHosts[:0] distinctZones = bufZones[:0] ) for i := start; len(distinctHosts) < n && iterations < len(r.ringTokens); i++ { iterations++ // Wrap i around in the ring. i %= len(r.ringTokens) token := r.ringTokens[i] info, ok := r.ringInstanceByToken[token] if !ok { // This should never happen unless a bug in the ring code. return ReplicationSet{}, ErrInconsistentTokensInfo } // We want n *distinct* ingesters && distinct zones. if util.StringsContain(distinctHosts, info.InstanceID) { continue } // Ignore if the ingesters don't have a zone set. if r.cfg.ZoneAwarenessEnabled && info.Zone != "" { if util.StringsContain(distinctZones, info.Zone) { continue } distinctZones = append(distinctZones, info.Zone) } distinctHosts = append(distinctHosts, info.InstanceID) ingester := r.ringDesc.Ingesters[info.InstanceID] // Check whether the replica set should be extended given we're including // this instance. if op.ShouldExtendReplicaSetOnState(ingester.State) { n++ } ingesters = append(ingesters, ingester) } liveIngesters, maxFailure, err := r.strategy.Filter(ingesters, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled) if err != nil { return ReplicationSet{}, err } return ReplicationSet{ Ingesters: liveIngesters, MaxErrors: maxFailure, }, nil }
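Get方法从searchToken(r.ringTokens, key)定位到的token开始沿ring依次遍历:通过r.ringInstanceByToken[token]获取info,跳过重复的实例(开启zone感知时还会跳过已出现过的zone),再通过r.ringDesc.Ingesters[info.InstanceID]获取ingester;若该ingester的状态使op.ShouldExtendReplicaSetOnState返回true,则把目标副本数n加1。遍历结束后通过r.strategy.Filter过滤出liveIngesters及可容忍的错误数MaxErrors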
cortex/pkg/ring/ring.go
// GetAllHealthy implements ReadRing. func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) { r.mtx.RLock() defer r.mtx.RUnlock() if r.ringDesc == nil || len(r.ringDesc.Ingesters) == 0 { return ReplicationSet{}, ErrEmptyRing } now := time.Now() ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters)) for _, ingester := range r.ringDesc.Ingesters { if r.IsHealthy(&ingester, op, now) { ingesters = append(ingesters, ingester) } } return ReplicationSet{ Ingesters: ingesters, MaxErrors: 0, }, nil }
GetAllHealthy方法遍历r.ringDesc.Ingesters,然后通过r.IsHealthy(&ingester, op, now)提取healthy的ingester
cortex/pkg/ring/ring.go
// GetReplicationSetForOperation implements ReadRing. func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) { r.mtx.RLock() defer r.mtx.RUnlock() if r.ringDesc == nil || len(r.ringTokens) == 0 { return ReplicationSet{}, ErrEmptyRing } // Build the initial replication set, excluding unhealthy instances. healthyInstances := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters)) zoneFailures := make(map[string]struct{}) now := time.Now() for _, ingester := range r.ringDesc.Ingesters { if r.IsHealthy(&ingester, op, now) { healthyInstances = append(healthyInstances, ingester) } else { zoneFailures[ingester.Zone] = struct{}{} } } // Max errors and max unavailable zones are mutually exclusive. We initialise both // to 0 and then we update them whether zone-awareness is enabled or not. maxErrors := 0 maxUnavailableZones := 0 if r.cfg.ZoneAwarenessEnabled { // Given data is replicated to RF different zones, we can tolerate a number of // RF/2 failing zones. However, we need to protect from the case the ring currently // contains instances in a number of zones < RF. numReplicatedZones := util.Min(len(r.ringZones), r.cfg.ReplicationFactor) minSuccessZones := (numReplicatedZones / 2) + 1 maxUnavailableZones = minSuccessZones - 1 if len(zoneFailures) > maxUnavailableZones { return ReplicationSet{}, ErrTooManyFailedIngesters } if len(zoneFailures) > 0 { // We remove all instances (even healthy ones) from zones with at least // 1 failing ingester. Due to how replication works when zone-awareness is // enabled (data is replicated to RF different zones), there's no benefit in // querying healthy instances from "failing zones". A zone is considered // failed if there is single error. 
filteredInstances := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters)) for _, ingester := range healthyInstances { if _, ok := zoneFailures[ingester.Zone]; !ok { filteredInstances = append(filteredInstances, ingester) } } healthyInstances = filteredInstances } // Since we removed all instances from zones containing at least 1 failing // instance, we have to decrease the max unavailable zones accordingly. maxUnavailableZones -= len(zoneFailures) } else { // Calculate the number of required ingesters; // ensure we always require at least RF-1 when RF=3. numRequired := len(r.ringDesc.Ingesters) if numRequired < r.cfg.ReplicationFactor { numRequired = r.cfg.ReplicationFactor } // We can tolerate this many failures numRequired -= r.cfg.ReplicationFactor / 2 if len(healthyInstances) < numRequired { return ReplicationSet{}, ErrTooManyFailedIngesters } maxErrors = len(healthyInstances) - numRequired } return ReplicationSet{ Ingesters: healthyInstances, MaxErrors: maxErrors, MaxUnavailableZones: maxUnavailableZones, }, nil }
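GetReplicationSetForOperation先提取healthyInstances,同时记录出现不健康实例的zone(zoneFailures),然后根据r.cfg.ZoneAwarenessEnabled分两种情况处理。开启zone感知时:若失败的zone数超过允许值(min(zone数, RF)/2+1)-1,则返回ErrTooManyFailedIngesters;否则剔除失败zone中的所有实例(包括健康的实例),并相应减少MaxUnavailableZones。未开启zone感知时:所需实例数为max(实例总数, RF) - RF/2,健康实例不足时返回ErrTooManyFailedIngesters,否则把多出的部分作为MaxErrors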
cortex/pkg/ring/ring.go
func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { // Nothing to do if the shard size is not smaller then the actual ring. if size <= 0 || r.IngesterCount() <= size { return r } if cached := r.getCachedShuffledSubring(identifier, size); cached != nil { return cached } result := r.shuffleShard(identifier, size, 0, time.Now()) r.setCachedShuffledSubring(identifier, size, result) return result }
ShuffleShard方法在size<=0或ring中的实例数不大于size时直接返回r本身;否则先从r.getCachedShuffledSubring获取缓存的子ring,如果为nil则执行r.shuffleShard计算子ring,再通过r.setCachedShuffledSubring将其写入缓存
cortex/pkg/ring/ring.go
// HasInstance returns whether the ring contains an instance matching the provided instanceID. func (r *Ring) HasInstance(instanceID string) bool { r.mtx.RLock() defer r.mtx.RUnlock() instances := r.ringDesc.GetIngesters() _, ok := instances[instanceID] return ok }
HasInstance在持有读锁的情况下通过r.ringDesc.GetIngesters()获取instances,再根据instanceID判断对应实例是否存在
cortex的ReadRing内嵌了prometheus.Collector,定义了Get、GetAllHealthy、GetReplicationSetForOperation、ReplicationFactor、IngesterCount、ShuffleShard、ShuffleShardWithLookback、HasInstance方法。