SpringBoot 时间轮实现延时任务

算法微服务数据库
传统方案的困境

在日常开发中,我们经常需要处理各种定时任务:用户注册后的欢迎邮件、订单超时自动取消、缓存定期刷新等。传统的定时器方案在面对大规模定时任务时往往力不从心:

性能瓶颈日益凸显

  • ScheduledExecutor 在处理上千个任务时性能急剧下降
  • Timer 类不仅线程不安全,还存在单点故障风险
  • • 每次调度都要在堆中查找最小元素,时间复杂度O(log n)
  • • 频繁的GC压力导致系统吞吐量受限

业务需求日益复杂

  • • 消息重试需要指数退避策略
  • • 分布式系统需要精确的延迟调度
  • • 会话管理需要动态添加删除任务
  • • 限流器需要高效的时间窗口控制
时间轮的诞生

picture.image

时间轮(Timing Wheel) 灵感来源于我们日常使用的时钟。想象一下老式机械表的工作原理

  • • 表盘被分成12个小时刻度,时针每12小时转一圈
  • • 分针每分钟移动一格,60分钟转一圈
  • • 秒针每秒移动一格,60秒转一圈

时间轮借鉴了这个思想,将时间划分成固定的槽位,通过指针的移动来调度任务。这种巧妙的设计将时间维度空间化,用简单的指针移动代替了复杂的堆操作。

核心设计哲学

时间离散化 :将连续时间分割成等长的tick间隔

空间映射 :每个时间间隔对应一个槽位

批量触发 :同一槽位的任务一起执行

轮次计数 :多圈任务通过轮数计算

为什么时间轮如此高效?

时间轮的巧妙之处在于它彻底改变了任务调度的思路:

时间复杂度的革命 从O(log n)到O(1)

  • • 传统方案:需要在堆中查找最小元素
  • • 时间轮:直接计算目标槽位,一次到位

内存访问的优化

  • • 顺序访问数组元素,CPU缓存命中率高
  • • 相比堆结构的随机访问,性能提升明显

批量处理的威力

  • • 同一槽位的多个任务批量触发
  • • 减少线程切换和调度开销
  • • 提高系统的整体吞吐量
算法设计:从时钟模型到数据结构

时间轮的工作原理

想象一个真实的时钟

  • • 时钟有12个小时刻度,每个刻度代表一个小时
  • • 秒针每秒移动一格,分针每分钟移动一格,时针每小时移动一格
  • • 我们可以通过指针的位置确定当前时间

时间轮采用了类似的概念

  • • 将时间划分为N个槽位(通常是2的幂次,如512个)
  • • 每个槽位代表一个固定的时间间隔(如100ms)
  • • 一个指针周期性地移动到下一个槽位
  • • 当指针到达某个槽位时,执行该槽位中的所有任务

核心数据结构设计

时间轮主体结构


 
 
 
 
   
时间轮数组:  
[Slot0][Slot1][Slot2]...[Slot511]  
  ↑       ↑       ↑         ↑  
 指针    100ms200ms51.1秒后

槽位(Slot)设计

  • • 使用 ConcurrentLinkedQueue 存储任务列表,保证线程安全
  • • 维护任务计数器,便于快速统计
  • • 记录最后访问时间,用于监控和清理

任务包装器(TimerTaskWrapper)

  • • 包装原始任务和相关信息
  • • 记录任务的到期时间和需要经过的轮数
  • • 维护任务状态(等待、运行、完成、失败、取消)
  • • 提供任务进度和剩余时间计算

算法关键步骤

任务调度算法

    1. 计算任务需要经过的tick数量: ticks = delayMs / tickDuration
    1. 计算目标槽位: targetSlot = (currentSlot + ticks) % slotSize
    1. 计算需要经过的轮数: rounds = ticks / slotSize
    1. 将任务包装后放入目标槽位

