异步的8种实现方案

大家好,我是苏三,又跟大家见面了。

前言

我们在做接口性能优化的时候,经常需要把同步改成异步。

那么你知道在Java中有哪些异步方案吗?

今天这篇文章就跟大家一起聊聊Java中的8种异步实现方案,希望对你会有所帮助。

最近建了一些工作内推群,各大城市都有,欢迎各位HR和找工作的小伙伴进群交流,群里目前已经收集了不少的工作内推岗位。

扫码加苏三的微信:li_su223,备注:所在城市,即可进群。

picture.image

1.为什么需要异步编程?

同步处理的致命陷阱 :当线程因I/O阻塞时,CPU资源被无效占用。

某电商大促期间,因支付服务响应从50ms恶化到2秒,订单服务的200个线程在10秒内全被阻塞,引发链式雪崩。

picture.image

异步编程的三大核心价值

  1. 资源释放 :I/O等待时释放线程,提升吞吐量(实测可达同步模式的3倍)
  2. 故障隔离 :单个服务异常不影响整体流程
  3. 流量削峰 :消息队列缓存突发流量

2.异步的8种实现方案

方案1:线程与线程池

核心原理 :物理线程实现并行

  
// 线程池最佳实践  
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); // Java 21+  
executor.submit(() -> {  
    System.out.println("异步任务执行: " + Thread.currentThread().getName());  
});  

线程状态机

picture.image

适用场景 :简单异步任务,资源消耗较大。

方案2:Future

核心痛点 :获取结果时需阻塞线程

  
ExecutorService executor = Executors.newFixedThreadPool(2);  
Future<String> future = executor.submit(() -> {  
    Thread.sleep(2000);  
    return "结果数据";  
});  
  
// 阻塞直到结果返回  
String result = future.get();   

致命缺陷

  1. 无法链式任务依赖
  2. 异常处理困难
  3. 无超时控制(需手动实现)

方案3:CompletableFuture

它是JDK8+的首选。

革命性突破 :非阻塞任务编排

  
CompletableFuture.supplyAsync(() -> fetchOrder(123))    // 阶段1:获取订单  
    .thenApplyAsync(order -> calculatePrice(order))     // 阶段2:计算价格  
    .thenAccept(price -> sendNotification(price))       // 阶段3:发送通知  
    .exceptionally(ex -> {                              // 统一异常处理  
        log.error("处理失败", ex);  
        return null;  
    });  

链式调用原理

picture.image

超时控制(JDK9+)

  
CompletableFuture.supplyAsync(() -> longTask())  
    .orTimeout(2, TimeUnit.SECONDS)  // 超时中断  
    .whenComplete((res, ex) -> {  
        if (ex instanceof TimeoutException) {  
            // 超时处理  
        }  
    });  

方案4:Spring @Async

它是企业级的简易方案。

最佳实践必须配置自定义线程池

  
@Configuration  
@EnableAsync  
publicclass AsyncConfig {  
    @Bean("taskExecutor")  
    public Executor taskExecutor() {  
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
        executor.setCorePoolSize(10);  
        executor.setMaxPoolSize(50);  
        executor.setQueueCapacity(100);  
        executor.setThreadNamePrefix("Async-");  
        return executor;  
    }  
}  
  
@Service  
publicclass OrderService {  
    @Async("taskExecutor")  
    public CompletableFuture<Order> createOrder(OrderDTO dto) {  
        // 异步创建逻辑  
        return CompletableFuture.completedFuture(order);  
    }  
}  

避坑指南

  1. 避免自调用(@Async失效)
  2. 线程池参数动态调整
  3. 监控队列堆积预警

方案5:Spring事件

它是解耦利器。

典型场景 :订单创建后的短信、积分等辅助操作

  
// 定义事件  
publicclass OrderCreatedEvent extends ApplicationEvent {  
    private Order order;  
    public OrderCreatedEvent(Object source, Order order) {  
        super(source);  
        this.order = order;  
    }  
}  
  
// 发布事件  
applicationContext.publishEvent(new OrderCreatedEvent(this, order));  
  
// 监听处理  
@Component  
publicclass BonusServiceListener {  
    @Async// 异步处理  
    @EventListener  
    public void handleOrderEvent(OrderCreatedEvent event) {  
        addBonus(event.getOrder().getUserId());  
    }  
}  

方案6:消息队列

它可以做分布式解耦。

架构设计

picture.image

RocketMQ示例

  
// 生产者  
Message msg = new Message("OrderTopic", "CREATE", orderJson.getBytes());  
producer.send(msg);  
  
// 消费者  
consumer.subscribe("OrderTopic", "*", (msgs, context) -> {  
    for (MessageExt msg : msgs) {  
        processOrder(new String(msg.getBody()));  
    }  
    return ConsumeConcurrentlyStatus.CONSUME\_SUCCESS;  
});  

