聊聊动态数据源

关系型数据库开发与运维微服务

大家好,我是苏三,又跟大家见面了。

前言

咱们星球中的商城系统中使用了动态数据源的功能,实现了分库分表的订单库的读库和写库的自动切换。

有球友反馈说,对动态数据源不太熟悉。

今天这篇文章就专门跟大家一起聊聊动态数据源,希望对你会有所帮助。

一、为什么需要动态数据源?

有些小伙伴在开发中可能会遇到这样的场景:一个系统需要同时访问多个数据库,或者需要根据业务参数动态选择数据源。这

时候,传统的单数据源配置就显得力不从心了。

1.1 传统多数据源的问题

传统方式的多个数据源配置,硬编码,不灵活。

例如下面这样:

  
@Configuration  
public class TraditionalDataSourceConfig {  
      
    @Bean  
    @Primary  
    public DataSource primaryDataSource() {  
        HikariDataSource dataSource = new HikariDataSource();  
        dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/db1");  
        dataSource.setUsername("user1");  
        dataSource.setPassword("pass1");  
        return dataSource;  
    }  
      
    @Bean  
    public DataSource secondaryDataSource() {  
        HikariDataSource dataSource = new HikariDataSource();  
        dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/db2");  
        dataSource.setUsername("user2");  
        dataSource.setPassword("pass2");  
        return dataSource;  
    }  
}  

使用时需要手动管理数据源。

  
@Repository  
public class TraditionalUserDao {  
      
    @Autowired  
    @Qualifier("primaryDataSource")  
    private DataSource primaryDataSource;  
      
    @Autowired  
    @Qualifier("secondaryDataSource")  
    private DataSource secondaryDataSource;  
      
    public User findUserFromPrimary(Long id) {  
        // 需要手动获取连接、处理异常、关闭连接  
        try (Connection conn = primaryDataSource.getConnection();  
             PreparedStatement stmt = conn.prepareStatement("SELECT * FROM users WHERE id = ?")) {  
            stmt.setLong(1, id);  
            ResultSet rs = stmt.executeQuery();  
            // 处理结果集...  
        } catch (SQLException e) {  
            throw new RuntimeException("查询失败", e);  
        }  
    }  
  
}  

每个方法都要重复这样的模板代码,需要手动指定数据源,很麻烦。

那么,如何做优化呢?

1.2 动态数据源的优势

接下来,我们一起看看使用动态数据源后的优雅代码。

  
@Service  
public class UserService {  
      
    @Autowired  
    private UserMapper userMapper;  
      
    // 根据租户ID自动选择数据源  
    public User findUserByTenant(Long userId, String tenantId) {  
        // 设置数据源上下文  
        DataSourceContextHolder.setDataSource(tenantId);  
        try {  
            return userMapper.selectById(userId);  
        } finally {  
            // 清理上下文  
            DataSourceContextHolder.clear();  
        }  
    }  
      
    // 多租户数据聚合查询  
    public UserAggregateInfo getUserAggregateInfo(Long userId) {  
        UserAggregateInfo result = new UserAggregateInfo();  
          
        // 查询主库  
        DataSourceContextHolder.setDataSource("master");  
        result.setBaseInfo(userMapper.selectById(userId));  
          
        // 查询归档库  
        DataSourceContextHolder.setDataSource("archive");  
        result.setHistory(userMapper.selectHistory(userId));  
          
        // 查询统计库  
        DataSourceContextHolder.setDataSource("stats");  
        result.setStatistics(userMapper.selectStats(userId));  
          
        return result;  
    }  
}  

代码中能根据租户ID自动选择数据源。

代码一下子变得更优雅了。

二、动态数据源的原理

有些小伙伴在使用动态数据源时,可能只是简单配置使用,并不清楚其底层工作原理。

理解核心原理对于排查问题和性能优化至关重要。

下面跟大家一起聊聊动态数据源的核心原理,希望对你会有所帮助。

