上一文解析了 ShardingSphere 强一致性事务支持 XAShardingTransactionManager ,本文继续:
sharding-transaction-xa-core中关于 XAShardingTransactionManager,本文研究 XATransactionManager 和 ShardingConnection 类实现。
public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources) { for (ResourceDataSource each : resourceDataSources) { // 根据传入的 ResourceDataSource创建XATransactionDataSource并缓存 cachedDataSources.put(each.getOriginalName(), new XATransactionDataSource(databaseType, each.getUniqueResourceName(), each.getDataSource(), xaTransactionManager)); } // 对通过 SPI 创建的 XATransactionManager 也执行其 init 初始化 xaTransactionManager.init(); }
实现也简单:
@Override public TransactionType getTransactionType() { return TransactionType.XA; } @SneakyThrows @Override public boolean isInTransaction() { return Status.STATUS_NO_TRANSACTION != xaTransactionManager.getTransactionManager().getStatus(); } @Override public Connection getConnection(final String dataSourceName) throws SQLException { return cachedDataSources.get(dataSourceName).getConnection(); }
begin、commit 和 rollback直接委托保存在 XATransactionManager#TransactionManager 完成:
@SneakyThrows @Override public void begin() { xaTransactionManager.getTransactionManager().begin(); } @SneakyThrows @Override public void commit() { xaTransactionManager.getTransactionManager().commit(); } @SneakyThrows @Override public void rollback() { xaTransactionManager.getTransactionManager().rollback(); }
TransactionManager默认实现。
代表资源:
public final class AtomikosXARecoverableResource extends JdbcTransactionalResource { private final String resourceName; AtomikosXARecoverableResource(final String serverName, final XADataSource xaDataSource) { super(serverName, xaDataSource); resourceName = serverName; } // 比对SingleXAResource#ResourceName,确定是否在使用资源,此即设计包装 XAResource 的 SingleXAResource 类的原因 @Override public boolean usesXAResource(final XAResource xaResource) { return resourceName.equals(((SingleXAResource) xaResource).getResourceName()); } }
public final class AtomikosTransactionManager implements XATransactionManager { private final UserTransactionManager transactionManager = new UserTransactionManager(); private final UserTransactionService userTransactionService = new UserTransactionServiceImp(); @Override public void init() { userTransactionService.init(); } @Override public void registerRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) { userTransactionService.registerResource(new AtomikosXARecoverableResource(dataSourceName, xaDataSource)); } @Override public void removeRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) { userTransactionService.removeResource(new AtomikosXARecoverableResource(dataSourceName, xaDataSource)); } @Override @SneakyThrows public void enlistResource(final SingleXAResource xaResource) { transactionManager.getTransaction().enlistResource(xaResource); } @Override public TransactionManager getTransactionManager() { return transactionManager; } @Override public void close() { userTransactionService.shutdown(true); } }
对 Atomikos 的 UserTransactionManager、UserTransactionService 简单调用,Atomikos#UserTransactionManager 实现 TransactionManager 接口,封装所有 TransactionManager 需要完成的工作。
看完 sharding-transaction-xa-atomikos-manager,再看 sharding-transaction-xa-bitronix-manager 工程。基于 bitronix 的 XATransactionManager 实现方案
public final class BitronixXATransactionManager implements XATransactionManager { private final BitronixTransactionManager bitronixTransactionManager = TransactionManagerServices.getTransactionManager(); @Override public void init() { } @SneakyThrows @Override public void registerRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) { ResourceRegistrar.register(new BitronixRecoveryResource(dataSourceName, xaDataSource)); } @SneakyThrows @Override public void removeRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) { ResourceRegistrar.unregister(new BitronixRecoveryResource(dataSourceName, xaDataSource)); } @SneakyThrows @Override public void enlistResource(final SingleXAResource singleXAResource) { bitronixTransactionManager.getTransaction().enlistResource(singleXAResource); } @Override public TransactionManager getTransactionManager() { return bitronixTransactionManager; } @Override public void close() { bitronixTransactionManager.shutdown(); } }
XA两阶段提交核心类:
上图的整个流程源头ShardingConnection类,构造函数发现创建 ShardingTransactionManager 过程:
@Getter public final class ShardingConnection extends AbstractConnectionAdapter { public ShardingConnection(...) { ... shardingTransactionManager = runtimeContext.getShardingTransactionManagerEngine().getTransactionManager(transactionType); } }
ShardingConnection多处用到上面创建的shardingTransactionManager。如:
获取连接:
@Override protected Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException { return isInShardingTransaction() ? shardingTransactionManager.getConnection(dataSourceName) : dataSource.getConnection(); }
判断是否在同一事务:
private boolean isInShardingTransaction() { return null != shardingTransactionManager && shardingTransactionManager.isInTransaction(); }
@Override public void setAutoCommit(final boolean autoCommit) throws SQLException { if (TransactionType.LOCAL == transactionType) { super.setAutoCommit(autoCommit); return; } if (autoCommit && !shardingTransactionManager.isInTransaction() || !autoCommit && shardingTransactionManager.isInTransaction()) { return; } if (autoCommit && shardingTransactionManager.isInTransaction()) { shardingTransactionManager.commit(); return; } if (!autoCommit && !shardingTransactionManager.isInTransaction()) { closeCachedConnections(); shardingTransactionManager.begin(); } }
事务类型为本地事务时,直接调用 ShardingConnection 父类 AbstractConnectionAdapter#setAutoCommit 完成本地事务自动提交:
类似setAutoCommit ,按事务类型决定是否进行分布式提交和回滚:
@Override public void commit() throws SQLException { if (TransactionType.LOCAL == transactionType) { super.commit(); } else { shardingTransactionManager.commit(); } } @Override public void rollback() throws SQLException { if (TransactionType.LOCAL == transactionType) { super.rollback(); } else { shardingTransactionManager.rollback(); } }
ShardingSphere提供两阶段提交的 XA 协议实现方案的同时,也实现柔性事务。看完 XAShardingTransactionManager,来看基于 Seata 框架的柔性事务 TransactionManager 实现类 SeataATShardingTransactionManager。
该类完全采用阿里Seata框架提供分布式事务特性,而非遵循类似 XA 这样的开发规范,所以代码实现比 XAShardingTransactionManager 类层结构简单,复杂性都屏蔽在了框架内部。
集成 Seata,先要初始化 TMClient、RMClient,在 Seata 内部,这两个客户端之间会基于RPC通信。
SeataATShardingTransactionManager#init的initSeataRPCClient初始化这俩客户端对象:
// 根据 seata.conf 创建配置对象 FileConfiguration configuration = new FileConfiguration("seata.conf"); initSeataRPCClient() { String applicationId = configuration.getConfig("client.application.id"); Preconditions.checkNotNull(applicationId, "please config application id within seata.conf file"); String transactionServiceGroup = configuration.getConfig("client.transaction.service.group", "default"); TMClient.init(applicationId, transactionServiceGroup); RMClient.init(applicationId, transactionServiceGroup); }
Seata也提供一套构建在 JDBC 规范之上的实现策略,类似03文介绍的 ShardingSphere 与 JDBC 规范之间兼容性。
Seata使用DataSourceProxy、ConnectionProxy代理对象,如DataSourceProxy:
实现了自定义Resource接口,继承AbstractDataSourceProxy(最终实现JDBC的DataSource接口)。所以,初始化 Seata 框架时,也要根据输入 DataSource 对象构建 DataSourceProxy,并通过 DataSourceProxy 获取 ConnectionProxy。
@Override public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources) { // 初始化 Seata 客户端 initSeataRPCClient(); // 创建 DataSourceProxy 并放入Map for (ResourceDataSource each : resourceDataSources) { dataSourceMap.put(each.getOriginalName(), new DataSourceProxy(each.getDataSource())); } } @Override public Connection getConnection(final String dataSourceName) { // 根据 DataSourceProxy 获取 ConnectionProxy return dataSourceMap.get(dataSourceName).getConnection(); }
初始化后,提供了事务开启和提交相关的入口。Seata的GlobalTransaction是核心接口,封装了面向用户操作层的分布式事务访问入口:
public interface GlobalTransaction { void begin() throws TransactionException; void begin(int timeout) throws TransactionException; void begin(int timeout, String name) throws TransactionException; void commit() throws TransactionException; void rollback() throws TransactionException; GlobalStatus getStatus() throws TransactionException; String getXid(); }
ShardingSphere 作 GlobalTransaction 的用户层,也基于 GlobalTransaction 完成分布式事务操作。但 ShardingSphere 并未直接使用这层,而是设计位于sharding-transaction-base-seata-at的SeataTransactionHolder类,保存线程安全的 GlobalTransaction 对象。
final class SeataTransactionHolder { private static final ThreadLocal<GlobalTransaction> CONTEXT = new ThreadLocal<>(); static void set(final GlobalTransaction transaction) { CONTEXT.set(transaction); } static GlobalTransaction get() { return CONTEXT.get(); } static void clear() { CONTEXT.remove(); } }
使用 ThreadLocal 确保对 GlobalTransaction 访问的线程安全性。
咋判断当前操作是否处于一个全局事务?Seata存在一个上下文对象RootContex保存参与者和发起者之间传播的 Xid:
因此,只需判断:
@Override public boolean isInTransaction() { return null != RootContext.getXID(); }
Seata 也提供针对全局事务的上下文类 GlobalTransactionContext,可用:
就不难理解如下实现了
@Override @SneakyThrows public void begin() { // 创建一个 GlobalTransaction,保存到 SeataTransactionHolder SeataTransactionHolder.set(GlobalTransactionContext.getCurrentOrCreate()); // 从 SeataTransactionHolder 获取一个 GlobalTransaction,并调 begin 启动事务 SeataTransactionHolder.get().begin(); SeataTransactionBroadcaster.collectGlobalTxId(); }
注意到最后的类:
保存 Seata 全局 Xid 的一个容器类。事务启动时收集全局 Xid 并进行保存,而在事务提交或回滚时清空这些 Xid。
class SeataTransactionBroadcaster { String SEATA_TX_XID = "SEATA_TX_XID"; static void collectGlobalTxId() { if (RootContext.inGlobalTransaction()) { ShardingExecuteDataMap.getDataMap().put(SEATA_TX_XID, RootContext.getXID()); } } static void broadcastIfNecessary(final Map<String, Object> shardingExecuteDataMap) { if (shardingExecuteDataMap.containsKey(SEATA_TX_XID) && !RootContext.inGlobalTransaction()) { RootContext.bind((String) shardingExecuteDataMap.get(SEATA_TX_XID)); } } static void clear() { ShardingExecuteDataMap.getDataMap().remove(SEATA_TX_XID); } }
因此
实现就清楚了:
@Override public void commit() { try { SeataTransactionHolder.get().commit(); } finally { SeataTransactionBroadcaster.clear(); SeataTransactionHolder.clear(); } } @Override public void rollback() { try { SeataTransactionHolder.get().rollback(); } finally { SeataTransactionBroadcaster.clear(); SeataTransactionHolder.clear(); } } @Override public void close() { dataSourceMap.clear(); SeataTransactionHolder.clear(); TmRpcClient.getInstance().destroy(); RmRpcClient.getInstance().destroy(); }
sharding-transaction-base-seata-at 工程中的代码实际上就只有这些内容,这些内容也构成了在 ShardingSphere中 集成 Seata 框架的实现过程。
本文给出应用程序咋集成 Seata 分布式事务框架的详细过程,ShardingSphere 提供一种模版实现。日常开发,若想在业务代码集成 Seata,可参考 SeataTransactionHolder、SeataATShardingTransactionManager 等核心代码,而无需太多修改。
XAShardingTransactionManager理解难在从 ShardingConnection 到底层 JDBC 规范的整个集成和兼容过程。
参考 ShardingSphere 的实现:
seata.conf
文件,定义 applicationId
和 transactionServiceGroup
等参数。项目中初始化 TMClient 和 RMClient,它们分别代表事务管理器和资源管理器:
FileConfiguration configuration = new FileConfiguration("seata.conf"); String applicationId = configuration.getConfig("client.application.id"); String transactionServiceGroup = configuration.getConfig("client.transaction.service.group", "default"); TMClient.init(applicationId, transactionServiceGroup); RMClient.init(applicationId, transactionServiceGroup);
构建 DataSourceProxy
: 使用 Seata 的 DataSourceProxy
对数据源进行代理。
DataSourceProxy dataSourceProxy = new DataSourceProxy(originalDataSource);
获取连接代理:从代理数据源中获取 ConnectionProxy
,使每个数据库连接支持事务传播。
Connection connection = dataSourceProxy.getConnection();
基于 GlobalTransactionContext
获取或创建事务对象:
GlobalTransaction transaction = GlobalTransactionContext.getCurrentOrCreate();
绑定全局事务 XID: 当事务发起时,将全局事务的 XID 存储在 RootContext
中:
RootContext.bind(transaction.getXid());
通过 RootContext
判断事务状态:
boolean isInTransaction = RootContext.inGlobalTransaction();
开启事务:
transaction.begin();
提交事务:
try { transaction.commit(); } finally { RootContext.unbind(); }
回滚事务:
try { transaction.rollback(); } finally { RootContext.unbind(); }
将分布式事务的核心逻辑封装在工具类中,例如 SeataTransactionHolder
,以便方便地管理全局事务上下文:
SeataTransactionHolder.set(GlobalTransactionContext.getCurrentOrCreate());
在应用关闭时,清理客户端资源:
TmRpcClient.getInstance().destroy(); RmRpcClient.getInstance().destroy();
DataSourceProxy
代理,避免事务管理失效。通过上述步骤,可以在业务代码中顺利集成 Seata,实现分布式事务管理,保障数据一致性。
关注我,紧跟本系列专栏文章,咱们下篇再续!
作者简介:魔都架构师,多家大厂后端一线研发经验,在分布式系统设计、数据平台架构和AI应用开发等领域都有丰富实践经验。
各大技术社区头部专家博主。具有丰富的引领团队经验,深厚业务架构和解决方案的积累。
负责:
- 中央/分销预订系统性能优化
- 活动&券等营销中台建设
- 交易平台及数据中台等架构和开发设计
- 车联网核心平台-物联网连接平台、大数据平台架构设计及优化
- LLM Agent应用开发
- 区块链应用开发
- 大数据开发挖掘经验
- 推荐系统项目
目前主攻市级软件项目设计、构建服务全社会的应用系统。
参考: