作者:京东科技 刘子洋

布景

最近项目出现同一音讯发送屡次的现象,对下流事务方造成困扰,经过排查发现运用EventBus办法不正确。也借此机会学习了下EventBus并进行共享。以下为共享内容,本文首要分为五个部分,篇幅较长,望大家耐性阅读。

  • 1、简述:简单介绍EventBus及其组成部分。

  • 2、原理解析:首要对listener注册流程及Event发布流程进行解析。

  • 3、运用辅导:EventBus简单的运用辅导。

  • 4、留意事项:在运用EventBus中需求留意的一些隐藏逻辑。

  • 5、共享时提问的问题

  • 6、项目中遇到的问题:上述问题进行详细描述并复现场景。

1、简述

1.1、概念

下文摘自EventBus源码注释,从注释中能够直观了解到他的功用、特性、留意事项

【源码注释】

Dispatches events to listeners, and provides ways for listeners to register themselves.

The EventBus allows publish-subscribe-style communication between components without requiring the components to explicitly register with one another (and thus be aware of each other). It is designed exclusively to replace traditional Java in-process event distribution using explicit registration. It is not a general-purpose publish-subscribe system, nor is it intended for interprocess communication.

Receiving Events

To receive events, an object should:

  • Expose a public method, known as the event subscriber, which accepts a single argument of the type of event desired;

  • Mark it with a Subscribe annotation;

  • Pass itself to an EventBus instance’s register(Object) method.

Posting Events

To post an event, simply provide the event object to the post(Object) method. The EventBus instance will determine the type of event and route it to all registered listeners.

Events are routed based on their type — an event will be delivered to any subscriber for any type to which the event is assignable. This includes implemented interfaces, all superclasses, and all interfaces implemented by superclasses.

When post is called, all registered subscribers for an event are run in sequence, so subscribers should be reasonably quick. If an event may trigger an extended process (such as a database load), spawn a thread or queue it for later. (For a convenient way to do this, use an AsyncEventBus.)

Subscriber Methods

Event subscriber methods must accept only one argument: the event.

Subscribers should not, in general, throw. If they do, the EventBus will catch and log the exception. This is rarely the right solution for error handling and should not be relied upon; it is intended solely to help find problems during development.

The EventBus guarantees that it will not call a subscriber method from multiple threads simultaneously, unless the method explicitly allows it by bearing the AllowConcurrentEvents annotation. If this annotation is not present, subscriber methods need not worry about being reentrant, unless also called from outside the EventBus.

Dead Events

If an event is posted, but no registered subscribers can accept it, it is considered “dead.” To give the system a second chance to handle dead events, they are wrapped in an instance of DeadEvent and reposted.

If a subscriber for a supertype of all events (such as Object) is registered, no event will ever be considered dead, and no DeadEvents will be generated. Accordingly, while DeadEvent extends Object, a subscriber registered to receive any Object will never receive a DeadEvent.

This class is safe for concurrent use.

See the Guava User Guide article on EventBus.
Since:
10.0
Author:
Cliff Biffle

1.2、体系流程

一文读懂Guava EventBus(订阅\发布事件)

1.3、组成部分

一文读懂Guava EventBus(订阅\发布事件)

1.3.1、调度器

EventBus、AsyncEventBus都是一个调度的人物,区别是一个同步一个异步。

  • EventBus
源码注释:
> Dispatches events to listeners, and provides ways for listeners to register themselves.
意思是说EventBus分发事情(Event)给listeners处理,而且提供listeners注册自己的办法。从这里咱们能够看出EventBus首要是一个调度的人物。
**EventBus总结**
- 1.同步履行,事情发送方在宣布事情之后,会等待一切的事情消费方履行完毕后,才会回来继续履行自己后边的代码。
- 2.事情发送方和事情消费方会在同一个线程中履行,消费方的履行线程取决于发送方。
- 3.同一个事情的多个订阅者,在接收到事情的次序上面有不同。谁先注册到EventBus的,谁先履行,假如是在同一个类中的两个订阅者一同被注册到EventBus的状况,收到事情的次序跟办法名有关。
  • AsyncEventBus
