我们好,我是老三,在项目里,常常会有一些主线业务之外的其它业务,比如,下单之后,发送告诉、监控埋点、记载日志……

这些非中心业务,如果悉数一梭子写下去,有两个问题,一个是业务耦合,一个是串行耗时。

SpringBoot项目实现发布订阅模式,真的很简单!

所以,一般在开发的时分,都会把这些操作抽象成观察者形式,也就是发布/订阅形式(这儿就不讨论观察者形式和发布/订阅形式的不同),并且一般会选用多线程的办法来异步履行这些观察者办法。

SpringBoot项目实现发布订阅模式,真的很简单!

一开始,咱们都是自己去写观察者形式。

自己完成观察者形式

SpringBoot项目实现发布订阅模式,真的很简单!

观察者

  • 观察者界说接口
/**
 * @Author: fighter3
 * @Description: 观察者接口
 * @Date: 2022/11/7 11:40 下午
 */
public interface OrderObserver {
    void afterPlaceOrder(PlaceOrderMessage placeOrderMessage);
}
  • 详细观察者

    • 监控埋点观察者
    @Slf4j
    public class OrderMetricsObserver implements OrderObserver {
        @Override
        public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
            log.info("[afterPlaceOrder] metrics");
        }
    }
    
    • 日志记载观察者
    @Slf4j
    public class OrderLogObserver implements OrderObserver{
        @Override
        public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
            log.info("[afterPlaceOrder] log.");
        }
    }
    
    • 业务告诉观察者
    @Slf4j
    public class OrderNotifyObserver implements OrderObserver{
        @Override
        public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
            log.info("[afterPlaceOrder] notify.");
        }
    }
    

被观察者

  • 音讯实体界说
@Data
public class PlaceOrderMessage implements Serializable {
    /**
     * 订单号
     */
    private String orderId;
    /**
     * 订单状态
     */
    private Integer orderStatus;
    /**
     * 下单用户ID
     */
    private String userId;
    //……
}
  • 被观察者抽象类
public abstract class OrderSubject {
    //界说一个观察者列表
    private List<OrderObserver> orderObserverList = new ArrayList<>();
    //界说一个线程池,这儿参数随意写的
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));
    //添加一个观察者
    public void addObserver(OrderObserver o) {
        this.orderObserverList.add(o);
    }
    //删去一个观察者
    public void delObserver(OrderObserver o) {
        this.orderObserverList.remove(o);
    }
    //告诉一切观察者
    public void notifyObservers(PlaceOrderMessage placeOrderMessage) {
        for (OrderObserver orderObserver : orderObserverList) {
            //运用多线程异步履行
            threadPoolExecutor.execute(() -> {
                orderObserver.afterPlaceOrder(placeOrderMessage);
            });
        }
    }
}

这儿运用了多线程,来异步履行观察者。

  • 被观察者完成类
/**
 * @Author: fighter3
 * @Description: 订单完成类-被观察者完成类
 * @Date: 2022/11/7 11:52 下午
 */
@Service
@Slf4j
public class OrderServiceImpl extends OrderSubject implements OrderService {
    /**
     * 下单
     */
    @Override
    public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
        PlaceOrderResVO resVO = new PlaceOrderResVO();
        //添加观察者
        this.addObserver(new OrderMetricsObserver());
        this.addObserver(new OrderLogObserver());
        this.addObserver(new OrderNotifyObserver());
        //告诉观察者
        this.notifyObservers(new PlaceOrderMessage());
        log.info("[placeOrder] end.");
        return resVO;
    }
}

测验

    @Test
    @DisplayName("下单")
    void placeOrder() {
        PlaceOrderReqVO placeOrderReqVO = new PlaceOrderReqVO();
        orderService.placeOrder(placeOrderReqVO);
    }
  • 测验履行成果
