项目实战 · 发布-订阅模式思考

社区
引言

发布-订阅模式(Publish-Subscribe Pattern)是一种软件架构设计模式,属于行为型设计模式,用于解耦生产者(发布者)和消费者(订阅者)之间的关系。发布者负责发布消息,而订阅者则负责订阅这些消息并对其进行处理。这种模式的优点在于它能够提高系统的可扩展性、灵活性和可维护性。

picture.image

由来

发布-订阅模式的由来可以追溯到20世纪80年代和90年代的Smalltalk语言,当时软件系统正变得越来越复杂,需要处理大量的分布式计算和实时数据处理任务。为了解决这些问题,学术界和工业界的研究者们开始探索更加灵活、可扩展的通信机制。

在这个背景下,发布-订阅模式作为一种异步、松耦合的通信模式应运而生。它的灵感来源于现实世界中的报纸杂志订阅系统,在这个系统中,出版商发布新的报纸或杂志,而订阅者可以根据自己的兴趣订阅相应的出版物。类似地,在发布-订阅模式中,发布者负责发布消息,而订阅者则根据自己的兴趣订阅相应的消息。

发布-订阅模式的提出,旨在解决传统通信模式(如请求-响应模式和点对点通信模式)中存在的一些问题,如紧耦合、难以扩展和维护等。通过将生产者和消费者解耦,发布-订阅模式实现了更高效、灵活的信息传递和处理方式。

自发布以来,发布-订阅模式得到了广泛的应用和发展。它被应用于各种软件系统中,如消息队列系统(如RabbitMQ、Kafka等)、事件驱动架构、实时数据流处理等领域。同时,发布-订阅模式也在不断发展和演变,出现了许多变种和扩展形式,以满足不同应用场景的需求。

picture.image

概念

发布-订阅模式是一种软件架构设计模式,它描述了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。当主题对象状态发生改变时,它的所有依赖者(观察者)都会自动收到通知并更新。

在发布-订阅模式中,有两个主要的角色:发布者和订阅者。发布者负责发布消息,而订阅者则负责订阅这些消息并对其进行处理。这种模式的优点在于它能够提高系统的可扩展性、灵活性和可维护性。

发布-订阅模式的核心概念包括:

  1. 主题(Topic):主题是一个抽象的概念,代表了一类消息。订阅者可以订阅一个或多个主题,以便接收与这些主题相关的消息。
  2. 发布者(Publisher):发布者是负责发布消息的对象。它创建并发送消息到特定的主题,而不需要知道有哪些订阅者。
  3. 订阅者(Subscriber):订阅者是负责接收并处理消息的对象。它订阅一个或多个主题,并在接收到相关消息时执行相应的操作。
  4. 消息代理(Message Broker):消息代理是一个中介,负责接收发布者发布的消息,并根据订阅者的订阅信息将消息传递给相应的订阅者。消息代理还可以提供消息过滤、持久化存储等功能。
原理

发布-订阅模式的实现通常依赖于一个中间件,如消息队列或事件总线。这个中间件负责维护发布者和订阅者之间的关系,并在发布者发布新消息时,将消息分发给所有订阅了该事件的订阅者。

我有一个朋友张三酷爱去洗浴中心洗澡,但是他总是会错过洗浴中心的发布的最新消息,洗浴中心为了解决这个问题,挽留住大客户张三,就委托我给他们设计一个系统:

  1. 定义消息主题:首先,我们需要定义一个或多个消息主题,例如“洗澡服务”、“优惠活动”等。这些主题将用于区分不同类型的消息。
  2. 角色分配:在这个场景中,我们可以将张三视为订阅者,洗浴中心视为发布者,而消息代理可以是洗浴中心的后台管理系统。
  3. 订阅:张三(订阅者)向洗浴中心(消息代理)订阅感兴趣的主题,例如“洗澡服务”。这意味着张三希望接收到关于洗澡服务的最新信息。
  4. 发布:洗浴中心(发布者)根据业务需求发布消息。例如,当有新的优惠活动时,洗浴中心会向“优惠活动”主题发布一条消息,告知所有订阅了该主题的订阅者。
  5. 传递消息:洗浴中心的后台管理系统(消息代理)接收到发布者发布的消息后,根据订阅者订阅的主题,将消息传递给相应的订阅者。在这个例子中,张三订阅了“洗澡服务”主题,因此他会收到关于洗澡服务的最新消息。
  6. 处理消息:张三(订阅者)接收到消息后,根据自己的需求进行处理。例如,当他收到关于优惠活动的消息时,他可以选择是否参与活动,从而享受到优惠。
示例

我们将使用一个类作为消息代理,一个BathCenter类作为发布者,以及一个Customer类(代表张三)作为订阅者。

首先,我们创建一个Message类来表示消息:

