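Spring's declarative transactions free us from the tedium of manual transaction handling: we no longer have to acquire and close connections, or commit and roll back by hand. Put simply, the transaction is rolled back when an exception occurs.
The switch that turns Spring transactions on is <tx:annotation-driven/>. A global search for "annotation-driven" leads to this class: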
public class TxNamespaceHandler extends NamespaceHandlerSupport
@Override
public void init() {
    registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
    // Entry point for <tx:annotation-driven/>
    registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
    registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
}
org\springframework\transaction\config\AnnotationDrivenBeanDefinitionParser.java: next, look at its parse method.
public BeanDefinition parse(Element element, ParserContext parserContext) {
    registerTransactionalEventListenerFactory(parserContext);
    String mode = element.getAttribute("mode");
    if ("aspectj".equals(mode)) {
        // mode="aspectj": weave the transaction aspect in with AspectJ
        registerTransactionAspect(element, parserContext);
    }
    else {
        // mode="proxy": build the infrastructure bean definitions from the configuration and register them
        AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
    }
    return null;
}
This leads to the real entry point: AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
public static void configureAutoProxyCreator(Element element, ParserContext parserContext) {
    // Register (or escalate to) InfrastructureAdvisorAutoProxyCreator, which implements BeanPostProcessor.
    // In essence: once this post-processor is registered, the bean factory calls it for every bean it
    // initializes, and its postProcessAfterInitialization method wraps eligible beans in a transactional proxy.
    AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);

    // Bean name under which the transaction advisor is registered
    String txAdvisorBeanName = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME;
    // Continue only if no definition with that default bean name exists yet
    if (!parserContext.getRegistry().containsBeanDefinition(txAdvisorBeanName)) {
        Object eleSource = parserContext.extractSource(element);

        // Create the TransactionAttributeSource definition.
        RootBeanDefinition sourceDef = new RootBeanDefinition(
                "org.springframework.transaction.annotation.AnnotationTransactionAttributeSource");
        sourceDef.setSource(eleSource);
        sourceDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
        // Register it under a generated name
        String sourceName = parserContext.getReaderContext().registerWithGeneratedName(sourceDef);

        // Create the TransactionInterceptor definition.
        // This is the interceptor whose invoke method runs at call time.
        RootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class);
        interceptorDef.setSource(eleSource);
        interceptorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
        registerTransactionManager(element, interceptorDef);
        interceptorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
        // Register it under a generated name
        String interceptorName = parserContext.getReaderContext().registerWithGeneratedName(interceptorDef);

        // Create the TransactionAttributeSourceAdvisor definition.
        // Hierarchy: PointcutAdvisor <- AbstractPointcutAdvisor <- AbstractBeanFactoryPointcutAdvisor
        //            <- BeanFactoryTransactionAttributeSourceAdvisor
        RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);
        advisorDef.setSource(eleSource);
        advisorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
        advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
        advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);
        // Honor the order attribute if it was configured
        if (element.hasAttribute("order")) {
            advisorDef.getPropertyValues().add("order", element.getAttribute("order"));
        }
        // Register the advisor under its fixed bean name
        parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef);

        CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource);
        compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName));
        compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName));
        compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName));
        parserContext.registerComponent(compositeDef);
    }
}
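The main job of the code above is registration. Besides the three bean definitions built inline (AnnotationTransactionAttributeSource, TransactionInterceptor and BeanFactoryTransactionAttributeSourceAdvisor), its first line registers InfrastructureAdvisorAutoProxyCreator: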
AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);

public static void registerAutoProxyCreatorIfNecessary(
        ParserContext parserContext, Element sourceElement) {
    BeanDefinition beanDefinition = AopConfigUtils.registerAutoProxyCreatorIfNecessary(
            parserContext.getRegistry(), parserContext.extractSource(sourceElement));
    useClassProxyingIfNecessary(parserContext.getRegistry(), sourceElement);
    registerComponentIfNecessary(beanDefinition, parserContext);
}

@Nullable
public static BeanDefinition registerAutoProxyCreatorIfNecessary(
        BeanDefinitionRegistry registry, @Nullable Object source) {
    // Register the creator, or escalate an already registered one
    return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
}
Class diagram of InfrastructureAdvisorAutoProxyCreator.java
Its class diagram quickly reveals the interface we need to focus on: BeanPostProcessor. Wherever it shows up, a post-processor is at work.
public interface BeanPostProcessor {

    default Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    default Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }
}
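InfrastructureAdvisorAutoProxyCreator implements BeanPostProcessor, which guarantees that the post-processor's callbacks run whenever a bean is initialized.
(In essence: once this post-processor is registered in the container, the bean factory calls it each time it initializes a bean, and the post-processor applies transactional enhancement to the beans that need it.)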
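The post-processor idea can be illustrated without Spring. The following is a minimal sketch, assuming a toy "container" and hypothetical names (AfterInitProcessor, Greeter, createBean); it uses a JDK dynamic proxy to show how a callback that runs after initialization can hand back a wrapped object instead of the original bean. It is not Spring's implementation.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// All names here are hypothetical; this is not Spring's API.
final class PostProcessorSketch {

    interface Greeter {
        String greet(String name);
    }

    static final class PlainGreeter implements Greeter {
        public String greet(String name) {
            return "hello " + name;
        }
    }

    /** Stand-in for BeanPostProcessor#postProcessAfterInitialization. */
    interface AfterInitProcessor {
        Object afterInit(Object bean, String beanName);
    }

    static final List<String> LOG = new ArrayList<>();

    /** Wraps every Greeter bean in a JDK dynamic proxy that records calls. */
    static final AfterInitProcessor WRAPPER = (bean, beanName) -> {
        if (!(bean instanceof Greeter)) {
            return bean; // nothing to enhance, return the bean unchanged
        }
        InvocationHandler handler = (proxy, method, args) -> {
            LOG.add("before " + method.getName());
            try {
                return method.invoke(bean, args); // delegate to the real bean
            } finally {
                LOG.add("after " + method.getName());
            }
        };
        return Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);
    };

    /** Toy container step: create the bean, then pass it to the post-processor. */
    static Object createBean(String beanName) {
        Object bean = new PlainGreeter();
        return WRAPPER.afterInit(bean, beanName);
    }

    public static void main(String[] args) {
        Greeter greeter = (Greeter) createBean("greeter");
        System.out.println(greeter.greet("tx"));
        System.out.println(LOG);
    }
}
```

The caller only ever sees the Greeter interface, so it cannot tell that it received a proxy; this is the same property that lets a transactional proxy stand in for the original bean.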
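Class diagram of BeanFactoryTransactionAttributeSourceAdvisor.java
This advisor is the foundation on which AOP enhancement ultimately delivers transactions. It is registered in the container under the bean name org.springframework.transaction.config.internalTransactionAdvisor.
Class diagram of AnnotationTransactionAttributeSource.java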
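AnnotationTransactionAttributeSource implements TransactionAttributeSource: it extracts the transaction attributes declared on a method or class. It is not itself an interceptor; the class that implements MethodInterceptor is TransactionInterceptor, and seeing that interface should bring the invoke() method to mind.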
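Of the registered classes, the first to take the stage is InfrastructureAdvisorAutoProxyCreator.
Following its inheritance hierarchy upward, we find the definition below in its ancestor AbstractAutoProxyCreator.
It is easy to spot an old friend there: wrapIfNecessary(), which we have known since we first studied AOP.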
// Post-processing after initialization (this is where the enhancement happens)
public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) throws BeansException {
    if (bean != null) {
        Object cacheKey = getCacheKey(bean.getClass(), beanName);
        // Skip beans that were already exposed early (to resolve a circular dependency)
        if (this.earlyProxyReferences.remove(cacheKey) != bean) {
            // Wrap the bean in a proxy if it needs to be enhanced
            return wrapIfNecessary(bean, beanName, cacheKey);
        }
    }
    return bean;
}
The logic of that method:
// Returns either the bean itself or a proxy for it
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) {
        return bean; // already processed
    }
    // advisedBeans records whether a bean needs to be enhanced
    if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
        return bean; // known not to need enhancement
    }
    // Classes that are never proxied: infrastructure classes, or classes configured to be skipped
    if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
        this.advisedBeans.put(cacheKey, Boolean.FALSE);
        return bean;
    }

    // Create proxy if we have advice.
    // Collect the advisors that apply to this bean
    Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
    if (specificInterceptors != DO_NOT_PROXY) {
        // At least one advisor applies, so create the proxy
        this.advisedBeans.put(cacheKey, Boolean.TRUE);
        Object proxy = createProxy(
                bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
        this.proxyTypes.put(cacheKey, proxy.getClass());
        return proxy;
    }

    this.advisedBeans.put(cacheKey, Boolean.FALSE);
    return bean;
}
The method that collects the advisors is defined in AbstractAdvisorAutoProxyCreator:
protected Object[] getAdvicesAndAdvisorsForBean(
        Class<?> beanClass, String beanName, @Nullable TargetSource targetSource) {
    // Find the advisors that apply, based on the bean's class and name
    List<Advisor> advisors = findEligibleAdvisors(beanClass, beanName);
    if (advisors.isEmpty()) {
        return DO_NOT_PROXY;
    }
    return advisors.toArray();
}
The overall implementation closely mirrors plain AOP:
protected List<Advisor> findEligibleAdvisors(Class<?> beanClass, String beanName) {
    // Collect all candidate advisors (AnnotationAwareAspectJAutoProxyCreator overrides this method)
    List<Advisor> candidateAdvisors = findCandidateAdvisors();
    // Keep only the advisors that apply to the current bean
    List<Advisor> eligibleAdvisors = findAdvisorsThatCanApply(candidateAdvisors, beanClass, beanName);
    extendAdvisors(eligibleAdvisors);
    if (!eligibleAdvisors.isEmpty()) {
        eligibleAdvisors = sortAdvisors(eligibleAdvisors);
    }
    return eligibleAdvisors;
}
Inside findCandidateAdvisors(), which collects all advisors, we find this call:
BeanFactoryUtils.beanNamesForTypeIncludingAncestors(this.beanFactory, Advisor.class, true, false);
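As we can see, it collects every bean that implements the Advisor interface. Recall what happened when the transaction switch tag was parsed: configureAutoProxyCreator created three bean definitions, and two of them (the attribute source and the interceptor) were wired into the third, BeanFactoryTransactionAttributeSourceAdvisor.
That class happens to have Advisor in its hierarchy, so from here on the other two registered classes take the stage one by one through BeanFactoryTransactionAttributeSourceAdvisor.
Following findAdvisorsThatCanApply() and its candidateAdvisors argument, we arrive at this method: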
public static List<Advisor> findAdvisorsThatCanApply(List<Advisor> candidateAdvisors, Class<?> clazz) {
    if (candidateAdvisors.isEmpty()) {
        return candidateAdvisors;
    }
    List<Advisor> eligibleAdvisors = new ArrayList<>();
    // First pass: introduction advisors
    for (Advisor candidate : candidateAdvisors) {
        if (candidate instanceof IntroductionAdvisor && canApply(candidate, clazz)) {
            eligibleAdvisors.add(candidate);
        }
    }
    boolean hasIntroductions = !eligibleAdvisors.isEmpty();
    for (Advisor candidate : candidateAdvisors) {
        // Skip introduction advisors, which were handled above
        if (candidate instanceof IntroductionAdvisor) {
            // already processed
            continue;
        }
        // Second pass: ordinary advisors
        if (canApply(candidate, clazz, hasIntroductions)) {
            eligibleAdvisors.add(candidate);
        }
    }
    return eligibleAdvisors;
}
The class diagram of BeanFactoryTransactionAttributeSourceAdvisor makes it clear that IntroductionAdvisor is not part of its hierarchy, so execution continues with the second loop.
canApply() decides whether the advisor applies to the current bean class:
public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) { if (advisor instanceof IntroductionAdvisor) { return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass); } else if (advisor instanceof PointcutAdvisor) { PointcutAdvisor pca = (PointcutAdvisor) advisor; return canApply(pca.getPointcut(), targetClass, hasIntroductions); } else { // It doesn't have a pointcut so we assume it applies. return true; } }
BeanFactoryTransactionAttributeSourceAdvisor is clearly a PointcutAdvisor, so this branch runs:
else if (advisor instanceof PointcutAdvisor) { PointcutAdvisor pca = (PointcutAdvisor) advisor; return canApply(pca.getPointcut(), targetClass, hasIntroductions); }
Following those lines into BeanFactoryTransactionAttributeSourceAdvisor, we find that getPointcut() returns this field:
private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() { @Override @Nullable protected TransactionAttributeSource getTransactionAttributeSource() { return transactionAttributeSource; } };
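Look closely: the pointcut it returns is a TransactionAttributeSourcePointcut that implements the callback method getTransactionAttributeSource(),
and that callback returns transactionAttributeSource. Going back to the beginning: parsing the transaction tag created three bean definitions, two of which were injected into BeanFactoryTransactionAttributeSourceAdvisor.
transactionAttributeSource is one of those two, and the bean behind it is AnnotationTransactionAttributeSource.
Now go deeper into canApply(pca.getPointcut(), targetClass, hasIntroductions):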
public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) { .......... MethodMatcher methodMatcher = pc.getMethodMatcher(); IntroductionAwareMethodMatcher introductionAwareMethodMatcher = null; if (methodMatcher instanceof IntroductionAwareMethodMatcher) { introductionAwareMethodMatcher = (IntroductionAwareMethodMatcher) methodMatcher; } ............... for (Class<?> clazz : classes) { Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz); for (Method method : methods) { if (introductionAwareMethodMatcher != null ? introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) : methodMatcher.matches(method, targetClass)) { return true; } } } return false; }
From the above we know that the actual type of pc is TransactionAttributeSourcePointcut.
Class diagram of TransactionAttributeSourcePointcut.java
Following its class diagram, we find the implementation of getMethodMatcher() in StaticMethodMatcherPointcut:
@Override public final MethodMatcher getMethodMatcher() { return this; }
Given that type, we can tell which side of this expression runs:
introductionAwareMethodMatcher != null ? introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) : methodMatcher.matches(method, targetClass)
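It is the latter, methodMatcher.matches(method, targetClass), because TransactionAttributeSourcePointcut is not an IntroductionAwareMethodMatcher.
The implementation of matches is in TransactionAttributeSourcePointcut: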
@Override
public boolean matches(Method method, @Nullable Class<?> targetClass) {
    if (targetClass != null && TransactionalProxy.class.isAssignableFrom(targetClass)) {
        return false;
    }
    // The registered transactionAttributeSource bean, i.e. AnnotationTransactionAttributeSource
    TransactionAttributeSource tas = getTransactionAttributeSource();
    return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}
Here the callback getTransactionAttributeSource() appears again, and the actual type of tas is AnnotationTransactionAttributeSource.
So the expression methodMatcher.matches(method, targetClass) ultimately evaluates:
(tas == null || tas.getTransactionAttribute(method, targetClass) != null)
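To make the matching step concrete, here is a minimal sketch that assumes a hypothetical @Tx annotation in place of @Transactional. It mirrors only the shape of the check (a class qualifies for proxying as soon as one of its methods yields a transaction attribute) and leaves out everything else the real canApply does, such as interface traversal and class filters.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical names; a simplified model of the pointcut check, not Spring code.
final class CanApplySketch {

    /** Hypothetical stand-in for @Transactional. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.TYPE})
    @interface Tx {}

    static class OrderService {
        @Tx
        public void placeOrder() {}

        public void ping() {}
    }

    static class PlainService {
        public void ping() {}
    }

    /** Plays the role of tas.getTransactionAttribute(method, targetClass) != null. */
    static boolean matches(Method method, Class<?> targetClass) {
        return method.isAnnotationPresent(Tx.class) || targetClass.isAnnotationPresent(Tx.class);
    }

    /** Like canApply: a single matching method makes the class a proxy candidate. */
    static boolean canApply(Class<?> targetClass) {
        for (Method method : targetClass.getDeclaredMethods()) {
            if (matches(method, targetClass)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(canApply(OrderService.class)); // true
        System.out.println(canApply(PlainService.class)); // false
    }
}
```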
Continuing down from AnnotationTransactionAttributeSource.getTransactionAttribute,
we find computeTransactionAttribute() in the parent class AbstractFallbackTransactionAttributeSource. This is finally the method that extracts the transaction declaration:
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
    Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);

    // First look for a transaction declaration on the method itself
    TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
    if (txAttr != null) {
        return txAttr;
    }

    // Then look for one on the class that declares the method
    txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
    if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
        return txAttr;
    }

    // specificMethod != method means the original method came from an interface, so look there too
    if (specificMethod != method) {
        // Check the interface method
        txAttr = findTransactionAttribute(method);
        if (txAttr != null) {
            return txAttr;
        }
        // Check the interface that declares it
        txAttr = findTransactionAttribute(method.getDeclaringClass());
        if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
            return txAttr;
        }
    }
    return null;
}
Finally, findTransactionAttribute() is defined in AnnotationTransactionAttributeSource:
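The lookup order (method first, then declaring class) can be tried out in isolation. This is a minimal sketch assuming a hypothetical @Tx annotation whose value() stands in for the transaction attribute; the interface fallback of the real method is left out for brevity.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical names; models only the method-then-class order of the lookup.
final class AttributeLookupSketch {

    /** Hypothetical stand-in for @Transactional; value() plays the role of the attribute. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.TYPE})
    @interface Tx {
        String value();
    }

    @Tx("class-level")
    static class AccountService {
        @Tx("method-level")
        public void transfer() {}

        public void audit() {}
    }

    static class NoTxService {
        public void run() {}
    }

    /** Returns the declared attribute, preferring the method over its class, or null. */
    static String computeAttribute(Method method) {
        Tx onMethod = method.getAnnotation(Tx.class);
        if (onMethod != null) {
            return onMethod.value();
        }
        Tx onClass = method.getDeclaringClass().getAnnotation(Tx.class);
        if (onClass != null) {
            return onClass.value();
        }
        return null; // no declaration anywhere: the method is not transactional
    }

    /** Convenience wrapper so callers do not deal with checked reflection exceptions. */
    static String lookup(Class<?> type, String methodName) {
        try {
            return computeAttribute(type.getDeclaredMethod(methodName));
        } catch (NoSuchMethodException ex) {
            throw new IllegalArgumentException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(lookup(AccountService.class, "transfer")); // method-level
        System.out.println(lookup(AccountService.class, "audit"));    // class-level
        System.out.println(lookup(NoTxService.class, "run"));         // null
    }
}
```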
protected TransactionAttribute findTransactionAttribute(Class<?> clazz) {
    return determineTransactionAttribute(clazz);
}

protected TransactionAttribute findTransactionAttribute(Method method) {
    return determineTransactionAttribute(method);
}

protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {
    // annotationParsers holds the registered parsers (SpringTransactionAnnotationParser among them)
    for (TransactionAnnotationParser annotationParser : this.annotationParsers) {
        TransactionAttribute attr = annotationParser.parseTransactionAnnotation(element);
        if (attr != null) {
            return attr;
        }
    }
    return null;
}
At this point you may be puzzled: what is this.annotationParsers? We never registered it. Look at the constructor of AnnotationTransactionAttributeSource:
public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) { this.publicMethodsOnly = publicMethodsOnly; if (jta12Present || ejb3Present) { this.annotationParsers = new LinkedHashSet<>(4); this.annotationParsers.add(new SpringTransactionAnnotationParser()); if (jta12Present) { this.annotationParsers.add(new JtaTransactionAnnotationParser()); } if (ejb3Present) { this.annotationParsers.add(new Ejb3TransactionAnnotationParser()); } } else { this.annotationParsers = Collections.singleton(new SpringTransactionAnnotationParser()); } }
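In every case this.annotationParsers ends up containing a SpringTransactionAnnotationParser (plus the JTA and EJB3 parsers when those APIs are on the classpath). Its parseTransactionAnnotation method is the end of this journey: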
@Override @Nullable public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) { // 注解工具类,提取注解 ?? AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes( element, Transactional.class, false, false); if (attributes != null) { return parseTransactionAnnotation(attributes); } else { return null; } } // 获取注解标记 protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) { RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute(); Propagation propagation = attributes.getEnum("propagation"); rbta.setPropagationBehavior(propagation.value()); Isolation isolation = attributes.getEnum("isolation"); rbta.setIsolationLevel(isolation.value()); rbta.setTimeout(attributes.getNumber("timeout").intValue());// 超时 ?? rbta.setReadOnly(attributes.getBoolean("readOnly"));// 只读 ?? rbta.setQualifier(attributes.getString("value"));// // 回滚 ?? List<RollbackRuleAttribute> rollbackRules = new ArrayList<>(); for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) { rollbackRules.add(new RollbackRuleAttribute(rbRule)); } for (String rbRule : attributes.getStringArray("rollbackForClassName")) { rollbackRules.add(new RollbackRuleAttribute(rbRule)); } for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) { rollbackRules.add(new NoRollbackRuleAttribute(rbRule)); } for (String rbRule : attributes.getStringArray("noRollbackForClassName")) { rollbackRules.add(new NoRollbackRuleAttribute(rbRule)); } rbta.setRollbackRules(rollbackRules); return rbta; }
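The rollback rules collected above are consulted later, when an exception escapes the transactional method. The following is a minimal sketch of how such rules could be evaluated, under stated assumptions: the class and method names are hypothetical, rules compare Class objects directly, the rule closest to the thrown type wins, and with no matching rule the default is to roll back on RuntimeException and Error. It is a simplification and not the code of RuleBasedTransactionAttribute.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names; a simplified model of rollbackFor / noRollbackFor evaluation.
final class RollbackRuleSketch {

    /** One rule: an exception type plus whether a match triggers rollback. */
    static final class Rule {
        final Class<? extends Throwable> type;
        final boolean rollback;

        Rule(Class<? extends Throwable> type, boolean rollback) {
            this.type = type;
            this.rollback = rollback;
        }

        /** Number of superclass steps from the thrown class to this rule's type, or -1. */
        int depth(Class<?> thrown) {
            int depth = 0;
            for (Class<?> c = thrown; c != null; c = c.getSuperclass()) {
                if (c == type) {
                    return depth;
                }
                depth++;
            }
            return -1;
        }
    }

    private final List<Rule> rules = new ArrayList<>();

    RollbackRuleSketch rollbackFor(Class<? extends Throwable> type) {
        rules.add(new Rule(type, true));
        return this;
    }

    RollbackRuleSketch noRollbackFor(Class<? extends Throwable> type) {
        rules.add(new Rule(type, false));
        return this;
    }

    /** The closest matching rule decides; without a match the default applies. */
    boolean rollbackOn(Throwable ex) {
        Rule winner = null;
        int best = Integer.MAX_VALUE;
        for (Rule rule : rules) {
            int depth = rule.depth(ex.getClass());
            if (depth >= 0 && depth < best) {
                best = depth;
                winner = rule;
            }
        }
        if (winner == null) {
            return ex instanceof RuntimeException || ex instanceof Error;
        }
        return winner.rollback;
    }

    public static void main(String[] args) {
        RollbackRuleSketch attr = new RollbackRuleSketch()
                .rollbackFor(Exception.class)
                .noRollbackFor(IllegalArgumentException.class);
        System.out.println(attr.rollbackOn(new java.io.IOException("io")));        // true
        System.out.println(attr.rollbackOn(new IllegalArgumentException("arg")));  // false
    }
}
```

With no rules configured, a checked exception such as IOException does not trigger rollback in this sketch, which matches the default noted in the comment on rollbackOn in the source.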
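This completes the parsing of the transaction declaration; what remains is how the enhancement itself is carried out.
Recall that two classes were injected into BeanFactoryTransactionAttributeSourceAdvisor. We have already covered AnnotationTransactionAttributeSource,
whose job is to decide whether the advisor matches the current method. (In other words, if a transaction declaration can be extracted for the method, the enhancement applies.)
The class we have not yet covered is TransactionInterceptor.
Without further ado, here is its class diagram:
Class diagram of TransactionInterceptor.java
Its inheritance hierarchy makes it obvious that it is an interceptor, so we go straight to the definition of invoke: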
public Object invoke(MethodInvocation invocation) throws Throwable {
    // Work out the target class: may be {@code null}.
    // The TransactionAttributeSource should be passed the target class
    // as well as the method, which may be from an interface.
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

    // Adapt to TransactionAspectSupport's invokeWithinTransaction...
    return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
        final InvocationCallback invocation) throws Throwable {

    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // Fetch the transaction attribute (the parsed declaration)
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    // Resolve the transaction manager
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    // Build the join point identifier: fully qualified class name + "." + method name
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    // Declarative transaction handling
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.
        // Create the transaction if one is needed
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal;
        try {
            // This is an around advice: Invoke the next interceptor in the chain.
            // This will normally result in a target object being invoked.
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // An exception escaped: roll back if the rules say so
            // (by default only RuntimeException and Error trigger a rollback)
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            // Reset the thread-bound TransactionInfo
            cleanupTransactionInfo(txInfo);
        }
        // Commit after a normal return. A nested transaction only releases its savepoint and a
        // participating one does nothing here; only the outermost (new) transaction really commits.
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }
    // The branch for CallbackPreferringPlatformTransactionManager is omitted from this excerpt.
}
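Stripped of Spring types, the around-advice in invokeWithinTransaction boils down to begin, proceed, then commit or roll back. Here is a minimal sketch of that control flow; the names are hypothetical, a list of event strings stands in for a real transaction manager, and the rollback decision is hard-wired to the default rule (RuntimeException or Error).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical names; the EVENTS list stands in for a real transaction manager.
final class AroundTxSketch {

    static final List<String> EVENTS = new ArrayList<>();

    /** Runs the body between "begin" and either "commit" or "rollback". */
    static <T> T invokeWithinTransaction(Callable<T> body) throws Exception {
        EVENTS.add("begin");
        T result;
        try {
            result = body.call(); // proceed to the target method
        } catch (Throwable ex) {
            // Default rule: roll back on unchecked exceptions, commit otherwise
            boolean rollback = ex instanceof RuntimeException || ex instanceof Error;
            EVENTS.add(rollback ? "rollback" : "commit");
            throw ex; // the caller still sees the original exception
        }
        EVENTS.add("commit");
        return result;
    }

    /** Helper for experiments: runs the body and returns the recorded events. */
    static List<String> trace(Callable<?> body) {
        EVENTS.clear();
        try {
            invokeWithinTransaction(body);
        } catch (Exception ex) {
            // swallowed on purpose: only the recorded events matter here
        }
        return new ArrayList<>(EVENTS);
    }

    public static void main(String[] args) {
        System.out.println(trace(() -> "ok"));
        System.out.println(trace(() -> { throw new IllegalStateException("boom"); }));
        System.out.println(trace(() -> { throw new java.io.IOException("io"); }));
    }
}
```

The third call is the case that tends to surprise people: a checked exception propagates to the caller, yet the transaction is committed unless a rollback rule says otherwise.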
// 创建事务 protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { // If no name specified, apply method identification as transaction name. if (txAttr != null && txAttr.getName() == null) { // 包装器?? txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { // 事务状态 ?? status = tm.getTransaction(txAttr);// DataSourceTransactionManager.getTransaction(); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } } // 准备并封装事务信息 return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
Transaction preparation: obtaining the transaction and building its status:
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { Object transaction = doGetTransaction();// 获取事务 ,基于JDBC创建事务实例,如果当前线程已经记录了连接,可以复用 // Cache debug flag to avoid repeated checks. boolean debugEnabled = logger.isDebugEnabled(); if (definition == null) { // Use defaults if no transaction definition given. definition = new DefaultTransactionDefinition(); } if (isExistingTransaction(transaction)) {// 当前线程已经存在事务,且连接不为空,连接中的 transactionActive 不为空 // Existing transaction found -> check propagation behavior to find out how to behave. return handleExistingTransaction(definition, transaction, debugEnabled);// 当前线程存在事务,转向嵌套事务的处理 } // Check definition settings for new transaction. // 事务超时设置验证 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } // No existing transaction found -> check propagation behavior to find out how to proceed. // 当前线程不存在事务,且 propagationBehavior 被声明为:PROPAGATION_MANDATORY 则抛出异常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {// 强制性的,但是当前线程没有事务,所以抛出异常 throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||// 需要?? definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||// 需要一个新的?? definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {// 嵌套 // 上述类型,他们都需要新建事务 // 空挂起 ?? 
SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 构造事务,设置connectionHolder、隔离级别、超时、如果是新连接则需要绑定到当前线程 (委托给低层的连接完成) doBegin(transaction, definition);// 数据库连接设置,新(新生成,或者记录被清空无记录) 连接记录到当前线程 // 新同步事务的设置?? 针对当前线程进行设置 prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }
Setting up the database connection:
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        // No connection holder yet, or the holder is already synchronized with a transaction:
        // obtain a new connection
        if (!txObject.hasConnectionHolder() ||
                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            Connection newCon = obtainDataSource().getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true); // new connection
        }

        // Flag the connection holder as synchronized with the transaction
        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        // Apply the isolation level
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);

        // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
        // so we don't want to do it unnecessarily (for example if we've explicitly
        // configured the connection pool to set it already).
        if (con.getAutoCommit()) {
            // Turn auto-commit off so that Spring controls the commit
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            con.setAutoCommit(false);
        }

        prepareTransactionalConnection(con, definition);
        // Mark the holder as having an active transaction
        txObject.getConnectionHolder().setTransactionActive(true);

        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            // Apply the transaction timeout to the connection holder
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // Bind the connection holder to the thread.
        // Only a newly created holder needs to be bound
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
        }
    }
    catch (Throwable ex) {
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}
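Binding the connection holder to the thread is what lets every data-access call on the same thread find the same connection. The idea can be sketched with a plain ThreadLocal map keyed by the data source. The class below is a hypothetical stand-in, not the code of TransactionSynchronizationManager; only the method names bindResource, getResource and unbindResource are borrowed from it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for thread-bound resource management.
final class ResourceBindingSketch {

    /** Each thread gets its own key-to-resource map. */
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bindResource(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already a value bound for key " + key);
        }
    }

    static Object getResource(Object key) {
        return RESOURCES.get().get(key);
    }

    static Object unbindResource(Object key) {
        return RESOURCES.get().remove(key);
    }

    /** Looks the key up from a different thread to show that bindings are per thread. */
    static Object lookupFromOtherThread(Object key) {
        Object[] seen = new Object[1];
        Thread worker = new Thread(() -> seen[0] = getResource(key));
        worker.start();
        try {
            worker.join(); // join makes the worker's write visible to this thread
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        Object dataSource = new Object();
        bindResource(dataSource, "connection-1");
        System.out.println(getResource(dataSource));           // connection-1
        System.out.println(lookupFromOtherThread(dataSource)); // null
        unbindResource(dataSource);
    }
}
```

Because the binding is per thread, work handed off to another thread does not see the connection and therefore does not take part in the caller's transaction.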
Handling an existing transaction:
private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {// 处理已经存在的事务 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {// never 抛异常 throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {// supported if (debugEnabled) { logger.debug("Suspending current transaction"); } Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);// always return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {// requires_new if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } SuspendedResourcesHolder suspendedResources = suspend(transaction);// 挂起旧事务 try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);// 创建新事务 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {// nested 嵌套式事务处理 if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { 
logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } if (useSavepointForNestedTransaction()) { // Create savepoint within existing Spring-managed transaction, // through the SavepointManager API implemented by TransactionStatus. // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization. // 没有保存点,在嵌套式事务建立初始保存点 存档?? DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); status.createAndHoldSavepoint(); return status; } else { // Nested transaction through nested begin and commit/rollback calls. // Usually only for JTA: Spring synchronization might get activated here // in case of a pre-existing JTA transaction. // 不能使用保存点的时候,新建事务 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } } // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED. if (debugEnabled) { logger.debug("Participating in existing transaction"); } if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? 
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)")); } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }
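Taken together, getTransaction and handleExistingTransaction form a decision table over the propagation behavior and whether a transaction already exists. The sketch below condenses that table into one function. The enum and method names are hypothetical, and the NESTED case assumes that nested transactions are allowed and implemented with savepoints.

```java
// Hypothetical names; a condensed view of the branches shown in the two methods above.
final class PropagationSketch {

    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    enum Action {
        START_NEW, JOIN_EXISTING, SUSPEND_AND_START_NEW, SUSPEND_AND_RUN_WITHOUT_TX,
        CREATE_SAVEPOINT, RUN_WITHOUT_TX, FAIL
    }

    static Action decide(Propagation propagation, boolean transactionExists) {
        if (transactionExists) {
            // Mirrors handleExistingTransaction
            switch (propagation) {
                case NEVER:
                    return Action.FAIL;
                case NOT_SUPPORTED:
                    return Action.SUSPEND_AND_RUN_WITHOUT_TX;
                case REQUIRES_NEW:
                    return Action.SUSPEND_AND_START_NEW;
                case NESTED:
                    return Action.CREATE_SAVEPOINT; // assumes savepoint-based nesting
                default:
                    return Action.JOIN_EXISTING; // REQUIRED, SUPPORTS, MANDATORY
            }
        }
        // Mirrors the rest of getTransaction: no transaction on this thread yet
        switch (propagation) {
            case MANDATORY:
                return Action.FAIL;
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                return Action.START_NEW;
            default:
                return Action.RUN_WITHOUT_TX; // SUPPORTS, NOT_SUPPORTED, NEVER
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            System.out.println(p + ": none -> " + decide(p, false) + ", existing -> " + decide(p, true));
        }
    }
}
```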
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) { if (txInfo != null && txInfo.getTransactionStatus() != null) {// 判断存在事务 if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "] after exception: " + ex); } // 默认实现: (ex instanceof RuntimeException || ex instanceof Error) if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) { try { txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by rollback exception", ex); ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException | Error ex2) { logger.error("Application exception overridden by rollback exception", ex); throw ex2; } } else { // We don't roll back on this exception. // Will still roll back if TransactionStatus.isRollbackOnly() is true. // 当不满足 rollbackOn 的回滚条件时,抛异常也会提交事务 try { txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by commit exception", ex); ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException | Error ex2) { logger.error("Application exception overridden by commit exception", ex); throw ex2; } } } }
Rollback handling:
private void processRollback(DefaultTransactionStatus status, boolean unexpected) { try { boolean unexpectedRollback = unexpected; try { // 自定义触发器的调用 // 激活所有 TransactionSynchronization 中的 beforeCompletion方法 triggerBeforeCompletion(status); if (status.hasSavepoint()) {// 回滚到保存点 ?? 常用于嵌套事务,内层事务异常回滚不会影响到外层事务 if (status.isDebug()) { logger.debug("Rolling back transaction to savepoint"); } // 保存点依赖于底层的数据库连接 status.rollbackToHeldSavepoint();// 如果有保存点,当前事务为单独的线程(单独的事务),会退到保存点 } else if (status.isNewTransaction()) {// 当前事务为独立的新事务,直接回滚 if (status.isDebug()) { logger.debug("Initiating transaction rollback"); } doRollback(status); } else {// 没有保存点,且不是独立的新事务,那么标记状态,等到事务链执行完后统一回滚 (都不提交) // Participating in larger transaction if (status.hasTransaction()) { if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { if (status.isDebug()) { logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); } doSetRollbackOnly(status); } else { if (status.isDebug()) { logger.debug("Participating transaction failed - letting transaction originator decide on rollback"); } } } else { logger.debug("Should roll back transaction but cannot - no transaction available"); } // Unexpected rollback only matters here if we're asked to fail early if (!isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = false; } } } catch (RuntimeException | Error ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } // 自定义触发器的调用 // 激活所有 TransactionSynchronization 中的 afterCompletion方法 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); // Raise UnexpectedRollbackException if we had a global rollback-only marker if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } } finally { cleanupAfterCompletion(status);// 清空记录的资源,例如记录到线程的连接 ; 将挂起的资源恢复,例如:嵌套事务。 } }
public final void commit(TransactionStatus status) throws TransactionException { if (status.isCompleted()) {// 事务已经完成,在此提交抛出异常 throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) {// 事务链 中标记了回滚,那么直接回滚,不再尝试提交事务 if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } processRollback(defStatus, false); return; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {// if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } processRollback(defStatus, true); return; } // 处理提交逻辑 processCommit(defStatus); }
Commit handling:
private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { boolean unexpectedRollback = false; prepareForCommit(status);// 预留钩子 triggerBeforeCommit(status);// 提交前触发器激活 triggerBeforeCompletion(status);// 完成前触发器 ?? beforeCompletionInvoked = true; if (status.hasSavepoint()) {// 存在保存点?? if (status.isDebug()) {// 事务正常执行完毕,释放(清除)保存点 logger.debug("Releasing transaction savepoint"); } unexpectedRollback = status.isGlobalRollbackOnly(); status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) {// 独立的新事务,直接提交 if (status.isDebug()) { logger.debug("Initiating transaction commit"); } unexpectedRollback = status.isGlobalRollbackOnly(); doCommit(status);// 提交事务 调用底层的数据库连接 }// 不被spring管理的事务 ?? 无法设置保存点的事务 ?? 设置回滚标识 else if (isFailEarlyOnGlobalRollbackOnly()) {// 事务链中的某个事务被设置了回滚标记,那么直接回滚 unexpectedRollback = status.isGlobalRollbackOnly(); } // Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn't get a corresponding exception from commit. if (unexpectedRollback) {// 回滚标识 throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // can only be caused by doCommit triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // can only be caused by doCommit if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException | Error ex) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex);// 提交过程中出现异常,回滚 throw ex; } // Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. 
try { triggerAfterCommit(status);// 提交事务后的触发器 } finally { // 事务完成后的触发器 triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { cleanupAfterCompletion(status);// 事务完成后进行清理 } }
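The participating branch of processRollback and the rollback-only checks in commit work as a pair: an inner method that joined an existing transaction cannot roll it back by itself, so it only marks it, and the originator's later commit turns into a rollback. Here is a minimal sketch of that interplay with hypothetical classes, using IllegalStateException where Spring throws UnexpectedRollbackException.

```java
// Hypothetical names; models only the rollback-only marker, nothing else.
final class RollbackOnlySketch {

    /** The one physical transaction shared by every participant. */
    static final class Tx {
        boolean rollbackOnly;
        String outcome = "active";
    }

    /** Per-method status: newTransaction is true only for the originator. */
    static final class Status {
        final Tx tx;
        final boolean newTransaction;

        Status(Tx tx, boolean newTransaction) {
            this.tx = tx;
            this.newTransaction = newTransaction;
        }
    }

    /** Starts a new transaction, or joins the given one as a participant. */
    static Status begin(Tx existing) {
        return existing == null ? new Status(new Tx(), true) : new Status(existing, false);
    }

    static void rollback(Status status) {
        if (status.newTransaction) {
            status.tx.outcome = "rolled back"; // the originator really rolls back
        } else {
            status.tx.rollbackOnly = true;     // a participant can only leave a marker
        }
    }

    static void commit(Status status) {
        if (status.tx.rollbackOnly) {
            rollback(status);
            if (status.newTransaction) {
                // Stand-in for UnexpectedRollbackException
                throw new IllegalStateException(
                        "Transaction rolled back because it has been marked as rollback-only");
            }
            return;
        }
        if (status.newTransaction) {
            status.tx.outcome = "committed"; // only the originator really commits
        }
    }

    public static void main(String[] args) {
        Status outer = begin(null);
        Status inner = begin(outer.tx);
        rollback(inner); // the inner method failed
        try {
            commit(outer); // the outer method caught the exception and tries to commit
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
        System.out.println(outer.tx.outcome); // rolled back
    }
}
```

This is the situation behind the familiar "marked as rollback-only" error: the outer method swallowed the inner exception, but the marker had already been set.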