关于AT模式的流程,我们在上一篇文章 一文简要概述Seata AT与TCC的区别
已经介绍的差不多了。但是日常学习中,我们仅仅只知道一个流程是远远不够的。我们往往会遇到许多问题,现在根据几个我在学习Seata时产生的几个问题,进入源码中了解下具体是怎么实现的。
开始之前,介绍几个AT模式下比较重要的类
TM 操作核心代码 io.seata.tm.api.TransactionalTemplate
RM 操作核心代码 io.seata.rm.datasource.exec.ExecuteTemplate
// io.seata.tm.api.TransactionalTemplate#beginTransaction private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException { try { // 钩子方法 triggerBeforeBegin(); // 开去全局事务 tx.begin(txInfo.getTimeOut(), txInfo.getName()); // 钩子方法 triggerAfterBegin(); } catch (TransactionException txe) { throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure); } } // io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String) @Override public void begin(int timeout, String name) throws TransactionException { if (role != GlobalTransactionRole.Launcher) { assertXIDNotNull(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid); } return; } assertXIDNull(); String currentXid = RootContext.getXID(); if (currentXid != null) { throw new IllegalStateException("Global transaction already exists," + " can't begin a new global transaction, currentXid = " + currentXid); } // 通过RPC请求调用到TC服务 xid = transactionManager.begin(null, null, name, timeout); status = GlobalStatus.Begin; RootContext.bind(xid); if (LOGGER.isInfoEnabled()) { LOGGER.info("Begin new global transaction [{}]", xid); } } // io.seata.server.coordinator.DefaultCore#begin @Override public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { // 创建全局事务 GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout); session.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); session.begin(); // transaction start event eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC, session.getTransactionName(), session.getBeginTime(), null, session.getStatus())); // 返回xid给TM return session.getXid(); }
// io.seata.tm.api.TransactionalTemplate#execute try { // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC, // else do nothing. Of course, the hooks will still be triggered. // 开启全局事务 beginTransaction(txInfo, tx); Object rs; try { // Do Your Business // 执行业务代码,调用对应的TM服务 rs = business.execute(); } catch (Throwable ex) { // 3. The needed business exception to rollback. // 回滚全局事务 具体如何提交/回滚,留到2.4中描述,此处仅看TM是如何触发提交/回滚操作的 completeTransactionAfterThrowing(txInfo, tx, ex); throw ex; } // 4. everything is fine, commit. // 提交全局事务 commitTransaction(tx); return rs; } finally { //5. clear resumeGlobalLockConfig(previousConfig); triggerAfterCompletion(); cleanUp(); }
// io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitFalse protected T executeAutoCommitFalse(Object[] args) throws Exception { if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && getTableMeta().getPrimaryKeyOnlyName().size() > 1) { throw new NotSupportYetException("multi pk only support mysql!"); } // 生成执行前的镜象 TableRecords beforeImage = beforeImage(); // 获取执行的结果 T result = statementCallback.execute(statementProxy.getTargetStatement(), args); // 生成执行后的镜象 TableRecords afterImage = afterImage(beforeImage); // 生成UndoLog prepareUndoLog(beforeImage, afterImage); return result; }
// io.seata.rm.datasource.ConnectionProxy#doCommit private void doCommit() throws SQLException { if (context.inGlobalTransaction()) { // 判断是否含有xid processGlobalTransactionCommit(); } else if (context.isGlobalLockRequire()) { // 判断是否需要GlobalLock processLocalCommitWithGlobalLocks(); } else { // 直接提交 targetConnection.commit(); } } // io.seata.rm.datasource.ConnectionProxy#processGlobalTransactionCommit private void processGlobalTransactionCommit() throws SQLException { try { // 注册分支事务 此处调用TC进行获取锁 register(); } catch (TransactionException e) { recognizeLockKeyConflictException(e, context.buildLockKeys()); } try { UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); targetConnection.commit(); } catch (Throwable ex) { LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex); report(false); throw new SQLException(ex); } if (IS_REPORT_SUCCESS_ENABLE) { // 上报一阶段结果 report(true); } context.reset(); } // io.seata.server.coordinator.AbstractCore#branchRegister @Override public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException { GlobalSession globalSession = assertGlobalSessionNotNull(xid, false); return SessionHolder.lockAndExecute(globalSession, () -> { globalSessionStatusCheck(globalSession); globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId, applicationData, lockKeys, clientId); // 上锁 branchSessionLock(globalSession, branchSession); try { // 注册分支 globalSession.addBranch(branchSession); } catch (RuntimeException ex) { branchSessionUnlock(branchSession); throw new BranchTransactionException(FailedToAddBranch, String .format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(), branchSession.getBranchId()), ex); } if (LOGGER.isInfoEnabled()) { LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}", globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys); } return branchSession.getBranchId(); }); } // io.seata.server.lock.AbstractLockManager#acquireLock(具体上锁的代码) @Override public boolean acquireLock(BranchSession branchSession) throws TransactionException { if (branchSession == null) { throw new IllegalArgumentException("branchSession can't be null for memory/file locker."); } String lockKey = branchSession.getLockKey(); if (StringUtils.isNullOrEmpty(lockKey)) { // no lock return true; } // get locks of branch List<RowLock> locks = collectRowLocks(branchSession); if (CollectionUtils.isEmpty(locks)) { // no lock return true; } return getLocker(branchSession).acquireLock(locks); }
// 以commit为例,TM调用COMMIT时最终会调用到TC的commit方法 // io.seata.tm.api.DefaultGlobalTransaction#commit @Override public void commit() throws TransactionException { if (role == GlobalTransactionRole.Participant) { // Participant has no responsibility of committing if (LOGGER.isDebugEnabled()) { LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid); } return; } assertXIDNotNull(); int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT; try { while (retry > 0) { // 包含了重试 try { status = transactionManager.commit(xid); break; } catch (Throwable ex) { LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage()); retry--; if (retry == 0) { // 异常最终会向上抛出至TM处,触发全局回滚 throw new TransactionException("Failed to report global commit", ex); } } } } finally { if (xid.equals(RootContext.getXID())) { suspend(true); } } if (LOGGER.isInfoEnabled()) { LOGGER.info("[{}] commit status: {}", xid, status); } } // io.seata.server.coordinator.DefaultCore#commit @Override public GlobalStatus commit(String xid) throws TransactionException { GlobalSession globalSession = SessionHolder.findGlobalSession(xid); if (globalSession == null) { return GlobalStatus.Finished; } globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); // just lock changeStatus // 是否可以提交全局事务 boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> { // Highlight: Firstly, close the session, then no more branch can be registered. globalSession.closeAndClean(); if (globalSession.getStatus() == GlobalStatus.Begin) { if (globalSession.canBeCommittedAsync()) { globalSession.asyncCommit(); return false; } else { globalSession.changeStatus(GlobalStatus.Committing); return true; } } return false; }); if (shouldCommit) { boolean success = doGlobalCommit(globalSession, false); if (success && !globalSession.getBranchSessions().isEmpty()) { globalSession.asyncCommit(); return GlobalStatus.Committed; } else { return globalSession.getStatus(); } } else { return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus(); } }