数据源路由的核心机制

动态数据源的核心在于AbstractRoutingDataSource,它是Spring框架提供的抽象类:

  
// Spring AbstractRoutingDataSource 源码分析  
public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {  
      
    // 目标数据源映射表  
    private Map<Object, Object> targetDataSources;  
      
    // 默认数据源  
    private Object defaultTargetDataSource;  
      
    // 解析后的数据源映射  
    private Map<Object, DataSource> resolvedDataSources;  
      
    // 解析后的默认数据源  
    private DataSource resolvedDefaultDataSource;  
      
    // 关键方法:确定当前查找键  
    protected abstract Object determineCurrentLookupKey();  
      
    // 获取连接时选择数据源  
    @Override  
    public Connection getConnection() throws SQLException {  
        return determineTargetDataSource().getConnection();  
    }  
      
    @Override  
    public Connection getConnection(String username, String password) throws SQLException {  
        return determineTargetDataSource().getConnection(username, password);  
    }  
      
    // 确定目标数据源  
    protected DataSource determineTargetDataSource() {  
        // 获取查找键  
        Object lookupKey = determineCurrentLookupKey();  
          
        // 根据查找键获取数据源  
        DataSource dataSource = this.resolvedDataSources.get(lookupKey);  
        if (dataSource == null && (this.resolvedDefaultDataSource != null || lookupKey == null)) {  
            dataSource = this.resolvedDefaultDataSource;  
        }  
        if (dataSource == null) {  
            throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");  
        }  
        return dataSource;  
    }  
}  

线程安全的数据源上下文管理

  
/**  
 * 数据源上下文管理器 - 核心组件  
 * 使用ThreadLocal保证线程安全  
 */  
public class DataSourceContextHolder {  
      
    // 使用ThreadLocal保证线程隔离  
    private static final ThreadLocal<String> CONTEXT\_HOLDER = new ThreadLocal<>();  
      
    // 支持嵌套数据源切换的栈  
    private static final ThreadLocal<Deque<String>> DATASOURCE\_STACK = ThreadLocal.withInitial(ArrayDeque::new);  
      
    // 设置数据源  
    public static void setDataSource(String dataSource) {  
        if (dataSource == null) {  
            throw new IllegalArgumentException("数据源不能为null");  
        }  
        CONTEXT\_HOLDER.set(dataSource);  
          
        // 同时压入栈,支持嵌套调用  
        DATASOURCE\_STACK.get().push(dataSource);  
    }  
      
    // 获取当前数据源  
    public static String getDataSource() {  
        return CONTEXT\_HOLDER.get();  
    }  
      
    // 清除数据源  
    public static void clear() {  
        CONTEXT\_HOLDER.remove();  
        Deque<String> stack = DATASOURCE\_STACK.get();  
        if (!stack.isEmpty()) {  
            stack.pop();  
            // 如果栈中还有元素,恢复到上一个数据源  
            if (!stack.isEmpty()) {  
                CONTEXT\_HOLDER.set(stack.peek());  
            }  
        }  
    }  
      
    // 强制清除所有上下文(用于线程池场景)  
    public static void clearCompletely() {  
        CONTEXT\_HOLDER.remove();  
        DATASOURCE\_STACK.get().clear();  
    }  
      
    // 判断是否已设置数据源  
    public static boolean hasDataSource() {  
        return CONTEXT\_HOLDER.get() != null;  
    }  
}  
  
/**  
 * 自定义路由数据源  
 */  
@Component  
public class DynamicRoutingDataSource extends AbstractRoutingDataSource {  
      
    // 所有可用的数据源  
    private final Map<Object, Object> targetDataSources = new ConcurrentHashMap<>();  
      