源码注释:
> An {@link EventBus} that takes the Executor of your choice and uses it to dispatch events, allowing dispatch to occur asynchronously.
意思是说AsyncEventBus便是EventBus,只不过AsyncEventBus运用你指定的线程池(不指定运用默许线程池)去分发事情(Event),而且是异步进行的。
**AsyncEventBus总结**
- 1.异步履行,事情发送方异步宣布事情,不会等待事情消费方是否收到,直接履行自己后边的代码。
- 2.在界说AsyncEventBus时,结构函数中会传入一个线程池。事情消费方收到异步事情时,消费方会从线程池中获取一个新的线程来履行自己的使命。
- 3.同一个事情的多个订阅者,它们的注册次序跟接收到事情的次序上没有任何联系,都会同时收到事情,而且都是在新的线程中,**异步并发**的履行自己的使命。

1.3.2、事情承载器

  • Event
事情主体,用于承载音讯。
  • DeadEvent
 源码注释:
>Wraps an event that was posted, but which had no subscribers and thus could not be delivered, Registering a DeadEvent subscriber is useful for debugging or logging, as it can detect misconfigurations in a system's event distribution.
意思是说DeadEvent便是一个被包装的event,只不过是一个没有订阅者无法被分发的event。咱们能够在开发时注册一个DeadEvent,由于它能够检测体系事情分布中的错误装备。

1.3.3、事情注册中心

SubscriberRegistry

 源码注释:
>  Registry of subscribers to a single event bus.
意思是说SubscriberRegistry是单个事情总线(EventBus)的订阅者注册表。

1.3.4、事情分发器

Dispatcher

源码注释:
>Handler for dispatching events to subscribers, providing different event ordering guarantees that make sense for different situations.
>Note: The dispatcher is orthogonal to the subscriber's Executor. The dispatcher controls the order in which events are dispatched, while the executor controls how (i.e. on which thread) the subscriber is actually called when an event is dispatched to it.
意思是说Dispatcher首要使命是将事情分发到订阅者,而且能够不同的状况,按不同的次序分发。

Dispatcher有三个子类,用以满意不同的分发状况

1.PerThreadQueuedDispatcher

源码注释:
> Returns a dispatcher that queues events that are posted reentrantly on a thread that is already dispatching an event, guaranteeing that all events posted on a single thread are dispatched to all subscribers in the order they are posted.
> When all subscribers are dispatched to using a direct executor (which dispatches on the same thread that posts the event), this yields a breadth-first dispatch order on each thread. That is, all subscribers to a single event A will be called before any subscribers to any events B and C that are posted to the event bus by the subscribers to A.
意思是说一个线程在处理事情过程中又发布了一个事情,PerThreadQueuedDispatcher会将后边这个事情放到最后,然后确保在单个线程上发布的一切事情都按其发布次序分发给订阅者。**留意,每个线程都要自己存储事情的行列。**
第二段是说PerThreadQueuedDispatcher按**广度优先**分发事情。并给了一个比如:
代码中发布了事情A,订阅者收到后,在履行过程中又发布了事情B和事情C,PerThreadQueuedDispatcher会确保事情A分发给一切订阅者后,再分发B、C事情。

2.LegacyAsyncDispatcher

源码注释:
> Returns a dispatcher that queues events that are posted in a single global queue. This behavior matches the original behavior of AsyncEventBus exactly, but is otherwise not especially useful. For async dispatch, an immediate dispatcher should generally be preferable.
意思是说LegacyAsyncDispatcher有一个大局行列用于寄存一切事情,LegacyAsyncDispatcher特性与AsyncEventBus特性彻底相符,除此之外没有其他什么特性。假如异步分发的话,最好用immediate dispatcher。

3.ImmediateDispatcher

源码注释:
> Returns a dispatcher that dispatches events to subscribers immediately as they're posted without using an intermediate queue to change the dispatch order. This is effectively a depth-first dispatch order, vs. breadth-first when using a queue.
意思是说ImmediateDispatcher在发布事情时立行将事情分发给订阅者,而不运用中心行列更改分发次序。这实践上是**深度优先**的调度次序,而不是运用行列时的**广度优先**。

1.3.4、订阅者

  • Subscriber
源码注释:
> A subscriber method on a specific object, plus the executor that should be used for dispatching events to it.
Two subscribers are equivalent when they refer to the same method on the same object (not class). This property is used to ensure that no subscriber method is registered more than once.
第一段意思是说,Subscriber是特定目标(Event)的订阅办法,用于履行被分发事情。第二段说当两个订阅者在同一目标 **(不是类)** 上引证相同的办法时,它们是等效的,此属性用于确保不会屡次注册任何订阅者办法,首要说明会对订阅者进行判重,假如是同一个目标的同一个办法,则认为是同一个订阅者,不会进行重复注册。
  • SynchronizedSubscriber
