简介

EventBus

EventBus 对于 Android 程序员来说应该不是很生疏,它是基于观察者形式的事情发布/订阅结构,咱们常常用它来完成不同组件的通讯,后台线程通讯等。

EventBus源码分析(一)

  • 简化了组件之间的通讯
  • 别离事情发送方和接收方
  • 在活动、片段和后台线程中体现杰出
  • 避免复杂和简略犯错的依靠联系和生命周期问题
  • 使你的代码更简略
  • 快速
  • 很小(约60k)
  • 已被装置量超越10亿的运用所证明
  • 具有高档功能,如交付线程,订阅者优先级等。

运用办法

EventBus in 3 steps

  1. Define events:

    public static class MessageEvent { /* Additional fields if needed */ }
    
  2. Prepare subscribers: Declare and annotate your subscribing method, optionally specify a thread mode:

    @Subscribe(threadMode = ThreadMode.MAIN)
    public void onMessageEvent(MessageEvent event) {
        // Do something
    }
    

    Register and unregister your subscriber. For example on Android, activities and fragments should usually register according to their life cycle:

     @Override
     public void onStart() {
         super.onStart();
         EventBus.getDefault().register(this);
     }
     @Override
     public void onStop() {
         super.onStop();
         EventBus.getDefault().unregister(this);
     }
    
  3. Post events:

     EventBus.getDefault().post(new MessageEvent());
    

Read the full getting started guide.

There are also some examples.

Note: we highly recommend the EventBus annotation processor with its subscriber index. This will avoid some reflection related problems seen in the wild.

Android projects:

implementation("org.greenrobot:eventbus:3.3.1")

Java projects:

implementation("org.greenrobot:eventbus-java:3.3.1")
<dependency>
    <groupId>org.greenrobot</groupId>
    <artifactId>eventbus-java</artifactId>
    <version>3.3.1</version>
</dependency>

大约流程

在了解 EventBus 内部原理之前,咱们先了解一下 EventBus 结构的一个大约流程。如下图所示:

EventBus源码分析(一)

上图中绿色为订阅流程,赤色为发送事情流程,大家能够结合上图,来了解源码

在上图中咱们在A.java中订阅了事情AEvent,在B.java中订阅了事情AEventBEvent,下面咱们来剖析 EventBus 中注册与事情发送的两个流程,在介绍两个流程之前,先介绍一下SubscriptionSubscriberMethod中所包括的内容。

Subscription类中包括以下内容:

  • 当时注册目标
  • 对应订阅办法的封装目标 SubscriberMethod

SubscriberMethod类中包括以下内容:

  • 包括@Subscribe注解的办法的 Method (java.lang.reflect包下的目标)。
  • @Subscribe注解中设置的线程形式 ThreadMode
  • 办法的注册的事情类型的 Class 目标
  • @Subscribe中设置的优先级 priority
  • @Subscribe中设置事情是否是粘性事情 sticky

注册流程

当咱们经过调用 EventBus.register() 注册 A、B 两个目标时,EventBus 会做以下几件事情:

  • 经过内部的SubscriberMethodFinder来获取 A、B类中含有@Subscribe注解的办法,并将该注解中的内容与对应办法封装为SubscriberMethod目标。然后再将当时订阅目标与对应的SubscriberMethod再封装为Subscription目标。
  • 将一切的Subscription放在名为subscriptionsByEventType类型为Map<Class<?>, CopyOnWriteArrayList<Subscription>>数据结构(key 为事情类型的 Class 目标) 中,由于Subscription目标内部包括SubscriberMethod, 那么就能知道订阅的事情类型,所以咱们能够依据事情类型来区别Subscription,又由于相同事情能够被不同订阅者中的办法来订阅,所以相同类型的事情也就以对应不同的Subscription
  • 将订阅者中的一切订阅的事情都封装在名为typesBySubscriber类型为Map<Object, List<Class<?>>>数据结构(key 为订阅目标,value 为该目标订阅的事情类型 Class 目标)。该调集首要用于撤销订阅,鄙人文中咱们会进行介绍。