    @Override  
    protected Object determineCurrentLookupKey() {  
        String dataSourceKey = DataSourceContextHolder.getDataSource();  
          
        if (dataSourceKey == null) {  
            // 返回默认数据源  
            return "default";  
        }  
          
        // 验证数据源是否存在  
        if (!targetDataSources.containsKey(dataSourceKey)) {  
            throw new IllegalArgumentException("数据源 " + dataSourceKey + " 不存在");  
        }  
          
        logger.debug("当前使用数据源: {}", dataSourceKey);  
        return dataSourceKey;  
    }  
      
    // 添加数据源  
    public void addDataSource(String key, DataSource dataSource) {  
        this.targetDataSources.put(key, dataSource);  
        // 更新目标数据源映射  
        setTargetDataSources(new HashMap<>(this.targetDataSources));  
        // 重新初始化  
        afterPropertiesSet();  
    }  
      
    // 移除数据源  
    public void removeDataSource(String key) {  
        if (this.targetDataSources.containsKey(key)) {  
            DataSource dataSource = (DataSource) this.targetDataSources.remove(key);  
            // 关闭数据源连接池  
            closeDataSource(dataSource);  
            // 更新目标数据源映射  
            setTargetDataSources(new HashMap<>(this.targetDataSources));  
            afterPropertiesSet();  
        }  
    }  
      
    // 获取所有数据源  
    public Map<Object, Object> getTargetDataSources() {  
        return Collections.unmodifiableMap(targetDataSources);  
    }  
      
    private void closeDataSource(DataSource dataSource) {  
        if (dataSource instanceof HikariDataSource) {  
            ((HikariDataSource) dataSource).close();  
        } else if (dataSource instanceof org.apache.tomcat.jdbc.pool.DataSource) {  
            ((org.apache.tomcat.jdbc.pool.DataSource) dataSource).close();  
        }  
        // 其他类型的数据源关闭逻辑...  
    }  
}  

动态数据源执行流程

picture.image

三、基于Spring Boot的完整实现

有些小伙伴在配置动态数据源时可能会遇到各种问题,下面我提供一个生产级别的完整实现。

完整配置实现

  
/**  
 * 动态数据源配置类  
 */  
@Configuration  
@EnableTransactionManagement  
@EnableConfigurationProperties(DynamicDataSourceProperties.class)  
public class DynamicDataSourceConfig {  
      
    @Autowired  
    private DynamicDataSourceProperties properties;  
      
    /**  
     * 主数据源(默认数据源)  
     */  
    @Bean  
    @ConfigurationProperties(prefix = "spring.datasource.master")  
    public DataSource masterDataSource() {  
        return DataSourceBuilder.create().build();  
    }  
      
    /**  
     * 从数据源1  
     */  
    @Bean  
    @ConfigurationProperties(prefix = "spring.datasource.slave1")  
    public DataSource slave1DataSource() {  
        return DataSourceBuilder.create().build();  
    }  
      
    /**  
     * 从数据源2  
     */  
    @Bean  
    @ConfigurationProperties(prefix = "spring.datasource.slave2")  
    public DataSource slave2DataSource() {  
        return DataSourceBuilder.create().build();  
    }  
      
    /**  
     * 动态数据源  
     */  
    @Bean  
    @Primary  
    public DataSource dynamicDataSource(DataSource masterDataSource,   
                                       DataSource slave1DataSource,   
                                       DataSource slave2DataSource) {  
          
        Map<Object, Object> targetDataSources = new HashMap<>(8);  
        targetDataSources.put("master", masterDataSource);  
        targetDataSources.put("slave1", slave1DataSource);  
        targetDataSources.put("slave2", slave2DataSource);  
          
        DynamicRoutingDataSource dynamicDataSource = new DynamicRoutingDataSource();  
        // 设置默认数据源  
        dynamicDataSource.setDefaultTargetDataSource(masterDataSource);  
        // 设置目标数据源  
        dynamicDataSource.setTargetDataSources(targetDataSources);  
          
        return dynamicDataSource;  
    }  
      