2022-11-08 00:11:13.617  INFO 20235 --- [pool-1-thread-1] c.f.obverser.OrderMetricsObserver        : [afterPlaceOrder] metrics
2022-11-08 00:11:13.618  INFO 20235 --- [           main] cn.fighter3.obverser.OrderServiceImpl    : [placeOrder] end.
2022-11-08 00:11:13.618  INFO 20235 --- [pool-1-thread-3] c.fighter3.obverser.OrderNotifyObserver  : [afterPlaceOrder] notify.
2022-11-08 00:11:13.617  INFO 20235 --- [pool-1-thread-2] cn.fighter3.obverser.OrderLogObserver    : [afterPlaceOrder] log.

能够看到,观察者是异步履行的。

运用Spring精简

能够看到,观察者形式写起来仍是比较简略的,可是既然都用到了Spring来管理Bean的生命周期,代码还能够更精简一些。

SpringBoot项目实现发布订阅模式,真的很简单!

观察者完成类:界说成Bean

  • OrderLogObserver

    @Slf4j
    @Service
    public class OrderLogObserver implements OrderObserver {
        @Override
        public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
            log.info("[afterPlaceOrder] log.");
        }
    }
    
  • OrderMetricsObserver

@Slf4j
@Service
public class OrderMetricsObserver implements OrderObserver {
    @Override
    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
        log.info("[afterPlaceOrder] metrics");
    }
}
  • OrderNotifyObserver
@Slf4j
@Service
public class OrderNotifyObserver implements OrderObserver {
    @Override
    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
        log.info("[afterPlaceOrder] notify.");
    }
}

被观察者:自动注入Bean

  • OrderSubject

    public abstract class OrderSubject {
        /**
         * 运用Spring的特性直接注入观察者
         */
        @Autowired
        protected List<OrderObserver> orderObserverList;
        //界说一个线程池,这儿参数随意写的
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));
        //告诉一切观察者
        public void notifyObservers(PlaceOrderMessage placeOrderMessage) {
            for (OrderObserver orderObserver : orderObserverList) {
                //运用多线程异步履行
                threadPoolExecutor.execute(() -> {
                    orderObserver.afterPlaceOrder(placeOrderMessage);
                });
            }
        }
    }
    
  • OrderServiceImpl

@Service
@Slf4j
public class OrderServiceImpl extends OrderSubject implements OrderService {
    /**
     * 完成类里也要注入一下
     */
    @Autowired
    private List<OrderObserver> orderObserverList;
    /**
     * 下单
     */
    @Override
    public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
        PlaceOrderResVO resVO = new PlaceOrderResVO();
        //告诉观察者
        this.notifyObservers(new PlaceOrderMessage());
        log.info("[placeOrder] end.");
        return resVO;
    }
}

这样一来,发现被观察者又简练了许多,可是后来我发现,在SpringBoot项目里,运用Spring事情驱动驱动模型(event)模型来完成,愈加地简练。

Spring Event完成发布/订阅形式

Spring Event对发布/订阅形式进行了封装,运用起来愈加简略,仍是以咱们这个场景为例,看看怎样来完成吧。

自界说事情

  • PlaceOrderEvent:承继ApplicationEvent,并重写构造函数。ApplicationEvent是Spring提供的一切应用程序事情扩展类。
public class PlaceOrderEvent extends ApplicationEvent {
    public PlaceOrderEvent(PlaceOrderEventMessage source) {
        super(source);
    }
}
  • PlaceOrderEventMessage:事情音讯,界说了事情的音讯体。
@Data
public class PlaceOrderEventMessage implements Serializable {
    /**
     * 订单号
     */
    private String orderId;
    /**
     * 订单状态
     */
    private Integer orderStatus;
    /**
     * 下单用户ID
     */
    private String userId;
    //……
}

事情监听者

事情监听者,有两种完成办法,一种是完成ApplicationListener接口,另一种是运用@EventListener注解。

SpringBoot项目实现发布订阅模式,真的很简单!

完成ApplicationListener接口