源码注释:
> Subscriber that synchronizes invocations of a method to ensure that only one thread may enter the method at a time.
意思是说同步办法调用以确保一次只要一个线程能够履行订阅者办法(线程安全)。

2、原理解析

2.1、主体流程

  1. listener 经过EventBus进行注册。

  2. SubscriberRegister 会依据listener、listener中含有【@Subscribe】注解的办法及各办法参数创立Subscriber 目标,并将其维护在Subscribers(ConcurrentMap类型,key为event类目标,value为subscriber调集)中。

  3. publisher发布事情Event。

  4. 发布Event后,EventBus会从SubscriberRegister中查找出一切订阅此事情的Subscriber,然后让Dispatcher分发Event到每一个Subscriber。

流程如下:

一文读懂Guava EventBus(订阅\发布事件)

2.2、listener注册原理

2.2.1、listener注册流程

  1. 缓存一切含有@Subscribe注解办法到subscriberMethodsCache(LoadingCache<Class<?>, ImmutableList>, key为listener,value为method调集)。

  2. listener注册。

一文读懂Guava EventBus(订阅\发布事件)

2.2.2、原理剖析

  • 获取含有@Subscribe注释的办法进行缓存
    找到一切被【@Subscribe】润饰的办法,并进行缓存
    留意!!!这两个办法被static润饰,类加载的时分就进行寻觅
    一文读懂Guava EventBus(订阅\发布事件)

    一文读懂Guava EventBus(订阅\发布事件)

订阅者唯一标识是【办法名+入参】

一文读懂Guava EventBus(订阅\发布事件)

  • 注册订阅者
    1.注册办法
    一文读懂Guava EventBus(订阅\发布事件)

    创立Subscriber时,假如method含有【@AllowConcurrentEvents】注释,则创立SynchronizedSubscriber,否则创立Subscriber
    一文读懂Guava EventBus(订阅\发布事件)

    2、获取一切订阅者
    一文读懂Guava EventBus(订阅\发布事件)

    3、从缓存中获取一切订阅办法
    一文读懂Guava EventBus(订阅\发布事件)

2.3、Event发布原理

2.3.1、发布主体流程

  • publisher 发布事情Event。

  • EventBus 依据Event 类目标从SubscriberRegistry中获取一切订阅者。

  • 将Event 和eventSubscribers 交由Dispatcher去分发。

  • Dispatcher 将Event 分发给每个Subscribers。

  • Subscriber 利用反射履行订阅者办法。

图中画出了三个Dispatcher的分发原理。

一文读懂Guava EventBus(订阅\发布事件)

2.3.2、原理剖析

  • 创立缓存
    缓存EventMsg一切超类
    留意!!!此处是静态办法,因而在代码加载的时分就会缓存Event一切超类。

    一文读懂Guava EventBus(订阅\发布事件)

  • 发布Event事情
    此办法是发布事情时调用的办法。

    一文读懂Guava EventBus(订阅\发布事件)

  • 获取一切订阅者
    1、从缓存中获取一切订阅者

    一文读懂Guava EventBus(订阅\发布事件)

    2、获取Event超类
    一文读懂Guava EventBus(订阅\发布事件)

  • 事情分发
    1、分发进口

    一文读懂Guava EventBus(订阅\发布事件)

    2、分发器分发
    2.1、ImmediateDispatcher
    来了一个事情则告诉对这个事情感兴趣的订阅者。
    一文读懂Guava EventBus(订阅\发布事件)

    2.2、PerThreadQueuedDispatcher(EventBus默许选项)
    在同一个线程post的Event履行次序是有序的。用ThreadLocal queue来完结每个线程的Event有序性,在把事情增加到queue后会有一个ThreadLocal dispatching来判别当时线程是否正在分发,假如正在分发,则这次增加的event不会立刻进行分发而是比及dispatching的值为false(分发完结)才进行。
    源码如下:
    一文读懂Guava EventBus(订阅\发布事件)

    一文读懂Guava EventBus(订阅\发布事件)

    2.3、LegacyAsyncDispatcher(AsyncEventBus默许选项)
    会有一个大局的行列ConcurrentLinkedQueue queue保存EventWithSubscriber(事情和subscriber),假如被不同的线程poll,不能确保在queue行列中的event是有序发布的。源码如下:
    一文读懂Guava EventBus(订阅\发布事件)

    一文读懂Guava EventBus(订阅\发布事件)

  • 履行订阅者办法
    办法进口是dispatchEvent,源码如下:

    一文读懂Guava EventBus(订阅\发布事件)

    由于Subscriber有两种,因而履行办法也有两种:
    1.Subscriber(非线程安全)
    一文读懂Guava EventBus(订阅\发布事件)

    2.SynchronizedSubscriber(线程安全)
    留意!!!履行办法会加同步锁
    一文读懂Guava EventBus(订阅\发布事件)