指针移动算法

    1. 周期性地将指针移动到下一个槽位
    1. 处理当前槽位中的所有任务
    1. 对于未到期的任务,轮数减1后重新入槽
    1. 对于到期的任务,提交到工作线程池执行

多轮任务处理

  • • 当任务延迟时间超过一圈时,需要记录剩余轮数
  • • 每次指针经过时,如果轮数>0,则轮数减1
  • • 只有轮数为0的任务才会被执行
核心实现:高性能调度引擎

线程模型设计

双线程池架构 是时间轮高性能的关键


 
 
 
 
   
调度线程池(单线程):  
    ↓  
周期性移动指针  
    ↓  
处理槽位任务  
    ↓  
提交到工作线程池  
  
工作线程池(多线程):  
    ↓  
并发执行任务  
    ↓  
处理业务逻辑  
    ↓  
更新任务状态

调度线程池

  • • 使用单线程避免并发问题
  • • 负责指针移动和槽位处理
  • • 通过 scheduleAtFixedRate 实现周期性调度
  • • 设置为守护线程,不阻止JVM退出
SpringBoot的完整实现

服务层

TimingWheelService设计


 
 
 
 
   
@Service  
public class TimingWheelService {  
    // 任务管理  
    public String scheduleTask(TimerTask task, long delayMs);  
    public boolean cancelTask(String taskId);  
    public List<TimerTaskWrapper> getActiveTasks();  
  
    // 统计信息  
    public TimingWheelStats getStats();  
    public TaskExecutionStats getExecutionStats();  
  
    // 任务清理  
    public int cleanupCompletedTasks();  
  
    // 示例任务  
    public String createSampleTask(String type, long delayMs);  
    public List<String> createBatchTasks(int count, long minDelay, long maxDelay);  
}

核心配置类


 
 
 
 
   
/**  
 * 时间轮配置类  
 */  
@Configuration  
@EnableConfigurationProperties(TimingWheelProperties.class)  
public class TimingWheelConfig {  
  
    @Bean  
    public TimingWheel timingWheel(TimingWheelProperties properties, MeterRegistry meterRegistry) {  
        log.info("Creating timing wheel with properties: {}", properties);  
        return new TimingWheel(properties, meterRegistry);  
    }  
  
    @Bean  
    public MetricsConfig metricsConfig(MeterRegistry meterRegistry) {  
        return new MetricsConfig(meterRegistry);  
    }  
  
    @Bean  
    public WebConfig webConfig() {  
        return new WebConfig();  
    }  
}

REST API接口


 
 
 
 
   
/**  
 * 时间轮控制器  
 */  
@RestController  
@RequestMapping("/api/timingwheel")  
@CrossOrigin(origins = "*")  
public class TimingWheelController {  
  
    @Autowired  
    private TimingWheelService timingWheelService;  
  
    /**  
     * 获取时间轮统计信息  
     */  
    @GetMapping("/stats")  
    public ResponseEntity<TimingWheel.TimingWheelStats> getStats() {  
        TimingWheel.TimingWheelStats stats = timingWheelService.getStats();  
        return ResponseEntity.ok(stats);  
    }  
  
    /**  
     * 创建示例任务  
     */  
    @PostMapping("/tasks/sample")  
    public ResponseEntity<Map<String, Object>> createSampleTask(@RequestBody Map<String, Object> request) {  
        String type = (String) request.getOrDefault("type", "simple");  
        long delay = ((Number) request.getOrDefault("delay", 1000)).longValue();  
  
        String taskId = timingWheelService.createSampleTask(type, delay);  
  
        Map<String, Object> response = new HashMap<>();  
        response.put("taskId", taskId);  
        response.put("type", type);  
        response.put("delay", delay);  
        response.put("message", "Task created successfully");  
  
        return ResponseEntity.ok(response);  
    }  
  