在整个注册流程中,最首要的流程便是 EventBus 经过SubscriberMethodFinder去获取类中包括@Subscribe注解的订阅办法。在 EventBus 3.0 之前该流程一直都是经过反射的办法去获取。在 3.0 及今后版别,EventBus 采用了 APT 技能,对SubscriberMethodFinder查找订阅办法流程进行了优化,使其能在EventBus.register()办法调用之前就能知道相关订阅事情的办法,这样就减少了程序在运转期间运用反射遍历获取办法所带来的时刻耗费。鄙人文中咱们也会指出详细的优化点。

事情发送流程

知道了 EventBus 的注册进程,再来了解事情的发送流程就十分简略了。由于咱们现现已过subscriptionsByEventType存储事情对应的Subscription,只需找到了Subscription,那么咱们就能从Subscription拿到订阅事情的目标subscriber,以及对应的订阅办法 Method (java.lang.reflect包下的目标)。然后经过反射调用:

Subscription 内部包括订阅者及 SubscriberMethod(内部包括订阅办法 Method )

 method.invoke(subscription.subscriber, event)

经过上述办法,就能将对应事情发送到相关订阅者了。当然这儿只是简略的介绍了事情是怎么发送到相关订阅者的。关于 EventBus 中粘性事情的处理,线程怎么切换。会鄙人文中进行详细介绍。

源码剖析

在了解了 EventBus 的内部大约流程后,现在咱们经过源码来更深层次的了解其内部完成。仍是从订阅进程与事情的发送两个进程进行解说。

订阅进程源码剖析

EventBus 的订阅进口为 register() 办法,如下所示:

  public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        //流程1:获取对应类中一切的订阅办法
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            //流程2:实践订阅
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

在该办法中,首要涉及到 SubscriberMethodFinder 查找办法与实践订阅两个流程,下面咱们会对这两个流程进行介绍。

SubscriberMethodFinder 查找办法流程

在该流程中,首要经过SubscriberMethodFinder去获取订阅者中一切的 SubscriberMethod ,咱们先看findSubscriberMethods()办法:

    List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        //从缓存中获取订阅者中的订阅办法,假如有则读缓存,假如没有进行查找
        List<SubscriberMethod> subscriberMethods = (List)METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        } else {
            if (this.ignoreGeneratedIndex) {//假如疏忽索引类,则运用反射。
                subscriberMethods = this.findUsingReflection(subscriberClass);
            } else {//否则运用索引类
                subscriberMethods = this.findUsingInfo(subscriberClass);
            }
            //假如订阅者没有订阅办法,则抛出反常
            if (subscriberMethods.isEmpty()) {
                throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation”);
            } else {
                //将对应类中的订阅办法,增加到缓存中,进步功率,便利下次查找
                METHOD_CACHE.put(subscriberClass, subscriberMethods);
                return subscriberMethods;
            }
        }
    }

该办法的逻辑也十分简略,为如下几个过程:

  • 过程1:先从缓存(METHOD_CACHE)中获取订阅者对应的SubscriberMethod(订阅办法),假如有则从缓存中取。
  • 过程2:假如缓存中没有,则经过布尔变量ignoreGeneratedIndex,来判别是直接运用反射获取订阅办法,仍是经过索引类(EventBus 3.0 运用APT 增加的类)来获取。由于ignoreGeneratedIndex默许值为 false ,则默许会走findUsingInfo()办法
  • 过程3:将过程2中获得的订阅办法调集,存储到缓存中,便利下一次获取,进步功率。