public class Message {
    private String topic;
    private String content;

    public Message(String topic, String content) {
        this.topic = topic;
        this.content = content;
    }

    public String getTopic() {
        return topic;
    }

    public String getContent() {
        return content;
    }
}

接下来,我们创建MessageBroker类作为消息代理:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MessageBroker {
    private Map<String, List<Customer>> subscriptions = new HashMap<>();

    public void subscribe(String topic, Customer customer) {
        subscriptions.computeIfAbsent(topic, k -> new ArrayList<>()).add(customer);
    }

    public void unsubscribe(String topic, Customer customer) {
        subscriptions.getOrDefault(topic, new ArrayList<>()).remove(customer);
    }

    public void publish(String topic, String content) {
        List<Customer> customers = subscriptions.getOrDefault(topic, new ArrayList<>());
        for (Customer customer : customers) {
            customer.receiveMessage(new Message(topic, content));
        }
    }
}

然后,我们创建BathCenter类作为发布者:

public class BathCenter {
    private MessageBroker messageBroker;

    public BathCenter(MessageBroker messageBroker) {
        this.messageBroker = messageBroker;
    }

    public void offerDiscount(String discountInfo) {
        messageBroker.publish("discount", discountInfo);
    }
}

最后,我们创建Customer类(代表张三)作为订阅者:

public class Customer {
    private String name;

    public Customer(String name) {
        this.name = name;
    }

    public void subscribe(String topic, MessageBroker messageBroker) {
        messageBroker.subscribe(topic, this);
    }

    public void unsubscribe(String topic, MessageBroker messageBroker) {
        messageBroker.unsubscribe(topic, this);
    }

    public void receiveMessage(Message message) {
        System.out.println(name + " received message: " + message.getContent() + " for topic: " + message.getTopic());
    }
}

现在,我们可以创建一个简单的场景来演示张三订阅洗澡中心的消息:

public class Main {
    public static void main(String[] args) {
        MessageBroker messageBroker = new MessageBroker();
        BathCenter bathCenter = new BathCenter(messageBroker);
        Customer zhangSan = new Customer("张三");

        zhangSan.subscribe("discount", messageBroker);

        bathCenter.offerDiscount("今日特价:搓澡5折!有新到技师,盼速归~");
    }
}

运行这个程序,你将看到张三收到了洗澡中心的折扣信息。

Spring 实现

基于ApplicationEventApplicationListener实现发布-订阅模式:

首先,创建一个Spring Boot项目,添加以下依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

创建自定义事件CustomEvent

package com.neo.design.messagebroker;
import org.springframework.context.ApplicationEvent;

public class CustomEvent extends ApplicationEvent {

    private String message;

    public CustomEvent(Object source, String message) {
        super(source);
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}

创建消息发布者MessagePublisher

package com.neo.design.messagebroker;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
public class MessagePublisher implements ApplicationContextAware {

    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }

    public void publishMessage(String message) {
        context.publishEvent(new CustomEvent(this, message));
    }
}

创建消息订阅者MessageSubscriber

package com.neo.design.messagebroker;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

@Component
public class MessageSubscriber implements ApplicationListener<CustomEvent> {

    @Override
    public void onApplicationEvent(CustomEvent event) {
        System.out.println("Subscriber received message: " + event.getMessage());
    }
}

创建一个简单的REST控制器,用于触发消息发布:

package com.neo.design.messagebroker;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ApiController {

    @Autowired
    private MessagePublisher publisher;

    @GetMapping("/publish")
    public String publishMessage(@RequestParam String message) {
        publisher.publishMessage(message);
        return "Message published: " + message;
    }
}

运行Spring Boot应用程序,并通过浏览器或Postman访问以下URL来测试发布-订阅模式:

http://localhost:9092/publish?message=今日新技师到店,盼速归~

picture.image

这样就实现了一个基于Spring Boot的简单发布-订阅模式的测试示例。

总结

作为一名程序员,我知发布-订阅模式(Publish-Subscribe Pattern)在软件设计中的重要性。这种模式以其独特的优势,为现代软件架构带来了诸多便利,特别是在异步通信、事件驱动架构以及组件解耦等方面。这种模式提高了系统的容错性和跨平台集成能力,同时也简化了大型系统的复杂性管理。尽管存在一些挑战,如消息顺序和调试难度,但总体来说,它对于构建现代软件系统具有显著优势。

0
0
0
0
关于作者
相关资源
DataSail CDC 数据整库实时入仓入湖实践
在线数据库数据导入到数仓分析的链路已经存在多年,随着近年来实时计算的发展,业务希望有延迟更低、运维更便捷、效率更高的CDC同步通道。本次分享主要介绍DataSail实现CDC整库实时同步的技术方案和业务实践。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论