Java教程

AT模式遇到的问题与具体源码篇

本文主要介绍在学习Seata AT模式时遇到的几个问题,并结合具体源码进行分析,对大家解决编程问题具有一定的参考价值,需要的读者可以一起来学习。

关于AT模式的流程,我们在上一篇文章《一文简要概述Seata AT与TCC的区别》中已经介绍得差不多了。但是在日常学习中,仅仅知道流程是远远不够的,我们往往还会遇到许多问题。下面就针对我在学习Seata时产生的几个问题,进入源码了解具体是怎么实现的。

开始之前,介绍几个AT模式下比较重要的类

TM 操作核心代码 io.seata.tm.api.TransactionalTemplate
RM 操作核心代码 io.seata.rm.datasource.exec.ExecuteTemplate

2.1 TM是如何开启全局事务的

// io.seata.tm.api.TransactionalTemplate#beginTransaction
/**
 * Begins the given global transaction, wrapping the call with the before/after begin hooks.
 *
 * @param txInfo supplies the timeout and the name handed to {@code tx.begin}
 * @param tx     the global transaction to begin
 * @throws TransactionalExecutor.ExecutionException with code {@code BeginFailure}, wrapping any
 *         {@code TransactionException} raised by the hooks or by {@code tx.begin}
 */
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
  try {
    // Hook invoked before the begin request is issued.
    triggerBeforeBegin();

    // Begin the global transaction using the configured timeout and name.
    final int timeout = txInfo.getTimeOut();
    final String txName = txInfo.getName();
    tx.begin(timeout, txName);

    // Hook invoked only when begin completed without throwing.
    triggerAfterBegin();
  } catch (TransactionException beginFailure) {
    throw new TransactionalExecutor.ExecutionException(
        tx, beginFailure, TransactionalExecutor.Code.BeginFailure);
  }
}
// io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)
/**
 * Begins a global transaction when this instance has the {@code Launcher} role.
 *
 * <p>For any other role the call only asserts that an XID is already present and returns
 * without contacting the transaction manager.
 *
 * @param timeout timeout forwarded to {@code transactionManager.begin}
 * @param name    transaction name forwarded to {@code transactionManager.begin}
 * @throws TransactionException propagated from the assertions or from the transaction manager
 * @throws IllegalStateException if {@code RootContext} already carries an XID
 */
@Override
public void begin(int timeout, String name) throws TransactionException {
  if (role == GlobalTransactionRole.Launcher) {
    assertXIDNull();

    // Refuse to start when the current context is already bound to a global transaction.
    final String boundXid = RootContext.getXID();
    if (boundXid != null) {
      throw new IllegalStateException(
          "Global transaction already exists, can't begin a new global transaction, currentXid = " + boundXid);
    }

    // Ask the transaction manager for a new XID (per the article: an RPC call to the TC).
    xid = transactionManager.begin(null, null, name, timeout);
    status = GlobalStatus.Begin;
    RootContext.bind(xid);

    if (LOGGER.isInfoEnabled()) {
      LOGGER.info("Begin new global transaction [{}]", xid);
    }
  } else {
    // Not the launcher: the transaction must already exist, so there is nothing to begin.
    assertXIDNotNull();
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
    }
  }
}
// io.seata.server.coordinator.DefaultCore#begin
/**
 * Creates a global session, begins it, publishes a transaction-start event and returns the XID.
 *
 * @param applicationId           passed through to {@code GlobalSession.createGlobalSession}
 * @param transactionServiceGroup passed through to {@code GlobalSession.createGlobalSession}
 * @param name                    transaction name
 * @param timeout                 transaction timeout
 * @return the XID of the newly created session
 * @throws TransactionException propagated from session creation or {@code begin}
 */
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
  throws TransactionException {
  // Create the global session and attach the root session manager as lifecycle listener.
  final GlobalSession globalSession =
      GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
  globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

  globalSession.begin();

  // Transaction start event; built after begin() so it carries the post-begin time and status.
  final GlobalTransactionEvent startEvent = new GlobalTransactionEvent(
      globalSession.getTransactionId(),
      GlobalTransactionEvent.ROLE_TC,
      globalSession.getTransactionName(),
      globalSession.getBeginTime(),
      null,
      globalSession.getStatus());
  eventBus.post(startEvent);

  // The XID is handed back to the caller (the TM).
  return globalSession.getXid();
}