    /**  
     * 事务管理器  
     */  
    @Bean  
    public PlatformTransactionManager transactionManager(DataSource dynamicDataSource) {  
        returnnew DataSourceTransactionManager(dynamicDataSource);  
    }  
      
    /**  
     * MyBatis配置  
     */  
    @Bean  
    public SqlSessionFactory sqlSessionFactory(DataSource dynamicDataSource) throws Exception {  
        SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();  
        sessionFactory.setDataSource(dynamicDataSource);  
          
        // 配置MyBatis  
        org.apache.ibatis.session.Configuration configuration =   
            new org.apache.ibatis.session.Configuration();  
        configuration.setMapUnderscoreToCamelCase(true);  
        configuration.setCacheEnabled(true);  
        configuration.setLazyLoadingEnabled(false);  
        configuration.setAggressiveLazyLoading(false);  
        sessionFactory.setConfiguration(configuration);  
          
        return sessionFactory.getObject();  
    }  
}  
  
/**  
 * 数据源配置属性类  
 */  
@ConfigurationProperties(prefix = "spring.datasource")  
@Data  
public class DynamicDataSourceProperties {  
      
    /**  
     * 主数据源配置  
     */  
    private HikariConfig master = new HikariConfig();  
      
    /**  
     * 从数据源1配置  
     */  
    private HikariConfig slave1 = new HikariConfig();  
      
    /**  
     * 从数据源2配置  
     */  
    private HikariConfig slave2 = new HikariConfig();  
      
    /**  
     * 动态数据源配置  
     */  
    private DynamicConfig dynamic = new DynamicConfig();  
      
    @Data  
    public static class DynamicConfig {  
        /**  
         * 默认数据源  
         */  
        private String primary = "master";  
          
        /**  
         * 是否开启严格模式  
         */  
        private boolean strict = false;  
          
        /**  
         * 数据源健康检查间隔(秒)  
         */  
        private long healthCheckInterval = 30;  
    }  
}  

应用配置文件

  
# application.yml  
spring:  
datasource:  
    # 动态数据源配置  
    dynamic:  
      primary:master  
      strict:true  
      health-check-interval:30  
      
    # 主数据源  
    master:  
      jdbc-url:jdbc:mysql://localhost:3306/master\_db?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true  
      username:root  
      password:master\_password  
      driver-class-name:com.mysql.cj.jdbc.Driver  
      maximum-pool-size:20  
      minimum-idle:5  
      connection-timeout:30000  
      idle-timeout:600000  
      max-lifetime:1800000  
      pool-name:MasterHikariPool  
      
    # 从数据源1  
    slave1:  
      jdbc-url:jdbc:mysql://slave1:3306/slave\_db?useUnicode=true&characterEncoding=utf8  
      username:root  
      password:slave1\_password  
      driver-class-name:com.mysql.cj.jdbc.Driver  
      maximum-pool-size:15  
      minimum-idle:3  
      connection-timeout:30000  
      idle-timeout:600000  
      max-lifetime:1800000  
      pool-name:Slave1HikariPool  
      
    # 从数据源2  
    slave2:  
      jdbc-url:jdbc:mysql://slave2:3306/slave\_db?useUnicode=true&characterEncoding=utf8  
      username:root  
      password:slave2\_password  
      driver-class-name:com.mysql.cj.jdbc.Driver  
      maximum-pool-size:15  
      minimum-idle:3  
      connection-timeout:30000  
      idle-timeout:600000  
      max-lifetime:1800000  
      pool-name:Slave2HikariPool  
  
# MyBatis配置  
mybatis:  
configuration:  
    map-underscore-to-camel-case:true  
    cache-enabled:true  
    lazy-loading-enabled:false  
    aggressive-lazy-loading:false  

注解式数据源切换

  
/**  
 * 数据源注解  
 */  
@Target({ElementType.METHOD, ElementType.TYPE})  
@Retention(RetentionPolicy.RUNTIME)  
@Documented  
public @interface DataSource {  
      
