2018年写过一篇分库分表的文章《SpringBoot使用sharding-jdbc分库分表》,但是存在很多不完美的地方比如:
针对上述问题,本人计划开发一个通用的分库分表starter,具备以下特性:
通过查看官方文档,可以发现starter的核心逻辑就是获取分库分表等配置,然后在自动配置类创建数据源注入Spring容器即可。
首先创建一个spring-boot-starter工程ship-sharding-spring-boot-starter,不会的小伙伴可以参考以前写的教程《【SpringBoot】编写一个自己的Starter》。
创建自动配置类cn.sp.sharding.config.ShardingAutoConfig,并在resources/META-INF/spring.factories文件中配置自动配置类的全路径。
org.springframework.boot.autoconfigure.EnableAutoConfiguration=cn.sp.sharding.config.ShardingAutoConfig
然后需要在pom.xml文件引入sharding-jbc依赖和工具包guava。
<properties> <java.version>8</java.version> <spring-boot.version>2.4.0</spring-boot.version> <sharding-jdbc.version>4.1.1</sharding-jdbc.version> </properties> <dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>sharding-jdbc-core</artifactId> <version>${sharding-jdbc.version}</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>18.0</version> </dependency>
分库分表配置这块,为了方便自定义配置前缀,创建ShardingRuleConfigurationProperties类继承sharding-jbc的YamlShardingRuleConfiguration类即可,代码如下:
/** * @author Ship * @version 1.0.0 * @description: * @date 2023/06/06 */ @ConfigurationProperties(prefix = CommonConstants.COMMON_CONFIG_PREFIX + ".config") public class ShardingRuleConfigurationProperties extends YamlShardingRuleConfiguration { }
同时sharding-jbc支持自定义一些properties属性,需要单独创建类ConfigMapConfigurationProperties
/** * @Author: Ship * @Description: * @Date: Created in 2023/6/6 */ @ConfigurationProperties(prefix = CommonConstants.COMMON_CONFIG_PREFIX + ".map") public class ConfigMapConfigurationProperties { private Properties props = new Properties(); public Properties getProps() { return props; } public void setProps(Properties props) { this.props = props; } }
官方提供了ShardingDataSourceFactory工厂类来创建数据源,但是查看其源码发现createDataSource方法的参数是ShardingRuleConfiguration类,而不是YamlShardingRuleConfiguration。
@NoArgsConstructor(access = AccessLevel.PRIVATE) public final class ShardingDataSourceFactory { /** * Create sharding data source. * * @param dataSourceMap data source map * @param shardingRuleConfig rule configuration for databases and tables sharding * @param props properties for data source * @return sharding data source * @throws SQLException SQL exception */ public static DataSource createDataSource( final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfig, final Properties props) throws SQLException { return new ShardingDataSource(dataSourceMap, new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props); } }
该如何解决配置类参数转换的问题呢?
幸好查找官方文档发现sharding-jdbc提供了YamlSwapper类来实现yaml配置和核心配置的转换
/** * YAML configuration swapper. * * @param <Y> type of YAML configuration * @param <T> type of swapped object */ public interface YamlSwapper<Y extends YamlConfiguration, T> { /** * Swap to YAML configuration. * * @param data data to be swapped * @return YAML configuration */ Y swap(T data); /** * Swap from YAML configuration to object. * * @param yamlConfiguration YAML configuration * @return swapped object */ T swap(Y yamlConfiguration); }
ShardingRuleConfigurationYamlSwapper就是YamlSwapper的其中一个实现类。
于是,ShardingAutoConfig的最终代码如下:
package cn.sp.sharding.config; /** * @author Ship * @version 1.0.0 * @description: * @date 2023/06/06 */ @AutoConfigureBefore(name = CommonConstants.MYBATIS_PLUS_CONFIG_CLASS) @Configuration @EnableConfigurationProperties(value = {ShardingRuleConfigurationProperties.class, ConfigMapConfigurationProperties.class}) @Import(DataSourceHealthConfig.class) public class ShardingAutoConfig implements EnvironmentAware { private Map<String, DataSource> dataSourceMap = new HashMap<>(); @ConditionalOnMissingBean @Bean public DataSource shardingDataSource(@Autowired ShardingRuleConfigurationProperties configurationProperties, @Autowired ConfigMapConfigurationProperties configMapConfigurationProperties) throws SQLException { ShardingRuleConfigurationYamlSwapper yamlSwapper = new ShardingRuleConfigurationYamlSwapper(); ShardingRuleConfiguration shardingRuleConfiguration = yamlSwapper.swap(configurationProperties); return ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfiguration, configMapConfigurationProperties.getProps()); } @Override public void setEnvironment(Environment environment) { setDataSourceMap(environment); } private void setDataSourceMap(Environment environment) { String names = environment.getProperty(CommonConstants.DATA_SOURCE_CONFIG_PREFIX + ".names"); for (String name : names.split(",")) { try { String propertiesPrefix = CommonConstants.DATA_SOURCE_CONFIG_PREFIX + "." + name; Map<String, Object> dataSourceProps = PropertyUtil.handle(environment, propertiesPrefix, Map.class); // 反射创建数据源 DataSource dataSource = DataSourceUtil.getDataSource(dataSourceProps.get("type").toString(), dataSourceProps); dataSourceMap.put(name, dataSource); } catch (ReflectiveOperationException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } } }
利用反射创建数据源,就可以解决支持多种数据源的问题。
sharding-jdbc提供了UUID和Snowflake两种默认实现,但是自定义主键生成策略更加灵活,方便根据自己的需求调整,接下来介绍如何自定义主键生成策略。
因为我们也是用的雪花算法,所以可以直接用sharding-jdbc提供的雪花算法类,KeyGeneratorFactory负责生成雪花算法实现类的实例,采用双重校验加锁的单例模式。
public final class KeyGeneratorFactory { /** * 使用shardingsphere提供的雪花算法实现 */ private static volatile SnowflakeShardingKeyGenerator keyGenerator = null; private KeyGeneratorFactory() { } /** * 单例模式 * * @return */ public static SnowflakeShardingKeyGenerator getInstance() { if (keyGenerator == null) { synchronized (KeyGeneratorFactory.class) { if (keyGenerator == null) { // 用ip地址当作机器id,机器范围0-1024 Long workerId = Long.valueOf(IpUtil.getLocalIpAddress().replace(".", "")) % 1024; keyGenerator = new SnowflakeShardingKeyGenerator(); Properties properties = new Properties(); properties.setProperty("worker.id", workerId.toString()); keyGenerator.setProperties(properties); } } } return keyGenerator; } }
雪花算法是由1bit 不用 + 41bit时间戳+10bit工作机器id+12bit序列号组成的,所以为了防止不同节点生成的id重复需要设置机器id,机器id的范围是0-1024,这里是用IP地址转数字取模1024来计算机器id,存在很小概率的重复,也可以用redis来生成机器id(参考雪花算法ID重复问题的解决方案 )。
注意: 雪花算法坑其实挺多的,除了系统时间回溯会导致id重复,单节点并发过高也会导致重复(序列位只有12位代表1ms内最多支持4096个并发)。
查看源码可知自定义主键生成器是通过SPI实现的,实现ShardingKeyGenerator接口即可。
package org.apache.shardingsphere.spi.keygen; import org.apache.shardingsphere.spi.TypeBasedSPI; /** * Key generator. */ public interface ShardingKeyGenerator extends TypeBasedSPI { /** * Generate key. * * @return generated key */ Comparable<?> generateKey(); }
/** * @Author: Ship * @Description: 分布式id生成器,雪花算法实现 * @Date: Created in 2023/6/8 */ public class DistributedKeyGenerator implements ShardingKeyGenerator { @Override public Comparable<?> generateKey() { return KeyGeneratorFactory.getInstance().generateKey(); } @Override public String getType() { return "DISTRIBUTED"; } @Override public Properties getProperties() { return null; } @Override public void setProperties(Properties properties) { } }
cn.sp.sharding.key.DistributedKeyGenerator
Spring Boot会在项目启动时执行一条sql语句检查数据源是否可用,因为ShardingDataSource只是对真实数据源进行了封装,没有完全实现Datasouce接口规范,所以会在启动时报错DataSource health check failed,为此需要重写数据源健康检查的逻辑。
创建DataSourceHealthConfig类继承DataSourceHealthContributorAutoConfiguration,然后重写createIndicator方法来重新设置校验sql语句。
/** * @Author: Ship * @Description: * @Date: Created in 2023/6/7 */ public class DataSourceHealthConfig extends DataSourceHealthContributorAutoConfiguration { private static String validQuery = "SELECT 1"; public DataSourceHealthConfig(Map<String, DataSource> dataSources, ObjectProvider<DataSourcePoolMetadataProvider> metadataProviders) { super(dataSources, metadataProviders); } @Override protected AbstractHealthIndicator createIndicator(DataSource source) { DataSourceHealthIndicator healthIndicator = (DataSourceHealthIndicator) super.createIndicator(source); if (StringUtils.hasText(validQuery)) { healthIndicator.setQuery(validQuery); } return healthIndicator; } }
最后使用@Import注解来注入
@AutoConfigureBefore(name = CommonConstants.MYBATIS_PLUS_CONFIG_CLASS) @Configuration @EnableConfigurationProperties(value = {ShardingRuleConfigurationProperties.class, ConfigMapConfigurationProperties.class}) @Import(DataSourceHealthConfig.class) public class ShardingAutoConfig implements EnvironmentAware {
假设有个订单表数据量很大了需要分表,为了方便水平扩展,根据订单的创建时间分表,分表规则如下:
t_order_${创建时间所在年}_${创建时间所在季度}
订单表结构如下
CREATE TABLE `t_order_2022_3` ( `id` bigint(20) unsigned NOT NULL COMMENT '主键', `order_code` varchar(32) DEFAULT NULL COMMENT '订单号', `create_time` bigint(20) NOT NULL COMMENT '创建时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
<dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>${mybatis.version}</version> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.0.1</version> <exclusions> <exclusion> <groupId>org.mybatis</groupId> <artifactId>mybatis</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>${druid.version}</version> </dependency> <dependency> <groupId>cn.sp</groupId> <artifactId>ship-sharding-spring-boot-starter</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency>
接口 | 描述 |
---|---|
PreciseShardingAlgorithm | 定义等值查询条件下的分表算法 |
RangeShardingAlgorithm | 定义范围查询条件下的分表算法 |
创建算法类MyTableShardingAlgorithm
/** * @Author: Ship * @Description: * @Date: Created in 2023/6/8 */ @Slf4j public class MyTableShardingAlgorithm implements PreciseShardingAlgorithm<Long>, RangeShardingAlgorithm<Long> { private static final String TABLE_NAME_PREFIX = "t_order_"; @Override public String doSharding(Collection<String> availableTableNames, PreciseShardingValue<Long> preciseShardingValue) { Long createTime = preciseShardingValue.getValue(); if (createTime == null) { throw new ShipShardingException("创建时间不能为空!"); } LocalDate localDate = DateUtils.longToLocalDate(createTime); final String year = localDate.getYear() + ""; Integer quarter = DateUtils.getQuarter(localDate); for (String tableName : availableTableNames) { String dateStr = tableName.replace(TABLE_NAME_PREFIX, ""); String[] dateArr = dateStr.split("_"); if (dateArr[0].equals(year) && dateArr[1].equals(quarter.toString())) { return tableName; } } log.error("分表算法对应的表不存在!"); throw new ShipShardingException("分表算法对应的表不存在!"); } @Override public Collection<String> doSharding(Collection<String> availableTableNames, RangeShardingValue<Long> rangeShardingValue) { //获取查询条件中范围值 Range<Long> valueRange = rangeShardingValue.getValueRange(); // 上限值 Long upperEndpoint = valueRange.upperEndpoint(); // 下限值 Long lowerEndpoint = valueRange.lowerEndpoint(); List<String> tableNames = Lists.newArrayList(); for (String tableName : availableTableNames) { String dateStr = tableName.replace(MyTableShardingAlgorithm.TABLE_NAME_PREFIX, ""); String[] dateArr = dateStr.split("_"); String year = dateArr[0]; String quarter = dateArr[1]; Long[] minAndMaxTime = DateUtils.getMinAndMaxTime(year, quarter); Long minTime = minAndMaxTime[0]; Long maxTime = minAndMaxTime[1]; if (valueRange.hasLowerBound() && valueRange.hasUpperBound()) { // between and if (minTime.compareTo(lowerEndpoint) <= 0 && upperEndpoint.compareTo(maxTime) <= 0) { tableNames.add(tableName); } } else if (valueRange.hasLowerBound() && !valueRange.hasUpperBound()) { if (maxTime.compareTo(lowerEndpoint) > 0) { tableNames.add(tableName); } } else { if (upperEndpoint.compareTo(minTime) > 0) { tableNames.add(tableName); } } } if (tableNames.size() == 0) { log.error("分表算法对应的表不存在!"); throw new ShipShardingException("分表算法对应的表不存在!"); } return tableNames; } }
spring: application: name: ship-sharding-example mybatis-plus: base-package: cn.sp.sharding.dao mapper-locations: classpath*:/mapper/*Mapper.xml configuration: #开启自动驼峰命名规则(camel case)映射 map-underscore-to-camel-case: true #延迟加载,需要和lazy-loading-enabled一起使用 aggressive-lazy-loading: true lazy-loading-enabled: true #关闭一级缓存 local-cache-scope: statement #关闭二级级缓存 cache-enabled: false ship: sharding: jdbc: datasource: names: ds0 ds0: driver-class-name: com.mysql.cj.jdbc.Driver type: com.alibaba.druid.pool.DruidDataSource url: jdbc:mysql://127.0.0.1:3306/my_springboot?autoReconnect=true&useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false username: root password: 1234 initial-size: 5 minIdle: 5 maxActive: 20 maxWait: 60000 timeBetweenEvictionRunsMillis: 60000 minEvictableIdleTimeMillis: 300000 validationQuery: SELECT 1 FROM DUAL testWhileIdle: true testOnBorrow: false testOnReturn: false poolPreparedStatements: true maxPoolPreparedStatementPerConnectionSize: 20 useGlobalDataSourceStat: true connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=2000;druid.mysql.usePingMethod=false config: binding-tables: t_order tables: t_order: actual-data-nodes: ds0.t_order_${2022..2023}_${1..4} # 配置主键生成策略 key-generator: type: DISTRIBUTED column: id table-strategy: standard: sharding-column: create_time # 配置分表算法 precise-algorithm-class-name: cn.sp.sharding.algorithm.MyTableShardingAlgorithm range-algorithm-class-name: cn.sp.sharding.algorithm.MyTableShardingAlgorithm
@Test public void testInsert() { Order order = new Order(); order.setOrderCode("OC001"); order.setCreateTime(System.currentTimeMillis()); orderMapper.insert(order); }
运行testInsert()方法,打开t_order_2023_2表发现已经有了一条订单数据
并且该数据的create_time是1686383781371,转换为时间为2023-06-10 15:56:21,刚好对应2023年第二季度,说明数据正确的路由到了对应的表里。
然后测试下数据查询情况
@Test public void testQuery(){ QueryWrapper<Order> wrapper = new QueryWrapper<>(); wrapper.lambda().eq(Order::getOrderCode,"OC001"); List<Order> orders = orderMapper.selectList(wrapper); System.out.println(JSONUtil.toJsonStr(orders)); }
运行testQuery()方法后可以在控制台看到输出了订单报文,说明查询也没问题。
[{"id":1667440550397132802,"orderCode":"OC001","createTime":1686383781371}]
本文代码已经上传到github,后续会把ship-sharding-spring-boot-starter上传到maven中央仓库方便使用,如果觉得对你有用的话希望可以点个赞让更多人看到😏