Android-EventBus修正纪实

持续创作,加速成长!这是我参与「日新计划 6 月更文挑战」的第 3 天,点击检查活动概况


背景

笔者在运用 EventBus 的过程中发现有时只能收到最终一次的粘性 Event ,导致事务逻辑出现混乱,下面是笔者的运用示例:

// Event.java
public final class Event {
    private final int code;
    public Event(int code) {
        this.code = code;
    }
    public int getCode() {
        return code;
    }
}
// Example.java
public class Example {
    // 调用屡次
    public void test(int code) {
        EventBus.getDefault().postSticky(new Event(code));
    }
    // 调用屡次 `test(int code)` 后再注册订阅者
    public void register() {
        EventBus.getDefault().register(this);
    }
    @Subscribe(threadMode = ThreadMode.MAIN, sticky = true)
    public void receiveEvent(Event event) {
        // 发现只能收到最终一次的粘性工作
        System.out.println(event.getCode());
    }
}

所以去检查了 EventBus 的源码,接下来咱们剖析下 EventBus 发送粘性工作的流程。

剖析

粘性工作

以下源码依据 EventBus 3.3.1 版本

下面是发送粘性工作的源码:

private final Map<Class<?>, Object> stickyEvents;
public void postSticky(Object event) {
    synchronized (stickyEvents) {
        stickyEvents.put(event.getClass(), event);
    }
    // Should be posted after it is putted, in case the subscriber wants to remove immediately
    post(event);
}

postSticky 代码比较简单,首要对 stickyEvents 进行加锁,接下来把 event 工作的 Class 目标作为 Key,event 工作本身作为 value 放进 Map 中,其间stickyEvents 是 Map 目标,实例是 ConcurrentHashMap, 其 Key 和 Value 的泛型形参分别是 Class<?>Object, 它的效果便是用来存储粘性工作;然后调用 post(event) 把粘性工作当作一般工作发送一下。

首要咱们看下最终为什么要调用下 post(event)

虽然 post(evnet) 上面有注释,简单翻译下:”在放进 Map 后应该再发送一次,以防止订阅者想当即删去此工作”,读完注释后,或许仍是不太理解,这儿笔者以为:在前面存储完粘性工作后,这儿调用 post 把粘性工作当作一般工作发送出去,或许是因为现在现已有注册的粘性工作订阅者,此刻把现已注册的粘性工作订阅者当作一般工作的订阅者,这样现已注册的粘性工作订阅者能够当即收到相应的工作,仅仅此刻工作不再是粘性的。

postSticky 中咱们并没有看到粘性工作是在哪里发送的,想一想咱们运用粘性工作的目的是什么?当注册订阅者时能够收到之前发送的工作,这样来看,粘性工作的发送是在注册订阅者时,下面是注册订阅者的源码,删去了一些无关代码:

public void register(Object subscriber) {
    // 省掉无关代码
    Class<?> subscriberClass = subscriber.getClass();
    // 查找订阅者一切的Event接纳办法
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}

register 代码也比较简单,首要经过订阅者的 Class 目标查找订阅者一切的Event工作接纳办法,然后对 EventBus 目标加锁,遍历一切的Event工作接纳办法 subscriberMethods 调用 subscribe 办法,以下是 subscribe 办法源码:

// Key 为 Event Class 目标,Value 为存储 Event 的订阅者和接纳 Event 办法目标的调集 
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
// Key 为订阅者目标,Value 为订阅者中的 Event Class目标调集
private final Map<Object, List<Class<?>>> typesBySubscriber;
// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    // Event Class目标
    Class<?> eventType = subscriberMethod.eventType;
    // 订阅者和接纳 Event 办法目标
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    // 依据 Event Class目标,获取订阅者和接纳 Event 办法目标的调集
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    // 判别订阅者和接纳 Event 办法目标是否为空
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        // 判别是否现已包括了新的订阅者和接纳 Event 办法目标,若是包括则以为是重复注册
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                                        + eventType);
        }
    }
    // 这儿是按优先级排序刺进到调集中
    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }
    // 这儿是把 Event Class目标添加进对应订阅者的 Event Class目标调集中
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    // 上面现已判别了是否重复注册,所以这儿直接添加
    subscribedEvents.add(eventType);
    // 接下来便是粘性工作的发送逻辑了
    // 判别 Event 接纳办法是否能够处理粘性工作
    if (subscriberMethod.sticky) {
        // 这儿判别是否考虑 Event 工作类的承继关系,默以为 Ture
        if (eventInheritance) {
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}

在上面的源码中,添加了不少注释有助于咱们读懂源码,在源码的最终便是粘性工作的发送逻辑了,其间有两个分支,其间一个分支依据 Event 工作的承继关系发送工作,别的一个分支依据接纳 Event 办法中的 Event Class 目标从 stickyEvents 中直接查找粘性工作,最终两个分支殊途同归,都调用了 checkPostStickyEventToSubscription 办法:

private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
    if (stickyEvent != null) {
        // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
        // --> Strange corner case, which we don't take care of here.
        postToSubscription(newSubscription, stickyEvent, isMainThread());
    }
}