    /**  
     * 数据源名称  
     */  
    String value() default "master";  
      
    /**  
     * 是否在方法执行后清除数据源(默认清除)  
     */  
    boolean clear() default true;  
}  
  
/**  
 * 数据源切面  
 */  
@Aspect  
@Component  
@Slf4j  
public class DataSourceAspect {  
      
    /**  
     * 定义切点:所有标注@DataSource注解的方法  
     */  
    @Pointcut("@annotation(com.example.annotation.DataSource)")  
    public void dataSourcePointCut() {}  
      
    /**  
     * 环绕通知:在方法执行前后切换数据源  
     */  
    @Around("dataSourcePointCut()")  
    public Object around(ProceedingJoinPoint point) throws Throwable {  
        MethodSignature signature = (MethodSignature) point.getSignature();  
        Method method = signature.getMethod();  
          
        DataSource dataSourceAnnotation = method.getAnnotation(DataSource.class);  
        if (dataSourceAnnotation == null) {  
            // 类级别注解  
            dataSourceAnnotation = point.getTarget().getClass().getAnnotation(DataSource.class);  
        }  
          
        if (dataSourceAnnotation != null) {  
            String dataSourceKey = dataSourceAnnotation.value();  
            boolean clearAfter = dataSourceAnnotation.clear();  
              
            try {  
                log.debug("切换数据源到: {}", dataSourceKey);  
                DataSourceContextHolder.setDataSource(dataSourceKey);  
                  
                // 执行原方法  
                return point.proceed();  
                  
            } finally {  
                if (clearAfter) {  
                    DataSourceContextHolder.clear();  
                    log.debug("清除数据源上下文");  
                }  
            }  
        }  
          
        // 没有注解,使用默认数据源  
        return point.proceed();  
    }  
}  

四、高级特性

有些小伙伴在基础功能实现后,可能会遇到一些高级场景的需求。

下面介绍几个生产环境中常用的高级特性。

读写分离自动路由

  
/**  
 * 读写分离数据源路由器  
 */  
@Component  
@Slf4j  
public class ReadWriteDataSourceRouter {  
      
    // 读数据源列表  
    private final List<String> readDataSources = Arrays.asList("slave1", "slave2");  
      
    // 轮询计数器  
    private final AtomicInteger counter = new AtomicInteger(0);  
      
    /**  
     * 根据SQL自动选择数据源  
     */  
    public String determineDataSource(boolean isReadOperation) {  
        if (isReadOperation && !readDataSources.isEmpty()) {  
            // 读操作:轮询选择从库  
            int index = counter.getAndIncrement() % readDataSources.size();  
            if (counter.get() > 9999) {  
                counter.set(0); // 防止溢出  
            }  
            String readDataSource = readDataSources.get(index);  
            log.debug("读操作选择数据源: {}", readDataSource);  
            return readDataSource;  
        } else {  
            // 写操作:选择主库  
            log.debug("写操作选择数据源: master");  
            return"master";  
        }  
    }  
      
    /**  
     * 根据SQL语句判断是否为读操作  
     */  
    public boolean isReadOperation(String sql) {  
        if (sql == null) {  
            returntrue; // 默认为读操作  
        }  
          
        String trimmedSql = sql.trim().toLowerCase();  
        return trimmedSql.startsWith("select") ||   
               trimmedSql.startsWith("show") ||  
               trimmedSql.startsWith("explain");  
    }  
}  
  
/**  
 * MyBatis拦截器 - 自动读写分离  
 */  
@Intercepts({  
    @Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),  
    @Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class})  
})  
@Component  
@Slf4j  
public class ReadWriteInterceptor implements Interceptor {  
      
    @Autowired  
    private ReadWriteDataSourceRouter dataSourceRouter;  
      