由于默许会走findUsingInfo()办法,咱们持续检查该办法:

    private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        //过程1:构建了查询状况缓存池,最多缓存4个类的查询状况
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            //过程2,获取查找状况对应的订阅信息,这儿EventBus 3.0 运用了索引类,
            findState.subscriberInfo = getSubscriberInfo(findState);
            if (findState.subscriberInfo != null) {
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    //将订阅者的一切的订阅办法增加到FindState的调集中
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            } else {//过程3:假如订阅信息为null,则经过反射来获取类中一切的办法
                findUsingReflectionInSingleClass(findState);
            }// 持续查找父类的办法
            findState.moveToSuperclass();
        }
        //过程4,获取findState中的一切办法,并清空目标池
        return getMethodsAndRelease(findState);
    }
  • 过程1:创建与订阅者相关的 FindState 目标。会从 FinState 目标缓存池(最大为4个)中获取,一个订阅者目标对应一个FindState,一个订阅者目标对应一个或多个订阅办法。
  • 过程2:经过 FindState 目标 调用getSubscriberInfo()办法去获取订阅者相关的订阅办法信息。该办法运用了 APT 技能,构建了EventBus的索引类。
  • 过程3:假如经过过程2获取不到订阅办法信息,则经过反射来获取类中的一切的订阅办法。并将获取的办法,封装到 FindState 中的 subscriberMethods 调集中去。
  • 过程4:将 FindState 目标中的 subscriberMethods 调集返回。

在上述办法中,咱们需求留意的是,假如当时订阅着没有相关的订阅办法,那么会顺次遍历其父类的订阅办法。还有一个知识点,便是该办法中 FindState 运用了目标缓存池,不会每次注册一个订阅者就创建 一个FindState 目标。这样就节省了内存的运用。

关于索引类的知识点,会鄙人篇文章中进行介绍,这儿咱们直接检查findUsingReflectionInSingleClass()办法:

  private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
             //获取当时订阅者中的一切的办法
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            //获取该类的一切public 办法 包括继承的公有办法
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
        //循环遍历一切的办法,经过相关注解找到相应的订阅办法。
        for (Method method : methods) {
            int modifiers = method.getModifiers();
            //满足修饰符为 public 而且非抽象、非静态
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                //找到参数为1,且该办法包括Subscrile注解的办法
                if (parameterTypes.length == 1) {
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0];
                        if (findState.checkAdd(method, eventType)) {
                            // 创建订阅办法目标,并将对应办法目标,事情类型,线程形式,优先级,粘性事情封装到SubscriberMethod目标中。
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract”);
            }
        }
    }

该办法的逻辑也十分简略,经过获取 FindState 中的订阅者的 Class 目标,然后经过反射获取一切包括@Subscribe注解且参数为1的 Method 目标,并读取到该参数的类型EventType,接着读取注解中的thredModeprioritysticy,最终将这些数据都统一分装到新建的SubscriberMethod目标中,最终将该目标增加到 FindState 中的 subscriberMethods 调集中去。

实践订阅办法 subscribe