checkPostStickyEventToSubscription 办法很简单,对粘性工作做下判空处理,持续调用 postToSubscription 办法,传入订阅者与接纳 Event 办法目标,粘性工作和是否是主线程布尔值:

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // temporary: technically not correct as poster not decoupled from subscriber
                invokeSubscriber(subscription, event);
            }
            break;A
                case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

postToSubscription 办法比较长,但是比较好理解,便是依据接纳 Event 办法上的 @Subscribe 注解中传入的线程模型进行工作的分发,详细的工作分发流程,有空再剖析,本文就先不剖析了,现在咱们只需知道最终都会调用 invokeSubscriber(Subscription subscription, Object event) 办法即可:

void invokeSubscriber(Subscription subscription, Object event) {
    try {
        // 反射调用 Event 接纳办法传入 Event 工作
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

总算在 invokeSubscriber 办法中找到调用 Event 接纳办法的当地了,原来 EventBus 最终是经过反射调用 Event 接纳办法并传入相应 Event 工作的。

剖析完 Event 工作的发送流程,如同没有发现为什么有时收不到粘性工作。

咱们回过头来再看下笔者的运用示例,为了方便检查,下面贴出运用示例代码:

// Example.java
public class Example {
    // 调用屡次
    public void test(int code) {
        EventBus.getDefault().postSticky(new Event(code));
    }
    // 调用屡次 `test(int code)` 后再注册订阅者
    public void register() {
        EventBus.getDefault().register(this);
    }
    @Subscribe(threadMode = ThreadMode.MAIN, sticky = true)
    public void receiveEvent(Event event) {
        // 发现只能收到最终一次的粘性工作
        System.out.println(event.getCode());
    }
}

或许仔细的读者现已发现 test 办法调用了,问题应该出在 postSticky 办法中,让咱们再次检查 postSticky 办法:

private final Map<Class<?>, Object> stickyEvents;
public void postSticky(Object event) {
    synchronized (stickyEvents) {
        stickyEvents.put(event.getClass(), event);
    }
    // Should be posted after it is putted, in case the subscriber wants to remove immediately
    post(event);
}

依据前面剖析 postSticky 办法的结果,stickyEvents 用于存储粘性工作,它是个 Map 结构,而 stickyEvents 的 Key 正是 Event 的 Class 目标,依据 Map 结构的存储原理:如果存在相同的 Key,则掩盖 Value 的值,而 stickyEvents 的 Value 正是 Event 本身。

总算真相大白,屡次调用 test 办法发送粘性工作,EventBus 只会存储最终一次的粘性工作。

小结

EventBus 针对同一个粘性 Event 工作只会存储最终一次发送的粘性工作。

EventBus 的上述完成或许是因为屡次发送同一个粘性工作,则以为之前的工作是过期工作应该抛弃,因而只传递最新的粘性工作。

EventBus 的这种完成无法满足笔者的事务逻辑需求,笔者期望屡次发送的粘性工作,订阅者都能接纳到,而不是只接纳最新的粘性工作,能够理解为粘性工作必达订阅者,下面让咱们修正 EventBus 的源码来满足需求吧。

修正

上一节咱们剖析了粘性工作的发送流程,为了满足粘性工作必达的需求,依据现有粘性工作流程,咱们能够模仿粘性工作的发送来提供一个发送必达音讯的办法。

Subscribe

首要咱们界说 Event 接纳办法能够接纳粘性工作是在 @Subscribesticky = true , 所以咱们能够修正 Subscribe 注解,添加粘性工作必达的办法:

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
    ThreadMode threadMode() default ThreadMode.POSTING;
    /**
     * If true, delivers the most recent sticky event (posted with
     * {@link EventBus#postSticky(Object)}) to this subscriber (if event available).
     */
    boolean sticky() default false;
    // 添加音讯必达的办法
    boolean rendezvous() default false;
    /** Subscriber priority to influence the order of event delivery.
     * Within the same delivery thread ({@link ThreadMode}), higher priority subscribers will receive events before
     * others with a lower priority. The default priority is 0. Note: the priority does *NOT* affect the order of
     * delivery among subscribers with different {@link ThreadMode}s! */
    int priority() default 0;
}

rendezvous 以为约会、约定的意思,能够理解为不见不散,在这儿它有两层效果,其一是符号办法能够接纳粘性工作,其二是符号办法接纳的工作是必达的。

findSubscriberMethods

接下来就需要解析 rendezvous 了,咱们先看看 sticky 是怎么解析的,在上一节咱们剖析了 register 办法,方便检查,下面再贴出 register 办法源码:

public void register(Object subscriber) {
    // 省掉无关代码
    Class<?> subscriberClass = subscriber.getClass();
    // 查找订阅者一切的Event接纳办法
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}

上一节剖析中,咱们没有剖析查找订阅者中一切的 Event 接纳办法 findSubscriberMethods ,接下来咱们剖析下在 findSubscriberMethods 办法是怎么查找 Event 接纳办法的:

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
    // 先从缓存中查找
    List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
    if (subscriberMethods != null) {
        return subscriberMethods;
    }
    // 是否疏忽生成索引,默以为False,所以这儿走else分支
    if (ignoreGeneratedIndex) {
        subscriberMethods = findUsingReflection(subscriberClass);
    } else {
        // 查找Event接纳办法
        subscriberMethods = findUsingInfo(subscriberClass);
    }
    // 如果订阅者和订阅者父类中没有Event接纳办规律抛出异常
    if (subscriberMethods.isEmpty()) {
        throw new EventBusException("Subscriber " + subscriberClass
                                    + " and its super classes have no public methods with the @Subscribe annotation");
    } else {
        // 添加进缓存中
        METHOD_CACHE.put(subscriberClass, subscriberMethods);
        return subscriberMethods;
    }
}

