通过流程顺序,异步一句解析
直接看代码
//通过Import导入一个Selector @Import(AsyncConfigurationSelector.class) public @interface EnableAsync { //.......省略 } #导入配置类ProxyAsyncConfiguration public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> { private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"; /** * Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration} * for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()}, * respectively. */ @Override @Nullable public String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { case PROXY: return new String[] {ProxyAsyncConfiguration.class.getName()}; case ASPECTJ: return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME}; default: return null; } } } //通过配置类导入BeanPostProcess @Configuration(proxyBeanMethods = false) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration { @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public AsyncAnnotationBeanPostProcessor asyncAdvisor() { Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected"); AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor(); bpp.configure(this.executor, this.exceptionHandler); Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation"); if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) { bpp.setAsyncAnnotationType(customAsyncAnnotation); } bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass")); bpp.setOrder(this.enableAsync.<Integer>getNumber("order")); return bpp; } }
核心方法
@Override public Object postProcessAfterInitialization(Object bean, String beanName) { if (this.advisor == null || bean instanceof AopInfrastructureBean) { // Ignore AOP infrastructure such as scoped proxies. return bean; } //暂时忽略 if (bean instanceof Advised) { Advised advised = (Advised) bean; if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) { // Add our local Advisor to the existing proxy's Advisor chain... if (this.beforeExistingAdvisors) { advised.addAdvisor(0, this.advisor); } else { advised.addAdvisor(this.advisor); } return bean; } } //最后我们详细讲解 //判断Bean是否有资格创建代理 //通过Advisor的Pointcut判断bean是否有资格,此处可以简单理解为判断Class或Class中的Method是否标记@Async(满足一个即可) if (isEligible(bean, beanName)) { ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName); if (!proxyFactory.isProxyTargetClass()) { evaluateProxyInterfaces(bean.getClass(), proxyFactory); } //创建代理对象需要的Advisor(最中方法的异步执行都在这个类中) 最后我们详细讲解 proxyFactory.addAdvisor(this.advisor); customizeProxyFactory(proxyFactory); // Use original ClassLoader if bean class not locally loaded in overriding class loader ClassLoader classLoader = getProxyClassLoader(); if (classLoader instanceof SmartClassLoader && classLoader != bean.getClass().getClassLoader()) { classLoader = ((SmartClassLoader) classLoader).getOriginalClassLoader(); } //创建代理对象返回 return proxyFactory.getProxy(classLoader); } // No proxy needed. return bean; }
@Async标记的方法是怎么异步执行的,又是怎么可以返回结果的
核心代码
public Object invoke(final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null) { throw new IllegalStateException( "No executor specified and no default executor set on AsyncExecutionInterceptor either"); } //创建线程任务 Callable<Object> task = () -> { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null; }; //传递任务,Executor,和方法的返回值。 @Async是否需要返回值就是根据方法的返回值类型判断的 return doSubmit(task, executor, invocation.getMethod().getReturnType()); } protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) { //判断返回值类型,执行方式不同 if (CompletableFuture.class.isAssignableFrom(returnType)) { return CompletableFuture.supplyAsync(() -> { try { return task.call(); } catch (Throwable ex) { throw new CompletionException(ex); } }, executor); } else if (ListenableFuture.class.isAssignableFrom(returnType)) { return ((AsyncListenableTaskExecutor) executor).submitListenable(task); } else if (Future.class.isAssignableFrom(returnType)) { return executor.submit(task); } else { executor.submit(task); return null; } }
后续会不上AOP章节
前面我们提到了几个点,怎么判断@Async,为什么会最终AnnotationAsyncExecutionInterceptor执行的。 这些需要又AOP的前置知识,不然会一头雾水。
基础知识虽然很枯燥,但是很重要,是一切实现的基础。
通过AopUtils判断,此处传入了一个advisor.
Advisor接口整合了Adivce和Pointcut。
最终执行方法的AnnotationAsyncExecutionInterceptor就是一个Advice。就是在当前的this.advisor对象中存储。
Pointcut是在Aop中实现判断的,整合CalssFilter和MethodMatcher.
protected boolean isEligible(Class<?> targetClass) { Boolean eligible = this.eligibleBeans.get(targetClass); if (eligible != null) { return eligible; } if (this.advisor == null) { return false; } //此处判断 eligible = AopUtils.canApply(this.advisor, targetClass); this.eligibleBeans.put(targetClass, eligible); return eligible; }
判断方法
public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) { Assert.notNull(pc, "Pointcut must not be null"); if (!pc.getClassFilter().matches(targetClass)) { return false; } //通过Pointcut判断. 此处使用ClassFilterAwareUnionMethodMatcher MethodMatcher methodMatcher = pc.getMethodMatcher(); if (methodMatcher == MethodMatcher.TRUE) { // No need to iterate the methods if we're matching any method anyway... return true; } IntroductionAwareMethodMatcher introductionAwareMethodMatcher = null; if (methodMatcher instanceof IntroductionAwareMethodMatcher) { introductionAwareMethodMatcher = (IntroductionAwareMethodMatcher) methodMatcher; } Set<Class<?>> classes = new LinkedHashSet<>(); if (!Proxy.isProxyClass(targetClass)) { classes.add(ClassUtils.getUserClass(targetClass)); } classes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetClass)); for (Class<?> clazz : classes) { Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz); for (Method method : methods) { if (introductionAwareMethodMatcher != null ? introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) : //此处判断 methodMatcher.matches(method, targetClass)) { return true; } } } return false; }
最终判断的MethodMatcher是AnnotationMethodMatcher
判断Method上是否携带annotationType注解 那annotationType会不会是@Async呢? 答案是肯定的
public class AnnotationMethodMatcher extends StaticMethodMatcher { private final Class<? extends Annotation> annotationType; //判断Method上是否携带annotationType注解 那annotationType会不会是@Async呢? 答案是肯定的 private boolean matchesMethod(Method method) { return (this.checkInherited ? AnnotatedElementUtils.hasAnnotation(method, this.annotationType) : method.isAnnotationPresent(this.annotationType)); } }
线索还需要通过AsyncAnnotationBeanPostProcessor 找,我们刚才说了Advisor整合了Adive和Pointcu,Pointcut又整合了CalssFilter和MethodMatcher. 最终是通过MethodMatcher处理的。 我们从之前的源码得到信息AsyncAnnotationBeanPostProcessor 中Advisor的属性属性,我们通过找advisor的复制地方下手
AsyncAnnotationBeanPostProcessor 是一个BeanFactoryAware,在bean生命周期回调setBeanFactory时复制,通过AsyncAnnotationAdvisor构造器,对Adivce和Pointcut初始化,Pointcut中传递了的Annotation就是@Async,Adice就是最开始我们说的AnnotationAsyncExecutionInterceptor
public void setBeanFactory(BeanFactory beanFactory) { super.setBeanFactory(beanFactory);, AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler); if (this.asyncAnnotationType != null) { advisor.setAsyncAnnotationType(this.asyncAnnotationType); } advisor.setBeanFactory(beanFactory); this.advisor = advisor; } public AsyncAnnotationAdvisor( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2); //添加@Async注解 asyncAnnotationTypes.add(Async.class); try { asyncAnnotationTypes.add((Class<? extends Annotation>) ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader())); } catch (ClassNotFoundException ex) { // If EJB 3.1 API not present, simply ignore. } this.advice = buildAdvice(executor, exceptionHandler); this.pointcut = buildPointcut(asyncAnnotationTypes); } //此处通过ComposablePointcut通过union组合,(union满足一个即可) protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) { ComposablePointcut result = null; for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) { Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true); Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true); if (result == null) { result = new ComposablePointcut(cpc); } else { result.union(cpc); } result = result.union(mpc); } return (result != null ? result : Pointcut.TRUE); } protected Advice buildAdvice( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { //创建Advice,就是我们最开始说的最终方法异步执行的Advice: AnnotationAsyncExecutionInterceptor AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null); interceptor.configure(executor, exceptionHandler); return interceptor; }