    @Override  
    public Object intercept(Invocation invocation) throws Throwable {  
        String methodName = invocation.getMethod().getName();  
        MappedStatement ms = (MappedStatement) invocation.getArgs()[0];  
          
        boolean isReadOperation = "query".equals(methodName);  
        String sql = getSql(ms, invocation.getArgs()[1]);  
          
        // 如果当前没有手动设置数据源,则自动选择  
        if (!DataSourceContextHolder.hasDataSource()) {  
            String dataSource = dataSourceRouter.determineDataSource(isReadOperation);  
            DataSourceContextHolder.setDataSource(dataSource);  
              
            try {  
                return invocation.proceed();  
            } finally {  
                DataSourceContextHolder.clear();  
            }  
        }  
          
        return invocation.proceed();  
    }  
      
    private String getSql(MappedStatement ms, Object parameter) {  
        BoundSql boundSql = ms.getBoundSql(parameter);  
        return boundSql.getSql();  
    }  
}  

多租户数据源管理

  
/**  
 * 多租户数据源管理器  
 */  
@Component  
@Slf4j  
public class TenantDataSourceManager {  
      
    @Autowired  
    private DynamicRoutingDataSource dynamicRoutingDataSource;  
      
    @Autowired  
    private DataSourceProperties dataSourceProperties;  
      
    // 租户数据源配置缓存  
    private final Map<String, TenantDataSourceConfig> tenantConfigCache = new ConcurrentHashMap<>();  
      
    /**  
     * 根据租户ID获取数据源  
     */  
    public DataSource getDataSourceForTenant(String tenantId) {  
        String dataSourceKey = "tenant\_" + tenantId;  
          
        // 检查是否已存在数据源  
        if (dynamicRoutingDataSource.getTargetDataSources().containsKey(dataSourceKey)) {  
            return (DataSource) dynamicRoutingDataSource.getTargetDataSources().get(dataSourceKey);  
        }  
          
        // 动态创建数据源  
        synchronized (this) {  
            if (!dynamicRoutingDataSource.getTargetDataSources().containsKey(dataSourceKey)) {  
                DataSource dataSource = createTenantDataSource(tenantId);  
                dynamicRoutingDataSource.addDataSource(dataSourceKey, dataSource);  
                log.info("为租户 {} 创建数据源: {}", tenantId, dataSourceKey);  
            }  
        }  
          
        return (DataSource) dynamicRoutingDataSource.getTargetDataSources().get(dataSourceKey);  
    }  
      
    /**  
     * 动态创建租户数据源  
     */  
    private DataSource createTenantDataSource(String tenantId) {  
        TenantDataSourceConfig config = getTenantConfig(tenantId);  
          
        HikariDataSource dataSource = new HikariDataSource();  
        dataSource.setJdbcUrl(buildJdbcUrl(config));  
        dataSource.setUsername(config.getUsername());  
        dataSource.setPassword(config.getPassword());  
        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");  
        dataSource.setMaximumPoolSize(10);  
        dataSource.setMinimumIdle(2);  
        dataSource.setConnectionTimeout(30000);  
        dataSource.setIdleTimeout(600000);  
        dataSource.setMaxLifetime(1800000);  
        dataSource.setPoolName("TenantPool\_" + tenantId);  
          
        return dataSource;  
    }  
      
    /**  
     * 获取租户数据源配置(可从配置中心或数据库获取)  
     */  
    private TenantDataSourceConfig getTenantConfig(String tenantId) {  
        return tenantConfigCache.computeIfAbsent(tenantId, key -> {  
            // 这里可以从配置中心、数据库或缓存中获取租户配置  
            // 简化实现,实际项目中需要完善  
            TenantDataSourceConfig config = new TenantDataSourceConfig();  
            config.setHost("tenant-" + tenantId + ".db.example.com");  
            config.setPort(3306);  
            config.setDatabase("tenant\_" + tenantId);  
            config.setUsername("tenant\_" + tenantId);  
            config.setPassword("password\_" + tenantId);  
            return config;  
        });  
    }  
      