3、运用辅导

3.1、首要流程

一文读懂Guava EventBus(订阅\发布事件)

3.2、流程详解

  • 1、创立EventBus、AsyncEventBus Bean
    在项目中统一装备大局单例Bean(如特别需求,可装备多例)

    一文读懂Guava EventBus(订阅\发布事件)

  • 2、界说EventMsg
    设置音讯载体。

    一文读懂Guava EventBus(订阅\发布事件)

  • 3、注册Listener
    注册Listener,处理事情

    一文读懂Guava EventBus(订阅\发布事件)

    留意! 在运用 PostConstruct注释进行注册时,需求留意子类会履行父类含有PostConstruct 注释的办法。

  • 3、事情发布
    封装统一发布事情的Bean,然后经过Bean注入到需求发布的Bean里边进行事情发布。

    一文读懂Guava EventBus(订阅\发布事件)

    一文读懂Guava EventBus(订阅\发布事件)

此处对EventBus进行了统一封装收口操作,首要考虑的是假如做一些操作,直接改这一处就能够。假如不需求封装,能够在运用的当地直接注入EventBus即可。

4、留意事项

4.1、循环分发事情

假如事务流程较长,牢记梳理好事务流程,不要让事情循环分发。
现在EventBus没有对循环事情进行处理。

4.2、运用 @PostConstrucrt 注册listener

子类在履行实例化时,会履行父类@PostConstrucrt 注释。 假如listenerSon承继listenerFather,当两者都运用@PostConstrucrt注册订阅办法时,子类也会调用父类的注册办法进行注册订阅办法。由于EventBus机制,子类注册订阅办法时,也会注册父类的监听办法

一文读懂Guava EventBus(订阅\发布事件)

Subscriber唯一标志是(listener+method),因而在对同一办法注册时,由于不是同一个listener,所以关于EventBus是两个订阅办法。
一文读懂Guava EventBus(订阅\发布事件)

一文读懂Guava EventBus(订阅\发布事件)

一文读懂Guava EventBus(订阅\发布事件)

因而,假如存在listenerSon、listenerFather两个listener,且listenerSon承继listenerFather。当都运用@PostConstrucrt注册时,会导致listenerFather里边的订阅办法注册两次。

4.3、含有承继联系的listener

当注册listener含有承继联系时,listener处理Event音讯时,listener的父类也会处理该音讯。

4.3.1、承继联系的订阅者

一文读懂Guava EventBus(订阅\发布事件)

4.3.2、原理

子类listener注册,父类listener也会注册

一文读懂Guava EventBus(订阅\发布事件)

4.4、含有承继联系的Event

假如作为参数的Event有承继联系,运用EventBus发布Event时,Event父类的监听者也会对Event进行处理。

4.4.1、履行结果

一文读懂Guava EventBus(订阅\发布事件)

一文读懂Guava EventBus(订阅\发布事件)

4.4.2、原理

在分发音讯的时分,会获取一切订阅者数据(Event订阅者和Event超类的订阅者),然后进行分发数据。
获取订阅者数据如下图:

一文读懂Guava EventBus(订阅\发布事件)

一文读懂Guava EventBus(订阅\发布事件)

缓存Event及其超类的类目标,key为Event类目标。

一文读懂Guava EventBus(订阅\发布事件)

5、共享提问问题

问题1:PerThreadQueuedDispatcherd 里边的行列,是否是有界行列?

有界行列,最大值为 int 的最大值 (2147483647),源码如下图:

一文读懂Guava EventBus(订阅\发布事件)

一文读懂Guava EventBus(订阅\发布事件)

一文读懂Guava EventBus(订阅\发布事件)

一文读懂Guava EventBus(订阅\发布事件)

一文读懂Guava EventBus(订阅\发布事件)

一文读懂Guava EventBus(订阅\发布事件)

问题2:dispatcher 分发给订阅者是否有序?

EventBus:同步事情总线
同一个事情的多个订阅者,在接收到事情的次序上面有不同。谁先注册到EventBus的,谁先履行(由于base运用的是PostConstruct进行注册,因而跟不同Bean之间的初始化次序有联系)。假如是在同一个类中的两个订阅者一同被注册到EventBus的状况,收到事情的次序跟办法名有关。