2.2 TM是如何提交/回滚全局事务的

// io.seata.tm.api.TransactionalTemplate#execute
// Excerpt from the body of execute(): begin -> run business logic -> commit, or hand the
// failure to completeTransactionAfterThrowing; the finally block always runs the clean-up steps.
// txInfo, tx, business and previousConfig are declared outside this excerpt.
try {
  // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
  //    else do nothing. Of course, the hooks will still be triggered.
  // Begin the global transaction.
  beginTransaction(txInfo, tx);

  Object rs;
  try {
    // Do Your Business
    // Run the business code supplied by the caller.
    rs = business.execute();
  } catch (Throwable ex) {
    // 3. The needed business exception to rollback.
    // Roll back the global transaction. How commit/rollback is actually carried out is covered
    // later in the article; here we only look at how the TM triggers it. The original
    // throwable is rethrown afterwards.
    completeTransactionAfterThrowing(txInfo, tx, ex);
    throw ex;
  }

  // 4. everything is fine, commit.
  // Commit the global transaction.
  commitTransaction(tx);

  return rs;
} finally {
  //5. clear
  resumeGlobalLockConfig(previousConfig);
  triggerAfterCompletion();
  cleanUp();
}

2.3 RM的回滚日志是怎么生成的

// io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitFalse
/**
 * Executes the statement and prepares an undo log from the before and after images.
 *
 * @param args arguments forwarded to {@code statementCallback.execute}
 * @return whatever the statement callback returned
 * @throws NotSupportYetException if the database type is not MySQL and the table meta reports
 *         more than one primary-key column
 * @throws Exception propagated from image building, statement execution or undo-log preparation
 */
protected T executeAutoCommitFalse(Object[] args) throws Exception {
  // The table meta is only consulted for non-MySQL databases (same short-circuit as `&&`).
  if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType())) {
    if (getTableMeta().getPrimaryKeyOnlyName().size() > 1) {
      throw new NotSupportYetException("multi pk only support mysql!");
    }
  }

  // Image of the affected rows before the statement runs.
  final TableRecords before = beforeImage();
  // Run the actual statement and keep its result.
  final T outcome = statementCallback.execute(statementProxy.getTargetStatement(), args);
  // Image of the rows after the statement, derived from the before image.
  final TableRecords after = afterImage(before);
  // Build the undo log from both images.
  prepareUndoLog(before, after);

  return outcome;
}

2.4 RM提交之前需要获取全局锁,全局锁是如何获取的(用于防止多个分布式事务修改到相同的数据导致问题)

// io.seata.rm.datasource.ConnectionProxy#doCommit
/**
 * Commits the connection, choosing the commit path from the connection context.
 *
 * <ul>
 *   <li>in a global transaction: {@code processGlobalTransactionCommit()}</li>
 *   <li>global lock required: {@code processLocalCommitWithGlobalLocks()}</li>
 *   <li>otherwise: a plain commit on the target connection</li>
 * </ul>
 *
 * @throws SQLException propagated from whichever commit path is taken
 */
private void doCommit() throws SQLException {
  // The context reports that this connection takes part in a global transaction.
  if (context.inGlobalTransaction()) {
    processGlobalTransactionCommit();
    return;
  }

  // No global transaction, but the context asks for a global lock.
  if (context.isGlobalLockRequire()) {
    processLocalCommitWithGlobalLocks();
    return;
  }

  // Neither applies: commit directly.
  targetConnection.commit();
}
// io.seata.rm.datasource.ConnectionProxy#processGlobalTransactionCommit
/**
 * Commit path used when the connection is part of a global transaction: register the branch,
 * flush the undo logs, commit the target connection, optionally report success, then reset
 * the connection context.
 *
 * @throws SQLException wrapping any throwable raised while flushing undo logs or committing;
 *         {@code report(false)} is called before it is thrown
 */