当找到订阅者一切的办法调集后,最终会遍历调用subscribe()办法,检查该办法:

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        Class<?> eventType = subscriberMethod.eventType;
        //过程1,将每个订阅办法和订阅者封装成Subscription
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        //过程2,获取对应事情中一切的 Subscription,判别是否重复增加
        CopyOnWriteArrayList<Subscription> subscriptions = (CopyOnWriteArrayList)this.subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList();
            this.subscriptionsByEventType.put(eventType, subscriptions);
        } else if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
        }
        //过程3,依据优先级,将当时新封装的Subscription目标增加到subscriptionsByEventType中去
        int size = subscriptions.size();
        for(int i = 0; i <= size; ++i) {
            if (i == size || subscriberMethod.priority > ((Subscription)subscriptions.get(i)).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }
       //过程4,将当时订阅者中与当时订阅者所订阅的事情类型,增加到typesBySubscriber中去
        List<Class<?>> subscribedEvents = (List)this.typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList();
            this.typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);
        //过程5,假如该办法有订阅了粘性事情,则从stickyEvents中获取相应粘性事情,并发送
        if (subscriberMethod.sticky) {
            if (eventInheritance) {
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = this.stickyEvents.get(eventType);
                this.checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

在上述办法中首要流程如下:

  • 过程1,将每个订阅办法和订阅者封装成 Subscription。
  • 过程2,获取对应事情中一切的 Subscription ,判别是否重复增加。
  • 过程3,依据优先级,将当时新封装的 Subscription 目标增加到 subscriptionsByEventType 中去。(设置了优先级后,EvenBus 就能够依照优先级顺序,将事情发送给订阅者)
  • 过程4,将当时订阅者中与当时订阅者所订阅的事情类型,增加到 typesBySubscriber 中去。
  • 过程5,假如该办法有订阅了粘性事情,则从 stickyEvents 中获取相应粘性事情,并发送。

再结合咱们最开端所画的 EventBus 大致流程,该办法其实就做了下图赤色虚线框中的事:

EventBus源码分析(一)

关于粘性事情的知识点,需求咱们了解事情的发送流程,咱们会鄙人文进行详细介绍。

事情发送流程源码剖析

事情的发送,首要分为简略事情粘性事情,别离对应办法为post()postSticky()两个办法。这儿咱们先看简略事情的发送,代码如下:

简略事情的发送

  public void post(Object event) {
     //过程1,获取当时线程中独立具有的PostingThreadState,并从中获取事情行列(eventQueue),将发送的事情增加到该行列中
        EventBus.PostingThreadState postingState = (EventBus.PostingThreadState)this.currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);
        //过程2:判别当时线程是否正在分发事情,假如不是,则循环遍历事情行列中的事情,并将事情分发出去,直到当时事情行列空为止
        if (!postingState.isPosting) {
            postingState.isMainThread = this.isMainThread();
            postingState.isPosting = true;
            //假如当时分发事情状况为撤销,则抛出反常
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset”);
            }
            //循环遍历事情行列,并将音讯发送出去
            try {
                while(!eventQueue.isEmpty()) {
                    this.postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

在 EventBus 中会为个每调用 post() 办法的线程都会创建一个仅有的PostingThreadState目标,用于记载当时线程存储发送音讯与发送的状况,其内部结构如下所示:

EventBus源码分析(一)

也便是说当咱们调用EventBus.post()办法,其实是从 EventQueue 行列中取出音讯,然后经过调用 postSingleEvent()办法 来实践发送音讯,该办法代码如下所示:

  private void postSingleEvent(Object event, EventBus.PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        //过程1:判别否事情传递发送
        if (this.eventInheritance) {
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for(int h = 0; h < countTypes; ++h) {
                Class<?> clazz = (Class)eventTypes.get(h);
                //循环遍历遍历事情并发送
                subscriptionFound |= this.postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            //过程2:假如不支撑事情的传递,那么这儿开端发送事情。
            subscriptionFound = this.postSingleEventForEventType(event, postingState, eventClass);
        }
        //过程3:假如没有找到订阅的办法,提示用户
        if (!subscriptionFound) {
            if (this.logNoSubscriberMessages) {
                this.logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if (this.sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) {
                this.post(new NoSubscriberEvent(this, event));
            }
        }
    }

该办法首要为如下三个过程:

  • 过程1:经过布尔变量eventInheritance判别是否支撑事情是否传递发送,假如支撑,那么经过lookupAllEventTypes()办法获得发送事情先人类及其接口。然后经过postSingleEventForEventType()办法,将它们都发送出去,
  • 过程2:过程1返回 false 那么就直接运用postSingleEventForEventType()办法发送事情。
  • 过程3:假如没有找到相关的订阅办法,那么就提示用户没有相关的订阅办法。

布尔变量eventInheritance默许为false,咱们能够经过 EventBusBuilder 来装备该变量的值。

那什么是事情的传递发送呢?咱们来检查lookupAllEventTypes()办法:


    private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
        synchronized (eventTypesCache) {
            List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
            if (eventTypes == null) {
                eventTypes = new ArrayList<>();
                Class<?> clazz = eventClass;
                //获取该类一切先人类及其接口
                while (clazz != null) {
                    eventTypes.add(clazz);
                    addInterfaces(eventTypes, clazz.getInterfaces());
                    clazz = clazz.getSuperclass();
                }
                eventTypesCache.put(eventClass, eventTypes);
            }
            return eventTypes;
        }
    }
    //将接口增加到调集中
    static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
        for (Class<?> interfaceClass : interfaces) {
            if (!eventTypes.contains(interfaceClass)) {
                eventTypes.add(interfaceClass);
                addInterfaces(eventTypes, interfaceClass.getInterfaces());
            }
        }
    }

在该办法中,会获取发送事情的一切的先人类及其接口,最终将他们以调集的办法返回,在postSingleEvent办法中拿到这个调集之后,那么就会将调集中一切的数据都发送出去。这样做会造成什么效果呢?假如当时咱们的继承体系为 Aevent -> Bevent -> Cevent (->表明继承),那么经过发送 Aevent,那么其他一切订阅过 Bevent 及 Cevent 的订阅者都会收到音讯。

咱们持续检查postSingleEventForEventType()办法,代码如下所示:

    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        //从缓存中拿取之前存取的 Subscription
        synchronized (this) {
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    //这儿找到相应的办法后,开端切换线程了。
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

该办法的逻辑十分简略,便是从咱们之前的subscriptionsByEventType调集中拿到存储的Subscription,并依据当时线程状况设置相关的PostingStatecanceledsubscriptionisMainThread等特点值,然后经过postToSubscription()办法来真实的履行事情的传递。

到目前为止整个流程如下所示:

EventBus源码分析(一)

postToSubscription()

postToSubscription() 办法是真实实践将事情传递到订阅者的代码。检查该办法:

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

从上述办法中,咱们拿到Subscription中成员变量SubscriberMethod中的线程形式threadMode来判别订阅办法需求履行的线程。假如当时线程形式是POSTING,那么默许就直接调用invokeSubscriber()办法。详细代码如下所示:

    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            //直接经过反射调用订阅办法。
            subscription.subscriberMethod.method.
            invoke(subscription.subscriber, event);
        }
        //省掉部分代码
    }

假如为其他形式,那么会依据相应的poster调用enqueue()办法来操控履行订阅办法所在的线程。在 EventBus 中供给了如下三个 Poster 来操控订阅办法的所运转的线程。

  • HandlerPoster (切换到主线程)
  • BackgroundPoster (切换到后台线程)
  • AsyncPoster (切换到后台线程)

以上三个 Poster 都完成了 Poster 接口,且内部都维护了一个名为PendingPostQueue的行列,该行列以PendingPost为存储单元,其中PendingPost中存储内容为咱们依据当时事情所找到的Subscription与当时所发生的事情。

那么结合整个流程,咱们能得到下图:

EventBus源码分析(一)

针对上图,再进行一下简略的阐明。

  • 当咱们调用EventBus.post()发送简略事情时,会将该事情放入与线程相关的PostingThreadStateEventQueue中。
  • 接着会从之前在subscriptionsByEventType调集中找到与该事情相关的Subscription
  • 接着将找到的Subscription与当时所发送的事情都封装为PendingPost并增加到对应Poster中的PendingPostQueue行列中。
  • 最终对应的Poster从行列中取出相应的PendingPost,经过反射调用订阅者的订阅办法。

其中订阅办法履行线程的规矩,如下所示:

EventBus源码分析(一)

线程的切换

在上节中,订阅者的订阅办法履行的所在线程,是由 EventBus 中内部的三个Poster来完成的。那下面咱们就来看看这三个Poster的完成。

HandlerPoster

public class HandlerPoster extends Handler implements Poster {
    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;
    //默许会传递主线程的Looper
    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }
    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //这儿将PedingPost放入PendingPostQueue中,然后发送音讯
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message”);
                }
            }
        }
    }
    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                //从行列中取出最近的PendingPost
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                //直接经过反射,调用订阅者的订阅办法。
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message”);
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}