AsyncEventBus:异步事情总线:同一个事情的多个订阅者,它们的注册次序跟接收到事情的次序上没有任何联系,都会同时收到事情,而且都是在新的线程中,异步并发的履行自己的使命。

问题3:EventBus与SpringEvent的对比?

  • 运用办法比较
项目 事情 发布者 发布办法 是否异步 监听者 注册办法
EventBus 恣意目标 EventBus EventBus#post 支撑同步异步 注解Subscribe办法 手动注册EventBus#register
SpringEvent 恣意目标 ApplicationEventPublisher ApplicationEventPublisher#publishEvent 支撑同步异步 注解EventListener办法 体系注册
  • 运用场景比较
项目 事情区别 是否支撑事情簇 是否支撑自界说event 是否支撑过滤 是否支撑事情阻隔 是否支撑事务 是否支撑设置订阅者消费次序 杂乱程度
EventBus Class 简单
Spring Event Class 杂乱

参考链接www.cnblogs.com/shoren/p/ev…

问题4:EventBus的运用场景,结合现有运用场景考虑是否适宜?

EventBus暂时不适用,首要有一下几个点:

  • EventBus不支撑事务,项目在更新、创立产品时,最好等事务提交成功后,再发送MQ音讯(首要问题点)

  • EventBus不支撑设置同一音讯的订阅者消费次序。

  • EventBus不支撑音讯过滤。SpringEvent支撑音讯过滤

6.项目中遇到的问题

6.1、问题描述

产品上架时会触发渠道分发功用,会有两步操作

  • 1、创立一条分发记载,并对外发送一条未分发状况的产品改变音讯(经过eventBus 事情发送音讯)。

  • 2、将分发记载改为审阅中(需求审阅)或审阅经过(不需求审阅),并对外发送一条已分发状况的产品改变音讯(经过eventBus 事情发送音讯)。

所以分发会触发两条分发状况不同的产品改变音讯,一条是未分发,另一条是已分发。实践发送了两条分发状况相同的产品改变音讯,状况都是已分发

6.2、原因

咱们先来回顾下EventBus 监听者处理事情时有三种战略,这是根本原因:

  • ImmediateDispatcher:来一个事情立刻进行处理。

  • PerThreadQueuedDispatcher(eventBus默许选项,项目中运用此战略):在同一个线程post的Event,履行的次序是有序的。用ThreadLocal queue来完结每个线程post的Event是有序的,在把事情增加到queue后会有一个ThreadLocal dispatching来判别当时线程是否正在分发,假如正在分发,则这次增加的event不会立刻进行分发而是比及dispatching的值为false才进行。

  • LegacyAsyncDispatcher(AsyncEventBus默许选项):会有一个大局的行列ConcurrentLinkedQueue queue保存EventWithSubscriber(事情和subscriber),假如被不同的线程poll 不能确保在queue行列中的event是有序发布的。

概况可见上文中的【2.3.4、事情分发】

再看下项目中的逻辑:

产品主动分发在产品改变的Listener里操作。
由于当时分发操作处于产品上架事情处理过程中,因而关于增加分发记载事情不会立马处理,而是将其放入行列。
上架操作完结,分发状况变为已分发。
等上架操作完结后,产品改变Listener处理分发事情(此刻有两条EventMsg,一个是增加分发记载另一个是修正分发状况),分发状况实时查询,关于第一个分发事情,查询到的分发记载是已分发状况。
最终导致两条音讯都是已分发状况。

6.3、场景复现

在handler中对静态变量进行两次+1 操作,每操作一步发送一条事情,此处假设静态变量为分发状况。

一文读懂Guava EventBus(订阅\发布事件)

一文读懂Guava EventBus(订阅\发布事件)

6.4、解决办法

现在 Dispatcher 包用default 润饰,运用者无法指定Dispatcher 战略。而且 ImmediateDispatcher 运用private润饰。

一文读懂Guava EventBus(订阅\发布事件)

一文读懂Guava EventBus(订阅\发布事件)

一文读懂Guava EventBus(订阅\发布事件)

因而现在暂无解决非同步问题,只能在事务逻辑上进行躲避。

其实能够修正源码并发布一个包自己运用,但是公司安全规定不允许这样做,只能经过事务逻辑上进行躲避,下图是github上对此问题的讨论。

一文读懂Guava EventBus(订阅\发布事件)

7、总结

假如项目中需求运用异步解耦处理一些事项,运用EventBus仍是比较便利的。