private void processGlobalTransactionCommit() throws SQLException {
  try {
    // Register the branch transaction; per the article, this is where the TC is called to
    // acquire the lock.
    register();
  } catch (TransactionException e) {
    // NOTE(review): execution falls through to the commit below unless
    // recognizeLockKeyConflictException throws — confirm that it always does.
    recognizeLockKeyConflictException(e, context.buildLockKeys());
  }
  try {
    // Flush the undo logs through this connection, then commit the target connection.
    UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
    targetConnection.commit();
  } catch (Throwable ex) {
    // Log, report the failure, and surface it as a SQLException with the original cause.
    LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
    report(false);
    throw new SQLException(ex);
  }
  if (IS_REPORT_SUCCESS_ENABLE) {
    // Report the phase-one result.
    report(true);
  }
  // Only reached on success; the failure path above throws before this line.
  context.reset();
}
// io.seata.server.coordinator.AbstractCore#branchRegister
/**
 * Registers a new branch under the global session identified by {@code xid}.
 *
 * <p>The work runs inside {@code SessionHolder.lockAndExecute}: the session status is checked,
 * a branch session is created and locked, and the branch is added to the global session. If
 * adding the branch fails with a {@code RuntimeException}, the branch lock is released first.
 *
 * @param branchType      type of the branch
 * @param resourceId      resource the branch operates on
 * @param clientId        id of the registering client
 * @param xid             XID of the owning global transaction
 * @param applicationData application data stored with the branch
 * @param lockKeys        lock keys stored with the branch
 * @return the id of the registered branch
 * @throws TransactionException propagated from the checks or locking; a
 *         {@code BranchTransactionException} with code {@code FailedToAddBranch} when adding
 *         the branch fails
 */
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                           String applicationData, String lockKeys) throws TransactionException {
  final GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
  return SessionHolder.lockAndExecute(globalSession, () -> {
    globalSessionStatusCheck(globalSession);
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

    final BranchSession branch = SessionHelper.newBranchByGlobal(
        globalSession, branchType, resourceId, applicationData, lockKeys, clientId);

    // Take the lock for the branch before it is added to the session.
    branchSessionLock(globalSession, branch);

    try {
      // Register the branch with the global session.
      globalSession.addBranch(branch);
    } catch (RuntimeException addFailure) {
      // Release the lock taken above before surfacing the failure.
      branchSessionUnlock(branch);
      final String reason = String.format(
          "Failed to store branch xid = %s branchId = %s", globalSession.getXid(), branch.getBranchId());
      throw new BranchTransactionException(FailedToAddBranch, reason, addFailure);
    }

    if (LOGGER.isInfoEnabled()) {
      LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
                  globalSession.getXid(), branch.getBranchId(), resourceId, lockKeys);
    }
    return branch.getBranchId();
  });
}
// io.seata.server.lock.AbstractLockManager#acquireLock(具体上锁的代码)
/**
 * Acquires the row locks described by the given branch session.
 *
 * @param branchSession the branch whose lock key is to be locked; must not be {@code null}
 * @return {@code true} when the lock key is null/empty or yields no row locks; otherwise the
 *         result of the locker's {@code acquireLock}
 * @throws IllegalArgumentException if {@code branchSession} is {@code null}
 * @throws TransactionException propagated from the underlying locker
 */
@Override
public boolean acquireLock(BranchSession branchSession) throws TransactionException {
  if (branchSession == null) {
    throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
  }

  // A missing lock key means there is nothing to lock.
  final String key = branchSession.getLockKey();
  if (StringUtils.isNullOrEmpty(key)) {
    return true;
  }

  // Collect the row locks of the branch; an empty collection also counts as "nothing to lock".
  final List<RowLock> rowLocks = collectRowLocks(branchSession);
  return CollectionUtils.isEmpty(rowLocks) || getLocker(branchSession).acquireLock(rowLocks);
}