HanderPoster 中的逻辑十分简略了解,继承 Handler,并在初始化的时候默许会相关主线程的 Looper,这样该 Handler 所发送的音讯将会在主线程中被处理。

剖析一下 HanderPoster 中首要的过程:

  • 在调用enqueue()办法时,会将之前咱们封装好的PendingPost放入PendingPostQueue行列中,同时发送音讯。
  • handleMessage()办法中,从PendingPostQueue行列中取出最近的PendingPost,然后直接经过eventBus.invokeSubscriber()反射履行订阅者的订阅办法。

BackgroundPoster

final class BackgroundPoster implements Runnable, Poster {
    private final PendingPostQueue queue;
    private final EventBus eventBus;
    private volatile boolean executorRunning;
    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }
    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        //运用线程池来提交使命,该办法是线程安全的。
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }
    @Override
    public void run() {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }
}

BackgroundPoster 与 HandlerPoster 最大的不同是其内部运用了线程池,而且该类也完成了 Runnable 接口。

在 BackgroundPoster 中的enqueue()办法中,默许会运用 EventBus 中默许的线程池DEFAULT_EXECUTOR_SERVICE来提交使命 ,该线程池的声明如下:

 private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();

CachedThreadPool 适用于大量的且耗时较少的使命

相同的,BackgroundPoster 也便是经过反射调用订阅者的订阅办法,只不过不同的是它是放入线程池中的非主线程中进行履行。

