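The previous article, "hystrix-go circuit breaker framework: source code analysis (1)", gave a rough picture of the overall structure.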
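The Updates field of metricExchange is essentially the stream of every reported event. It is created with m.Updates = make(chan *commandExecution, 2000). The previous part covered how it is consumed, so where are values written into it?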
```go
func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error {
	if len(eventTypes) == 0 {
		return fmt.Errorf("no event types sent for metrics")
	}

	circuit.mutex.RLock()
	o := circuit.open
	circuit.mutex.RUnlock()
	if eventTypes[0] == "success" && o {
		circuit.setClose()
	}

	var concurrencyInUse float64
	if circuit.executorPool.Max > 0 {
		concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max)
	}

	select {
	case circuit.metrics.Updates <- &commandExecution{
		Types:            eventTypes,
		Start:            start,
		RunDuration:      runDuration,
		ConcurrencyInUse: concurrencyInUse,
	}:
	default:
		return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)}
	}

	return nil
}
```
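ReportEvent is called for both success and failure events. The send circuit.metrics.Updates <- &commandExecution{...} is what keeps feeding values into the channel (this answers question 3); because it sits inside a select with a default branch, a full channel makes ReportEvent return a CircuitError instead of blocking. The check if eventTypes[0] == "success" && o is where the circuit breaker gets closed. As described in the previous part, however, closing only happens after a pause. Where is that pause implemented? In cmd.circuit.AllowRequest(), which is responsible both for opening the circuit breaker and for handling the pause: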
```go
func (circuit *CircuitBreaker) AllowRequest() bool {
	return !circuit.IsOpen() || circuit.allowSingleTest()
}

func (circuit *CircuitBreaker) allowSingleTest() bool {
	circuit.mutex.RLock()
	defer circuit.mutex.RUnlock()

	now := time.Now().UnixNano()
	openedOrLastTestedTime := atomic.LoadInt64(&circuit.openedOrLastTestedTime)
	if circuit.open && now > openedOrLastTestedTime+getSettings(circuit.Name).SleepWindow.Nanoseconds() {
		swapped := atomic.CompareAndSwapInt64(&circuit.openedOrLastTestedTime, openedOrLastTestedTime, now)
		if swapped {
			log.Printf("hystrix-go: allowing single test to possibly close circuit %v", circuit.Name)
		}
		return swapped
	}

	return false
}

func (circuit *CircuitBreaker) IsOpen() bool {
	circuit.mutex.RLock()
	o := circuit.forceOpen || circuit.open
	circuit.mutex.RUnlock()

	if o {
		return true
	}

	if uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold {
		return false
	}

	// IsHealthy reports whether the error rate is still below the threshold;
	// if it is not, the circuit is opened.
	if !circuit.metrics.IsHealthy(time.Now()) {
		// too many failures, open the circuit
		circuit.setOpen()
		return true
	}

	return false
}
```
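allowSingleTest handles the pause before a retry: it compares the current time with openedOrLastTestedTime (when the circuit opened or was last probed) plus SleepWindow, and the CompareAndSwapInt64 ensures that only one caller wins the probe and restarts the window. IsOpen first checks whether the number of requests in the 10-second rolling statistics window has reached RequestVolumeThreshold; only then does it evaluate the error rate, and if the rate crosses the threshold it opens the circuit breaker.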
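That leaves only question 5: what is the ticket logic for? It is rate limiting, a feature that comes bundled with the circuit breaker. When the circuit breaker is initialized it is given a maximum number of tickets, and from then on the number of concurrent calls cannot exceed that value; a call that cannot get a ticket goes straight to the circuit-breaking path (GoC reports ErrMaxConcurrency and runs the fallback). You can also see in GoC that every call returns its ticket when it finishes. The tickets are kept in executorPool.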
executorPool is initialized as follows; Max is the maximum number of tickets (taken from MaxConcurrentRequests):
```go
func newExecutorPool(name string) *executorPool {
	p := &executorPool{}
	p.Name = name
	p.Metrics = newPoolMetrics(name)
	p.Max = getSettings(name).MaxConcurrentRequests

	p.Tickets = make(chan *struct{}, p.Max)
	for i := 0; i < p.Max; i++ {
		p.Tickets <- &struct{}{}
	}

	return p
}
```
The logic for returning a ticket:
```go
func (p *executorPool) Return(ticket *struct{}) {
	if ticket == nil {
		return
	}

	p.Metrics.Updates <- poolMetricsUpdate{
		activeCount: p.ActiveCount(),
	}
	p.Tickets <- ticket
}
```