    private String buildJdbcUrl(TenantDataSourceConfig config) {  
        return String.format("jdbc:mysql://%s:%d/%s?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true",  
                config.getHost(), config.getPort(), config.getDatabase());  
    }  
      
    @Data  
    public static class TenantDataSourceConfig {  
        private String host;  
        private int port;  
        private String database;  
        private String username;  
        private String password;  
    }  
}  

数据源健康监控

  
/**  
 * 数据源健康监控器  
 */  
@Component  
@Slf4j  
public class DataSourceHealthMonitor {  
      
    @Autowired  
    private DynamicRoutingDataSource dynamicRoutingDataSource;  
      
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);  
      
    // 健康状态缓存  
    private final Map<String, Boolean> healthStatus = new ConcurrentHashMap<>();  
      
    @PostConstruct  
    public void init() {  
        // 启动健康检查任务  
        scheduler.scheduleAtFixedRate(this::checkAllDataSources, 0, 30, TimeUnit.SECONDS);  
    }  
      
    /**  
     * 检查所有数据源的健康状态  
     */  
    public void checkAllDataSources() {  
        Map<Object, Object> dataSources = dynamicRoutingDataSource.getTargetDataSources();  
          
        for (Map.Entry<Object, Object> entry : dataSources.entrySet()) {  
            String dataSourceKey = (String) entry.getKey();  
            DataSource dataSource = (DataSource) entry.getValue();  
              
            boolean isHealthy = checkDataSourceHealth(dataSource);  
            healthStatus.put(dataSourceKey, isHealthy);  
              
            if (!isHealthy) {  
                log.warn("数据源 {} 健康检查失败", dataSourceKey);  
                // 可以发送告警通知  
            }  
        }  
    }  
      
    /**  
     * 检查单个数据源健康状态  
     */  
    private boolean checkDataSourceHealth(DataSource dataSource) {  
        try (Connection conn = dataSource.getConnection();  
             Statement stmt = conn.createStatement()) {  
              
            ResultSet rs = stmt.executeQuery("SELECT 1");  
            return rs.next() && rs.getInt(1) == 1;  
              
        } catch (SQLException e) {  
            log.error("数据源健康检查异常", e);  
            return false;  
        }  
    }  
      
    /**  
     * 获取数据源健康状态  
     */  
    public boolean isDataSourceHealthy(String dataSourceKey) {  
        return healthStatus.getOrDefault(dataSourceKey, true);  
    }  
      
    /**  
     * 获取健康的数据源列表  
     */  
    public List<String> getHealthyDataSources() {  
        return healthStatus.entrySet().stream()  
                .filter(Map.Entry::getValue)  
                .map(Map.Entry::getKey)  
                .collect(Collectors.toList());  
    }  
      
    @PreDestroy  
    public void destroy() {  
        scheduler.shutdown();  
    }  
}  

五、动态数据源的应用场景

让我们通过架构图来理解动态数据源的典型应用场景:

picture.image

六、优缺点

优点

  1. 灵活性高 :支持运行时动态添加、移除数据源
  2. 解耦性好 :业务代码与具体数据源解耦
  3. 扩展性强 :易于实现读写分离、多租户等复杂场景
  4. 维护方便 :数据源配置集中管理,便于维护

缺点

  1. 复杂度增加 :系统架构变得更加复杂
  2. 事务管理复杂 :跨数据源事务需要特殊处理
  3. 连接池开销 :每个数据源都需要独立的连接池
  4. 调试困难 :数据源切换增加了调试复杂度

七、生产环境注意事项

事务管理策略

  
/**  
 * 多数据源事务管理器  
 */  
@Component  
@Slf4j  
public class MultiDataSourceTransactionManager {  
      