完成ApplicationListener接口,重写onApplicationEvent办法,将类界说为Bean,这样,一个监听者就完成了。

  • OrderLogListener
@Slf4j
@Service
public class OrderLogListener implements ApplicationListener<PlaceOrderEvent> {
    @Override
    public void onApplicationEvent(PlaceOrderEvent event) {
        log.info("[afterPlaceOrder] log.");
    }
}
  • OrderMetricsListener
@Slf4j
@Service
public class OrderMetricsListener implements ApplicationListener<PlaceOrderEvent> {
    @Override
    public void onApplicationEvent(PlaceOrderEvent event) {
        log.info("[afterPlaceOrder] metrics");
    }
}
  • OrderNotifyListener
@Slf4j
@Service
public class OrderNotifyListener implements ApplicationListener<PlaceOrderEvent> {
    @Override
    public void onApplicationEvent(PlaceOrderEvent event) {
        log.info("[afterPlaceOrder] notify.");
    }
}

运用@EventListener注解

运用@EventListener注解就更简略了,直接在办法上,加上@EventListener注解就行了。

  • OrderLogListener

    @Slf4j
    @Service
    public class OrderLogListener  {
        @EventListener
        public void orderLog(PlaceOrderEvent event) {
            log.info("[afterPlaceOrder] log.");
        }
    }
    
  • OrderMetricsListener

    @Slf4j
    @Service
    public class OrderMetricsListener {
        @EventListener
        public void metrics(PlaceOrderEvent event) {
            log.info("[afterPlaceOrder] metrics");
        }
    }
    
  • OrderNotifyListener

    @Slf4j
    @Service
    public class OrderNotifyListener{
        @EventListener
        public void notify(PlaceOrderEvent event) {
            log.info("[afterPlaceOrder] notify.");
        }
    }
    

异步和自界说线程池

异步履行

异步履行也十分简略,运用Spring的异步注解@Async就能够了。例如:

  • OrderLogListener
@Slf4j
@Service
public class OrderLogListener  {
    @EventListener
    @Async
    public void orderLog(PlaceOrderEvent event) {
        log.info("[afterPlaceOrder] log.");
    }
}

当然,还需要敞开异步,SpringBoot项目默许是没有敞开异步的,咱们需要手动装备敞开异步功用,很简略,只需要在装备类上加上@EnableAsync注解就行了,这个注解用于声明启用Spring的异步办法履行功用,需要和@Configuration注解一起运用,也能够直接加在启动类上。

@SpringBootApplication
@EnableAsync
public class DailyApplication {
    public static void main(String[] args) {
        SpringApplication.run(DairlyLearnApplication.class, args);
    }
}

自界说线程池

运用@Async的时分,一般都会自界说线程池,由于@Async的默许线程池为SimpleAsyncTaskExecutor,不是真的线程池,这个类不重用线程,默许每次调用都会创立一个新的线程。

自界说线程池有三种办法:

SpringBoot项目实现发布订阅模式,真的很简单!

  • 完成接口AsyncConfigurer
  • 承继AsyncConfigurerSupport
  • 装备由自界说的TaskExecutor替代内置的使命履行器

咱们来看看三种写法:

  • 完成接口AsyncConfigurer
@Configuration
@Slf4j
public class AsyncConfiguration implements AsyncConfigurer {
    @Bean("fighter3AsyncExecutor")
    public ThreadPoolTaskExecutor executor() {
        //Spring封装的一个线程池
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //随意写的一些装备
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(30);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("fighter3AsyncExecutor-");
        executor.initialize();
        return executor;
    }
    @Override
    public Executor getAsyncExecutor() {
        return executor();
    }
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex);
    }
}
  • 承继AsyncConfigurerSupport