可靠性保障

  1. 事务消息(防丢失)
  2. 死信队列(防积压)
  3. 幂等消费(防重复)

方案7:响应式编程

它是高并发的巅峰。

Project Reactor核心模式

  
Flux.range(1, 100)  
    .parallel() // 并行处理  
    .runOn(Schedulers.parallel())  
    .map(i -> intensiveCalculation(i))  
    .subscribe(result -> updateDB(result));  

背压机制

picture.image

适用场景 :实时数据流处理(如股票行情推送)。

方案8:异步HTTP与非阻塞IO

Vert.x实战

  
vertx.createHttpServer()  
    .requestHandler(req -> {  
        // 非阻塞处理  
        dbClient.query("SELECT * FROM users", res -> {  
            req.response()  
               .putHeader("content-type", "application/json")  
               .end(encodeJson(res.result()));  
        });  
    })  
    .listen(8080);  

与传统BIO对比

| 指标 | 阻塞IO | 非阻塞IO | | --- | --- | --- | | 线程数 | 1000请求=1000线程 | 1000请求=4线程 | | CPU利用率 | 低(上下文切换) | 高(事件驱动) | | 吞吐量 | < 5000 QPS |

30000 QPS |

3.常见问题

问题1:回调地狱(Callback Hell)

传统写法

  
serviceA.call(resultA -> {  
    serviceB.call(resultA, resultB -> {  
        serviceC.call(resultB, resultC -> {  
            // 嵌套地狱!  
        });  
    });  
});  

CompletableFuture解法

  
CompletableFuture.supplyAsync(serviceA::call)  
    .thenCompose(serviceB::call)  
    .thenCompose(serviceC::call)  
    .thenAccept(this::finalAction);  

问题2:上下文丢失

解决方案 :TransmittableThreadLocal

  
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();  
  
context.set("user123");  
CompletableFuture.runAsync(() -> {  
    System.out.println(context.get()); // 输出user123  
}, TtlExecutors.getTtlExecutorService(executor));  

问题3:分布式事务一致性

Saga模式实现

picture.image

4.性能压测对比

方案延迟(ms)吞吐量(QPS)线程数适用场景
线程池
45
2,000
200+
简单任务
Future
40
2,500
200+
需结果阻塞
CompletableFuture
25
8,000
50
复杂编排
@Async
30
7,000
50
Spring生态
消息队列
60
12,000
20
分布式解耦
响应式编程
15
15,000
4
高并发流处理
非阻塞IO
10
30,000
4
网络密集型服务

测试环境:AWS c5.4xlarge 16核32GB

5.异步编程的黄金法则

5.1 如何选型?

picture.image

5.2 避坑指南

  • 死锁预防 :避免异步任务间循环依赖
  • 超时控制 :所有异步操作必须设置超时
  • 幂等设计 :消息重试可能导致重复消费
  • 上下文传递 :异步时丢失ThreadLocal的解决方案:
  
// 使用TransmittableThreadLocal  
try (Scope scope = context.wrap(task).bind()) {  
    asyncTask.execute();  
}  

5.3 监控体系

  1. 线程池指标 :活跃线程数、队列深度、拒绝次数
  2. 消息队列 :积压量、消费延迟
  3. 链路追踪 :异步调用链可视化

总结

  1. 初创期@Async
  • 线程池
  1. 发展期 :CompletableFuture任务编排
  2. 高并发期 :响应式编程 + 非阻塞IO
  3. 分布式期 :消息队列 + 事务最终一致性

异步编程如同城市高架系统—— 同步阻塞是地面道路,一辆事故就全局瘫痪; 异步非阻塞是立体交通,局部故障不影响全局通行。

没有最好的方案,只有最适合场景的设计。

最后欢迎加入苏三的星球,你将获得:100万QPS短链系统、复杂的商城微服务系统、苏三AI项目、刷题吧小程序、秒杀系统、商城系统、秒杀系统、代码生成工具等8个项目的源代码、开发教程和技术答疑。

系统设计、性能优化、技术选型、底层原理、Spring源码解读、工作经验分享、痛点问题、面试八股文等多个优质专栏。

还有1V1免费修改简历、技术答疑、职业规划、送书活动、技术交流。

扫描下方二维码,可以优惠30元:

picture.image

只有20张优惠券,数量有限,先到先得。

目前星球已经更新了5800+篇优质内容,还在持续爆肝中.....

星球已经被官方推荐了3次,收到了小伙伴们的一致好评。戳我加入学习,已有1800+小伙伴加入学习。

0
0
0
0
评论
未登录
暂无评论