调用 findSubscriberMethods 办法需要传入订阅者 Class 目标,经过笔者在源码中添加的注释剖析发现默认调用 findUsingInfo 办法查找 Event 接纳办法,咱们持续跟踪 findUsingInfo 办法:

private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
    // FindState对订阅者Class目标和Event接纳办法进行了一层封装
    FindState findState = prepareFindState();
    findState.initForSubscriber(subscriberClass); // ①
    while (findState.clazz != null) {
        // 查找订阅者信息,包括订阅者Class目标、 订阅者父类、Event接纳办法等
        findState.subscriberInfo = getSubscriberInfo(findState); // ②
        // 在 ① initForSubscriber中会把subscriberInfo置为null,
        // 在 ② getSubscriberInfo中没有Index目标,
        // 所以第一次时这儿会走else分支
        if (findState.subscriberInfo != null) {
            SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
            for (SubscriberMethod subscriberMethod : array) {
                if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                    findState.subscriberMethods.add(subscriberMethod);
                }
            }
        } else {
            // 查找Event接纳办法
            findUsingReflectionInSingleClass(findState);
        }
        // 查找父类的Event接纳办法
        findState.moveToSuperclass();
    }
    // 经过findState返回Event接纳办法,并回收findState
    return getMethodsAndRelease(findState);
}

依据笔者在源码中的注释剖析,在 findUsingInfo 办法中运用「享元形式」对 FindState 进行回收运用,防止创立很多暂时的 FindState 目标占用内存,最终再次调用 findUsingReflectionInSingleClass 办法查找 Event 接纳办法,看办法名字应该是运用反射查找,findUsingReflectionInSingleClass 源码较长,删减一些不关心的代码:

private void findUsingReflectionInSingleClass(FindState findState) {
    Method[] methods;
    try {
        // This is faster than getMethods, especially when subscribers are fat classes like Activities
        // 经过反射获取当时类中声明的一切办法
        methods = findState.clazz.getDeclaredMethods();
    } catch (Throwable th) {
        // 删减不关心的代码
    }
    // 遍历一切办法
    for (Method method : methods) {
        // 获取办法的修饰符
        int modifiers = method.getModifiers();
        // 判别办法是否是public的;是否是抽象办法,是否是静态办法,是否是桥接办法,是否是组成办法
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            // 获取办法的形参Class目标数组
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (parameterTypes.length == 1) {
                // 获取办法上的Subscribe注解
                Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                if (subscribeAnnotation != null) {
                    Class<?> eventType = parameterTypes[0];
                    // 检测是否现已添加了相同签名的办法,考虑子类复写父类办法的状况
                    if (findState.checkAdd(method, eventType)) {
                        // 获取注解的参数
                        ThreadMode threadMode = subscribeAnnotation.threadMode();
                        findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
								subscribeAnnotation.priority(), subscribeAnnotation.sticky(),
								// 这儿咱们添加rendezvous参数 ①
								subscribeAnnotation.rendezvous()));
                    }
                }
            }
            // 删减不关心的代码
        }
        // 删减不关心的代码
    }
}