@Configuration
@Slf4j
public class SpringAsyncConfigurer extends AsyncConfigurerSupport {
    @Bean
    public ThreadPoolTaskExecutor asyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        //随意写的一些装备
        threadPool.setCorePoolSize(10);
        threadPool.setMaxPoolSize(30);
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        threadPool.setAwaitTerminationSeconds(60 * 15);
        return threadPool;
    }
    @Override
    public Executor getAsyncExecutor() {
        return asyncExecutor();
    }
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex);
    }
}
  • 装备自界说的TaskExecutor

    • 装备线程池

      @Configuration
      public class TaskPoolConfig {
          @Bean(name = "asyncExecutor")
          public Executor taskExecutor() {
              ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
              //随意写的一些装备
              executor.setCorePoolSize(10);
              executor.setMaxPoolSize(20);
              executor.setQueueCapacity(200);
              executor.setKeepAliveSeconds(60);
              executor.setThreadNamePrefix("asyncExecutor-");
              executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
              return executor;
          }
      }
      
    • 运用@Async注解的时分,指定线程池,推荐运用这种办法,由于在项目里,尽量做到线程池阻隔,不同的使命运用不同的线程池

    @Slf4j
    @Service
    public class OrderLogListener  {
        @EventListener
        @Async("asyncExecutor")
        public void orderLog(PlaceOrderEvent event) {
            log.info("[afterPlaceOrder] log.");
        }
    }
    

异步和自界说线程池这一部分只是一些扩展,稍微占了一些篇幅,我们可不要觉得Spring Event用起来很繁琐。

SpringBoot项目实现发布订阅模式,真的很简单!

发布事情

发布事情也十分简略,只需要运用Spring 提供的ApplicationEventPublisher来发布自界说事情。

  • OrderServiceImpl

    @Service
    @Slf4j
    public class OrderServiceImpl implements OrderService {
        @Autowired
        private ApplicationEventPublisher applicationEventPublisher;
        /**
         * 下单
         */
        @Override
        public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
            log.info("[placeOrder] start.");
            PlaceOrderResVO resVO = new PlaceOrderResVO();
            //音讯
            PlaceOrderEventMessage eventMessage = new PlaceOrderEventMessage();
            //发布事情
            applicationEventPublisher.publishEvent(new PlaceOrderEvent(eventMessage));
            log.info("[placeOrder] end.");
            return resVO;
        }
    }
    

在Idea里检查事情的监听者也比较方便,点击下面图中的图标,就能够检查监听者。

SpringBoot项目实现发布订阅模式,真的很简单!

SpringBoot项目实现发布订阅模式,真的很简单!

测验

最终,咱们仍是测验一下。

    @Test
    void placeOrder() {
        PlaceOrderReqVO placeOrderReqVO = new PlaceOrderReqVO();
        orderService.placeOrder(placeOrderReqVO);
    }
  • 履行成果
2022-11-08 10:05:14.415  INFO 22674 --- [           main] c.f.o.event.event.OrderServiceImpl       : [placeOrder] start.
2022-11-08 10:05:14.424  INFO 22674 --- [           main] c.f.o.event.event.OrderServiceImpl       : [placeOrder] end.
2022-11-08 10:05:14.434  INFO 22674 --- [sync-executor-3] c.f.o.event.event.OrderNotifyListener    : [afterPlaceOrder] notify.
2022-11-08 10:05:14.435  INFO 22674 --- [sync-executor-2] c.f.o.event.event.OrderMetricsListener   : [afterPlaceOrder] metrics
2022-11-08 10:05:14.436  INFO 22674 --- [sync-executor-1] c.f.o.event.event.OrderLogListener       : [afterPlaceOrder] log.

能够看到,异步履行,并且用到了咱们自界说的线程池。

小结

这篇文章里,从最开始自己完成的观察者形式,再到运用Spring简化的观察者形式,再到运用Spring Event完成发布/订阅形式,能够看到,Spring Event用起来仍是比较简略的。除此之外,还有Guava EventBus这样的事情驱动完成,我们更习惯运用哪种呢?




参考:

[1]. 《设计形式之禅》