本文主要研究一下dubbo-go-proxy的timeoutFilter
dubbo-go-proxy/pkg/filter/timeout/timeout.go
func Init() { extension.SetFilterFunc(constant.TimeoutFilter, timeoutFilterFunc(0)) } func timeoutFilterFunc(duration time.Duration) fc.FilterFunc { return New(duration).Do() } // timeoutFilter is a filter for control request time out. type timeoutFilter struct { // global timeout waitTime time.Duration } // New create timeout filter. func New(t time.Duration) filter.Filter { if t <= 0 { t = constant.DefaultTimeout } return &timeoutFilter{ waitTime: t, } }
timeoutFilter往extension设置了名为`dgp.filters.timeout的timeoutFilterFunc,默认的timeout为1s;timeoutFilterFunc执行的是timeoutFilter的Do方法
dubbo-go-proxy/pkg/filter/timeout/timeout.go
// Do execute timeoutFilter filter logic. func (f timeoutFilter) Do() fc.FilterFunc { return func(c fc.Context) { hc := c.(*contexthttp.HttpContext) ctx, cancel := context.WithTimeout(hc.Ctx, f.getTimeout(hc.Timeout)) defer cancel() // Channel capacity must be greater than 0. // Otherwise, if the parent coroutine quit due to timeout, // the child coroutine may never be able to quit. finishChan := make(chan struct{}, 1) go func() { // panic by recovery c.Next() finishChan <- struct{}{} }() select { // timeout do. case <-ctx.Done(): logger.Warnf("api:%s request timeout", hc.GetAPI().URLPattern) bt, _ := json.Marshal(filter.ErrResponse{Message: http.ErrHandlerTimeout.Error()}) hc.SourceResp = bt hc.TargetResp = &client.Response{Data: bt} hc.WriteJSONWithStatus(http.StatusGatewayTimeout, bt) c.Abort() case <-finishChan: // finish call do something. } } } func (f timeoutFilter) getTimeout(t time.Duration) time.Duration { if t <= 0 { return f.waitTime } return t }
Do方法会通过context.WithTimeout创建ctx及cancel,然后注册defer这个cancel func;之后创建finishChan,异步执行c.Next(),之后往finishChan写入数据;最后select判断ctx.Done()或者是finishChan;如果是ctx.Done()则表示timeout了,返回http.StatusGatewayTimeout
dubbo-go-proxy的timeoutFilter默认设置了1s超时,它通过context.WithTimeout创建ctx及cancel,并异步执行c.Next(),若ctx.Done()则表示timeout了,返回http.StatusGatewayTimeout;若是读取到了finishChan则表示请求正常响应。