2.5 TC是如何协调各个分支事务的

// 以commit为例,TM调用COMMIT时最终会调用到TC的commit方法
// io.seata.tm.api.DefaultGlobalTransaction#commit
/**
 * Commits the global transaction through the transaction manager, retrying on failure.
 *
 * <p>A {@code Participant} does nothing. Otherwise the commit is attempted up to
 * {@code COMMIT_RETRY_COUNT} times ({@code DEFAULT_TM_COMMIT_RETRY_COUNT} when that value is
 * not positive). Whether or not the commit succeeds, the transaction is suspended if the
 * current {@code RootContext} is still bound to this XID.
 *
 * @throws TransactionException if the XID assertion fails, or once the last attempt has failed
 *         (wrapping the last throwable)
 */
@Override
public void commit() throws TransactionException {
  if (role == GlobalTransactionRole.Participant) {
    // Participant has no responsibility of committing
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
    }
    return;
  }
  assertXIDNotNull();

  int attemptsLeft = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
  try {
    // Retry loop: the countdown drops by one after every failed attempt.
    for (; attemptsLeft > 0; attemptsLeft--) {
      try {
        status = transactionManager.commit(xid);
        break;
      } catch (Throwable failure) {
        LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), attemptsLeft, failure.getMessage());
        if (attemptsLeft == 1) {
          // That was the last attempt: surface the failure to the caller.
          throw new TransactionException("Failed to report global commit", failure);
        }
      }
    }
  } finally {
    // Runs on success and on failure alike.
    if (xid.equals(RootContext.getXID())) {
      suspend(true);
    }
  }

  if (LOGGER.isInfoEnabled()) {
    LOGGER.info("[{}] commit status: {}", xid, status);
  }
}
// io.seata.server.coordinator.DefaultCore#commit
/**
 * Commits the global transaction identified by {@code xid}.
 *
 * <p>Under the session lock the session is closed and, if its status is {@code Begin}, either
 * switched to asynchronous commit or moved to {@code Committing}. Only in the latter case is
 * {@code doGlobalCommit} run.
 *
 * @param xid XID of the global transaction
 * @return {@code Finished} when no session exists for {@code xid}; {@code Committed} when the
 *         commit succeeded with branches remaining or when the session is
 *         {@code AsyncCommitting}; otherwise the session's current status
 * @throws TransactionException propagated from the session operations
 */
@Override
public GlobalStatus commit(String xid) throws TransactionException {
  final GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
  if (globalSession == null) {
    return GlobalStatus.Finished;
  }
  globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

  // Only the status change is done under the lock; it decides whether to commit synchronously.
  final boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
    // Highlight: Firstly, close the session, then no more branch can be registered.
    globalSession.closeAndClean();
    if (globalSession.getStatus() != GlobalStatus.Begin) {
      return false;
    }
    if (globalSession.canBeCommittedAsync()) {
      globalSession.asyncCommit();
      return false;
    }
    globalSession.changeStatus(GlobalStatus.Committing);
    return true;
  });

  if (!shouldCommit) {
    // An asynchronously committing session is reported to the caller as Committed.
    if (globalSession.getStatus() == GlobalStatus.AsyncCommitting) {
      return GlobalStatus.Committed;
    }
    return globalSession.getStatus();
  }

  final boolean success = doGlobalCommit(globalSession, false);
  if (success && !globalSession.getBranchSessions().isEmpty()) {
    // Branches are still attached after a successful commit: finish them asynchronously.
    globalSession.asyncCommit();
    return GlobalStatus.Committed;
  }
  return globalSession.getStatus();
}
这篇关于AT模式遇到的问题与具体源码篇的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!