findUsingReflectionInSingleClass 办法中经过反射获取订阅者中声明的一切办法,然后遍历一切办法:

  1. 首要判别办法的修饰符是否契合,
  2. 其次判别办法是否只要一个形参,
  3. 再次判别办法是否有 Subscribe 注解,
  4. 然后检测是否现已添加了相同签名的办法,主要是考虑子类复写父类办法这种状况,
  5. 最终获取 Subscribe 注解的参数,在这儿咱们解析 rendezvous,封装进 SubscriberMethod 中。

SubscriberMethod 中添加 rendezvous 字段,删去不关心的代码:

public class SubscriberMethod {
    final Method method;
    final ThreadMode threadMode;
    final Class<?> eventType;
    final int priority;
    final boolean sticky;
    // 添加 `rendezvous` 字段
    final boolean rendezvous;
    /** Used for efficient comparison */
    String methodString;
    public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode, 
                            int priority, boolean sticky,
                            // 添加 `rendezvous` 形参
                            boolean rendezvous) {
        this.method = method;
        this.threadMode = threadMode;
        this.eventType = eventType;
        this.priority = priority;
        this.sticky = sticky;
        this.rendezvous = rendezvous;
    }
}

postRendezvous

好的,rendezvous 现已解析出来了,接下来咱们对外提供发送必达工作的接口:

// 选择List存储必达工作,运用Pair封装必达工作的Key和Value
private final List<Pair<Class<?>, Object>> rendezvousEvents;
public void postRendezvous(Object event) {
    synchronized (rendezvousEvents) {
        rendezvousEvents.add(Pair.create(event.getClass(), event));
    }
    // Should be posted after it is putted, in case the subscriber wants to remove immediately
    post(event);
}

上面的源码,咱们经过模仿 postSticky 办法完成了 postRendezvous 办法,在 postSticky 办法中运用 Map 存储粘性工作,不过咱们在 postRendezvous 办法中运用 List 存储必达工作,确保必达工作不会因为 Key 相同而被掩盖丢掉,最终也是调用 post 办法测验先发送一次必达工作。

register

在上一节中咱们剖析了粘性工作是在 register 中调用 subscribe 办法进行发送的,这儿咱们模仿粘性工作的发送逻辑,完成必达工作的发送逻辑,咱们能够在 subscribe 办法最终添加发送必达工作的逻辑,以下源码省掉了一些不关心的代码:

private final List<Pair<Class<?>, Object>> rendezvousEvents;
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    // 省掉不关心的代码
    // 粘性工作发送逻辑
    if (subscriberMethod.sticky) {
        if (eventInheritance) {
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
    // 新增必达工作发送逻辑
    // 判别办法是否能够接纳必达工作
    if (subscriberMethod.rendezvous) {
        if (eventInheritance) {
            for (Pair<Class<?>, Object> next : rendezvousEvents) {
                Class<?> candidateEventType = next.first;
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = next.second;
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object rendezvousEvent = getRendezvousEvent(eventType);
            if (rendezvousEvent != null) {
                checkPostStickyEventToSubscription(newSubscription, rendezvousEvent);
            }
        }
    }
}

subscribe 办法中,咱们经过模仿粘性工作的发送逻辑添加了必达工作的发送:

  1. 首要判别 Event 接纳办法是否能够接纳必达工作
  2. 其次考虑 Event 必达工作的承继关系,
  3. 最终两个分支都调用 checkPostStickyEventToSubscription 办法发送必达工作

happy~

总结

运用第三方库时,发现问题不要慌张,带着问题去检查源码总有一番收成,这也告诫咱们在运用第三库时最好先搞理解它的完成原理,遇到问题时不至于束手无策。

经过剖析 EventBus 的源码,咱们有以下收成:

  1. 理解了咱们注册订阅者时 EventBus 做了哪些工作
  2. 知晓了咱们发送粘性工作时,EventBus 是怎么处理及何时发送粘性工作的
  3. 了解到 EventBus 是经过反射调用 Event 工作的接纳办法
  4. 学习了 EventBus 中的一些优化点,比如对 FindState 运用「享元形式」防止创立很多暂时目标占用内存
  5. 进一步了解到对并发的处理

经过以上收成,咱们成功修正 EventBus 源码完成了咱们必达工作的需求。

到这儿咱们现已完成了必达工作的发送,不过咱们还剩下获取必达工作,移除必达工作没有完成,最终 EventBus 中还有单元测试 module,咱们还没有针对 rendezvous 编写单元测试,读者有爱好的话,能够自己试着完成。

期望能够帮到你~