需求留意的是不管是在任何线程中发送音讯,EventBus 总是线程安全的。从 BackgroundPoster 的代码中咱们就能够看出。

AsyncPoster

class AsyncPoster implements Runnable, Poster {
    private final PendingPostQueue queue;
    private final EventBus eventBus;
    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }
    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }
    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available”);
        }
        eventBus.invokeSubscriber(pendingPost);
    }
}

这儿就不对 AsyncPoster 进行解说了,信任大家依据之前的内容也能了解。

粘性事情的发送

现在咱们还剩最终一个知识点了,便是粘性事情的发送。在 EventBus 中发送粘性事情,咱们需求调用办法postSticky()办法,代码如下所示:

  public void postSticky(Object event) {
        synchronized (stickyEvents) {
            stickyEvents.put(event.getClass(), event);
        }
        post(event);
    }

从代码中,咱们不难看出,粘性的事情发送与简略事情的发送仅有的区别便是将发送的事情增加到stickyEvents调集中去了。那为什么要这么做呢?在了解详细的原因之前,咱们需求了解粘性事情的概念。

粘性事情的概念:当订阅者还没有订阅相关事情A时,程序现已发送了一些事情A,依照正常的逻辑,当订阅者开端订阅事情A时,是承受不到程序现已发送过的事情A,但是咱们希望承受到那些现已发送过的音讯。这种现已过时,但又被重新承受的事情,咱们称之为粘性事情。

那么依据粘性事情的思维,咱们需求将现已发送的事情存储下来,并在粘性事情的订阅的进程中进行特别的处理,也便是在EventBus.register()办法中进行处理。还记得之前注册进程中的subscribe()办法吗?该办法内部对粘性事情进行了特殊的处理,代码如下所示:

 private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        //省掉部分代码
        //判别是否是粘性事情
        if (subscriberMethod.sticky) {
            //支撑事情传递的粘性事情
            if (eventInheritance) {
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                //开端履行订阅办法。
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

在上述逻辑中,会从stickyEvents中获取之前发送的事情,然后调用checkPostStickyEventToSubscription()。该办法代码如下所示:

 private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
        if (stickyEvent != null) {
            postToSubscription(newSubscription, stickyEvent, isMainThread());
        }
    }

又由于checkPostStickyEventToSubscription()办法内部会调用postToSubscription()办法。那么最终订阅者就能承受到之前发送的事情,并履行相应的订阅办法啦。

最终

EventBus 首要的流程到现在现已讲完了。从实践的代码中,咱们不仅能看到其杰出的代码标准以及封装思维。还能看到该结构对功能的优化,尤其是增加了一些必要的缓存。我信任以上的这些点,都是值得咱们学习与参阅的。

但是经过反射毕竟是相对耗费功能的,因而EventBus在编译阶段做了提早生成代码的优化,欢迎阅读我的下篇文章 EventBus源码剖析(二)