    /**  
     * 在多个数据源上执行事务性操作  
     */  
    @Transactional(rollbackFor = Exception.class)  
    public void executeInTransaction(Runnable task, String... dataSources) {  
        if (dataSources.length == 1) {  
            // 单数据源事务  
            DataSourceContextHolder.setDataSource(dataSources[0]);  
            try {  
                task.run();  
            } finally {  
                DataSourceContextHolder.clear();  
            }  
        } else {  
            // 多数据源伪事务(最终一致性)  
            executeWithCompensation(task, dataSources);  
        }  
    }  
      
    /**  
     * 使用补偿机制实现多数据源"事务"  
     */  
    private void executeWithCompensation(Runnable task, String[] dataSources) {  
        List<Runnable> compensationTasks = new ArrayList<>();  
          
        try {  
            // 按顺序执行各个数据源的操作  
            for (String dataSource : dataSources) {  
                DataSourceContextHolder.setDataSource(dataSource);  
                try {  
                    // 执行实际业务操作  
                    task.run();  
                      
                    // 记录补偿操作  
                    compensationTasks.add(0, createCompensationTask(dataSource));  
                      
                } finally {  
                    DataSourceContextHolder.clear();  
                }  
            }  
        } catch (Exception e) {  
            // 执行补偿操作  
            log.error("多数据源操作失败,执行补偿操作", e);  
            executeCompensation(compensationTasks);  
            throw e;  
        }  
    }  
      
    private void executeCompensation(List<Runnable> compensationTasks) {  
        for (Runnable compensation : compensationTasks) {  
            try {  
                compensation.run();  
            } catch (Exception ex) {  
                log.error("补偿操作执行失败", ex);  
                // 记录补偿失败,需要人工介入  
            }  
        }  
    }  
}  

性能优化建议

  1. 连接池优化 :根据业务特点调整各数据源连接池参数
  2. 数据源预热 :应用启动时预热常用数据源
  3. 缓存策略 :缓存数据源配置和路由信息
  4. 监控告警 :建立完善的数据源监控体系

总结

动态数据源是一个强大的技术方案,能够很好地解决多数据源管理的复杂性。

通过本文的详细解析,我们可以看到:

  1. 核心原理 :基于 AbstractRoutingDataSourceThreadLocal 的上下文管理
  2. 实现方式 :注解+AOP的声明式数据源切换
  3. 高级特性 :读写分离、多租户、健康监控等生产级功能
  4. 适用场景 :多租户、读写分离、分库分表等复杂数据架构

在实际项目中,建议根据具体业务需求选择合适的实现方案,不要过度设计。

同时,要建立完善的监控和运维体系,确保动态数据源的稳定运行。

最后欢迎加入苏三的星球,你将获得:智能天气播报AI Agent、SaaS点餐系统(DDD+多租户)、100万QPS短链系统(超过并发)、复杂的商城微服务系统(分布式)、苏三商城系统、苏三AI项目、刷题吧小程序、秒杀系统、码猿简历网站、代码生成工具等10个项目的源代码、开发教程和技术答疑。 系统设计、性能优化、技术选型、底层原理、Spring源码解读、工作经验分享、痛点问题、面试八股文等多个优质专栏。

还有1V1免费修改简历、技术答疑、职业规划、送书活动、技术交流。

扫描下方二维码,可以加入星球:

picture.image

数量有限,先到先得。 目前星球已经更新了6100+篇优质内容,还在持续爆肝中.....

星球已经被官方推荐了3次,收到了小伙伴们的一致好评。戳我加入学习,已有2100+小伙伴加入学习。

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
字节跳动 NoSQL 的实践与探索
随着 NoSQL 的蓬勃发展越来越多的数据存储在了 NoSQL 系统中,并且 NoSQL 和 RDBMS 的界限越来越模糊,各种不同的专用 NoSQL 系统不停涌现,各具特色,形态不一。本次主要分享字节跳动内部和火山引擎 NoSQL 的实践,希望能够给大家一定的启发。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论