    /**  
     * 批量创建任务  
     */  
    @PostMapping("/tasks/batch")  
    public ResponseEntity<Map<String, Object>> createBatchTasks(@RequestBody Map<String, Object> request) {  
        int count = (Integer) request.getOrDefault("count", 10);  
        long minDelay = ((Number) request.getOrDefault("minDelay", 1000)).longValue();  
        long maxDelay = ((Number) request.getOrDefault("maxDelay", 10000)).longValue();  
  
        List<String> taskIds = timingWheelService.createBatchTasks(count, minDelay, maxDelay);  
  
        Map<String, Object> response = new HashMap<>();  
        response.put("taskIds", taskIds);  
        response.put("count", taskIds.size());  
        response.put("message", "Batch tasks created successfully");  
  
        return ResponseEntity.ok(response);  
    }  
  
    /**  
     * 压力测试  
     */  
    @PostMapping("/stress-test")  
    public ResponseEntity<Map<String, Object>> stressTest(@RequestBody Map<String, Object> request) {  
        int taskCount = (Integer) request.getOrDefault("taskCount", 1000);  
        long minDelay = ((Number) request.getOrDefault("minDelay", 100)).longValue();  
        long maxDelay = ((Number) request.getOrDefault("maxDelay", 5000)).longValue();  
  
        long startTime = System.currentTimeMillis();  
        List<String> taskIds = timingWheelService.createBatchTasks(taskCount, minDelay, maxDelay);  
        long endTime = System.currentTimeMillis();  
  
        Map<String, Object> response = new HashMap<>();  
        response.put("taskCount", taskIds.size());  
        response.put("creationTime", endTime - startTime);  
        response.put("throughput", taskIds.size() * 1000.0 / (endTime - startTime));  
        response.put("message", "Stress test completed successfully");  
  
        return ResponseEntity.ok(response);  
    }  
}

应用配置


 
 
 
 
   
# application.yml  
server:  
  port: 8081  
  servlet:  
    context-path: /  
  
spring:  
  application:  
    name: springboot-timingwheel  
  
# Timing Wheel Configuration  
timingwheel:  
  config:  
    slot-size: 512  
    tick-duration: 100  
    worker-threads: 4  
    enable-multi-wheel: true  
    enable-metrics: true  
    task-timeout: 30000  
  
logging:  
  level:  
    com.example.timingwheel: DEBUG  
    org.springframework.web: INFO  
  pattern:  
    console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
总结

本文展示了时间轮的设计与实现,从算法原理到可视化监控。时间轮通过巧妙的时间维度空间化思想,用简单的指针移动实现了高效的定时任务调度,在高并发场景下展现出卓越的性能优势。

最后欢迎加入苏三的星球,你将获得:智能天气播报AI Agent、SaaS点餐系统(DDD+多租户)、100万QPS短链系统(超过并发)、复杂的商城微服务系统(分布式)、苏三AI项目、刷题吧小程序、秒杀系统、商城系统、码猿简历网站、代码生成工具等10个项目的源代码、开发教程和技术答疑。 系统设计、性能优化、技术选型、底层原理、Spring源码解读、工作经验分享、痛点问题、面试八股文等多个优质专栏。

还有1V1免费修改简历、技术答疑、职业规划、送书活动、技术交流。

扫描下方二维码,可以加入星球:

picture.image

数量有限,先到先得。 目前星球已经更新了6100+篇优质内容,还在持续爆肝中.....

星球已经被官方推荐了3次,收到了小伙伴们的一致好评。戳我加入学习,已有2000+小伙伴加入学习。

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
vivo 容器化平台架构与核心能力建设实践
为了实现规模化降本提效的目标,vivo 确定了基于云原生理念构建容器化生态的目标。在容器化生态发展过程中,平台架构不断演进,并针对业务的痛点和诉求,持续完善容器化能力矩阵。本次演讲将会介绍 vivo 容器化平台及主要子系统的架构设计,并分享重点建设的容器化核心能力。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论