聊一聊 EventBus 源码和设计之禅

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/ziwang_/article/details/79953550

前言

笔者看过一些知名开源项目的源码,认为 EventBus 算是其中最简单的,甚至复杂程度不在一个级别上。解析源码前先提一下以下几个变量和类,掌握了这些变量和类基本上 EventBus 已经就掌握一半了。

  • METHOD_CACHEMap<Class<?>, List<SubscriberMethod>> 类型。键为注册类的 Class,值为该类中所有 EventBus 回调的方法链表(也就是被 @Subscribe 标记的方法们)。
  • Subscription 类(文中称订阅信息):关注类中两个字段,一个是 Object 类型的 subscriber,该字段即为注册的对象(在 Android 中时常为 Activity);另一个是 SubscriberMethod 类型的 subscriberMethod,细节如下。
  • subscriberMethodSubscriberMethod 类型(文中称订阅方法)。关注类中有个字段 eventTypeClass<?> 类型,代表 Event 的类类型。
  • typesBySubscriberMap<Object, List<Class<?>>> 类型。键为对象本身(注意:并非其 Class 对象),值为该对象中所有的 Event 的类型链表。该字段只用于仅用于判断某个对象是否注册过,在日常使用中几乎没什么作用。
  • subscribtionsByEventTypeMap<Class<?>, CopyonWriteArrayList<Subscribtion>> 类型。键为 Event 类型,值为元素为 Subscription(订阅信息)链表。核心字段。

register()

直接查看 EventBus#register() 源码:

public void register(Object subscriber) {
    Class<?> subscriberClass = subscriber.getClass();
    // 根据当前注册类获取 List<SubscriberMethod>
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            // subsciber 对 List<SubscriberMethod> 中每个 SubscriberMethod 进行订阅
            subscribe(subscriber, subscriberMethod);
        }
    }
}

获取当前注册对象所有订阅方法信息

先查看如何根据当前注册类获取 List 的,SubscriberMethodFinder#findSubscriberMethods(Class<?> subscriberClass) 源码精简如下:

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
    List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
    // 如果已存在则返回
    if (subscriberMethods != null) {
        return subscriberMethods;
    }

    subscriberMethods = findUsingReflection(subscriberClass);
    METHOD_CACHE.put(subscriberClass, subscriberMethods);

    return subscriberMethods;
}

METHOD_CACHE 前面提到过,是存着注册类与其所有需要回调的 Event 方法列表的键值对。如果已经存在则直接返回,如果否则需要通过 findUsingReflection(subscriberClass) 方法进行查找再返回,当然,返回之前需要存入 METHOD_CACHE 中,否则该 METHOD_CACHE 就没有存在的意义了。

SubscriberMethodFinder#findUsingReflection() 源码如下:

private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
    FindState findState = prepareFindState();
    findState.initForSubscriber(subscriberClass);
    while (findState.clazz != null) {
        // 通过纯反射去获取被 @Subscribe 所修饰的方法
        findUsingReflectionInSingleClass(findState);
        // 将当前 class 的父类 class 赋值给 findState.clazz 
        findState.moveToSuperclass();
    }
    // 重置 FindState 便于下一次回收利用
    return getMethodsAndRelease(findState);
}    

初始化 FindState 对象后,会进入一个 while 循环中,不停地去反射获取当前类和其父类的订阅方法并添入列表中,最终返回这个列表并重置 FindState 对象利于下一次重复使用。反射获取当前类和其父类的订阅方法源码简化如下:

private void findUsingReflectionInSingleClass(FindState findState) {
    Method[] methods;
    try {
        // 返回当前类自身方法和显式重载的父类方法
        methods = findState.clazz.getDeclaredMethods();
    } catch (Throwable th) {
        methods = findState.clazz.getMethods();
        findState.skipSuperClasses = true;
    }
    for (Method method : methods) {
        int modifiers = method.getModifiers();
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (parameterTypes.length == 1) {
                Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                if (subscribeAnnotation != null) {
                    Class<?> eventType = parameterTypes[0];
                    // needCheck
                    if (findState.checkAdd(method, eventType)) {
                        ThreadMode threadMode = subscribeAnnotation.threadMode();
                        findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                    }
                }
            }
        }
    }
}

这里想要提及的一点事,获取到 @Subscribe 修饰的目标方法后,并非无脑地添入 subscriberMethods 中,而实际上是需要过滤一遍的,讲解 checkAdd() 源码前,希望读者思考以下几个问题:

  • 对于同一个 Event,当前类对该对象使用了多个方法进行了多次订阅,那么如果该 Event 被发射的时候,当前类会如何调用这些方法?
  • 对于同一个 Event,父类对该对象进行了一次订阅,子类重写该订阅方法,那么如果该 Event 被发射的时候,父类子类当中会如何处理这些方法?

解决这些方法就需要去看看 checkAdd() 的底层实现了——

boolean checkAdd(Method method, Class<?> eventType) {
    Object existing = anyMethodByEventType.put(eventType, method);
    if (existing == null) {
        return true;
    } else {
        return checkAddWithMethodSignature(method, eventType);
    }
}

可以看到 anyMethodByEventType 使用了 Event 的 Class 作为键,这像是意味着一个类对于同一个 Event 只能订阅一次,事实上是不是这样,还得继续看看 checkAddWithMethodSignature(),其源码简化如下:

private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
    methodKeyBuilder.setLength(0);
    methodKeyBuilder.append(method.getName());
    methodKeyBuilder.append('>').append(eventType.getName());

    String methodKey = methodKeyBuilder.toString();
    Class<?> methodClass = method.getDeclaringClass();
    Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
    if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
        return true;
    } else {
        subscriberClassByMethodKey.put(methodKey, methodClassOld);
        return false;
    }
}

可以看到 subscriberClassByMethodKey 使用方法名 + '>' + 事件类型作为键,这意味着对于同一个类来说,subscriberClassByMethodKey 肯定不会键重复(毕竟一个类中不能够方法名相同且方法参数、个数都相同),因此它最终会返回 true。这意味着一个类如果使用了多个方法对同一个 Event 对象进行注册,那么当该 Event 对象被发射出来的时候,所有的方法都将会得到回调。

但是当父类执行上述操作的时候,如果子类有「显示」实现父类的订阅方法,那么此时 subscriberClassByMethodKey.put(methodKey, methodClass) 返回值不会为空,且为子类的 Class,此时 if 上分支将会判断子类 Class 是否 isAssignableFrom 父类 Class,这肯定是会为 false 的,这将会走入 if 下分支并返回 false。这意味着当子类「显示」实现父类的订阅方法的时候,如果此时发射指定 Event 的话,父类的订阅方法将不会执行,而仅会执行子类的订阅方法。

扫描二维码关注公众号,回复: 3428251 查看本文章

subscribe()

获取到相应的 SubscriberMethod 链表后,就是对链表中的 SubscriberMethod 对象进行订阅了,EventBus#subscribe() 方法源码精简如下:

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    Class<?> eventType = subscriberMethod.eventType;
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    }

    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            // 根据 priority 大小放入 List 中
            subscriptions.add(i, newSubscription);
            break;
        }
    }

    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);

    // 省略 sticky 事件
}

subscriptionsByEventType 根据 Event 事件类类型获取订阅信息链表,当然,如果没有的话那就 new 一个并放入其中。接着根据订阅方法的优先级塞入该链表中。最后 typesBySubscriber 获取该 subsciber 的所有 Event 事件类型链表,并添加当前 Event 事件类型。关于 sticky 事件的具体内容在 sticky 中会具体讲解。

至此 EventBus#register(Object) 方法算是结束了。

post()

EventBus#post(Object) 源码精简如下:

public void post(Object event) {
    PostingThreadState postingState = currentPostingThreadState.get();
    List<Object> eventQueue = postingState.eventQueue;
    eventQueue.add(event);

    // 确保不会被调用多次
    if (!postingState.isPosting) {
        postingState.isMainThread = isMainThread();
        postingState.isPosting = true;
        try {
            while (!eventQueue.isEmpty()) {
                // 分发 Event 事件
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            // 最后要 reset flag
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}

currentPostingThreadState 是一个 ThreadLocal 类,通过它获取到 PostingThreadState 对象,再根据该对象获取到 event 链表(有没有联想到 Android 中的消息机制?),并将传入的 event 塞入该链表。为了控制高并发下的一个 Event 回调不会被调用多次,PostingThreadState 对象有一个 isPosting 来标记当前链表是否已经开始进行回调操作,通过源码可以看到,每次分发完一个 Event 事件,该事件也会被从链表中 remove 出去。

postSingleEvent()

具体 postSingleEvent() 源码精简如下:

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    postSingleEventForEventType(event, postingState, eventClass);
}

追溯 EventBus#postSingleEventForEventType() 源码精简如下:

private void postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            try {
                postToSubscription(subscription, event, postingState.isMainThread);
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
        }
    }
}

通过 subscriptionsByEventType 获取该 Event 事件对应的订阅信息链表,然后将该订阅信息Event 和当前线程信息传给了 postToSubscription() 方法,该方法戳进去一看就知道是用来去回调所有订阅方法的,该方法的具体分析在 threadMode 中。实际上到这里 post() 流程就算是结束了。所以实际上核心方法 post() 的源码是十分简单的,也可以看得到,核心字段也仅有 subscriptionsByEventType 一个而已。

unregister()

EventBus#unregister(Object) 方法源码精简如下:

public synchronized void unregister(Object subscriber) {
    List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
    if (subscribedTypes != null) {
        for (Class<?> eventType : subscribedTypes) {
            unsubscribeByEventType(subscriber, eventType);
        }
        typesBySubscriber.remove(subscriber);
    }
}

整体看来分两步走,一步是移除注册对象和其所有 Event 事件链表,即 typesBySubscriber 移除相关键值对的;再就是在 unsubscribeByEventType() 方法中对 subscriptionsByEventType 移除了该 subscriber 的所有订阅信息(可以看到实际上没有对 METHOD_CACHE 进行相关移除操作,便于下一次注册的时候可以很方便拿到之前的信息,这便是缓存的作用所在)。

threadMode

在 EventBus 中,共有四种 threadMode,如下:

public enum ThreadMode {
    POSTING,

    MAIN,

    MAIN_ORDERED,

    BACKGROUND,

    ASYNC
}
  • POSTING:接收事件方法应执行在发射事件方法所在的线程(由于发射事件方法线程可能是主线程,这意味着接收方法不能执行耗时操作,否则会阻塞主线程)
  • MAIN:在 Android 中则接收事件方法应执行在主线程,否则(在 Java 项目中)等同于 POSTING。如果发射事件方法已位于主线程,那么接收事件方法会被「立即」调用(这意味着接收事件方法不能执行耗时操作,否则会阻塞主线程;同时,由于是「立即」调用,所以发射事件方法此时是会被接收事件方法所阻塞的),否则等同于 MAIN_ORDERED
  • MAIN_ORDERED:在 Android 中则接收事件方法会被扔进 MessageQueue 中等待执行(这意味着发射事件方法是不会被阻塞的),否则(在 Java 项目中)等同于 POSTING
  • BACKGROUND
    • 在 Android 中
      • 发射事件方法在主线程中执行,则接收事件方法应执行在子线程执行,但该子线程是 EventBus 维护的单一子线程,所以为了避免影响到其他接收事件方法的执行,该方法不应太耗时避免该子线程阻塞。
      • 发射事件方法在子线程中执行,则接收事件方法应执行在发射事件方法所在的线程。
    • 在 Java 项目中,接收事件方法会始终执行在 EventBus 维护的单一子线程中。
  • ASYNC:接收方法应执行在不同于发射事件方法所在的另一个线程。常用于耗时操作,例如网络访问。当然,尽量避免在同一个时间大量触发此类型方法,尽管 EventBus 为此专门创建了线程池来管理回收利用这些线程。

关于以上 threadMode 哪几种应避免耗时操作,耗时时阻塞的是哪条线程,希望各位读者能够仔细阅读。

说完几种 threadMode 之后,再来看看前文遗留下来的问题——postToSubscription() 源码如下:

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // temporary: technically not correct as poster not decoupled from subscriber
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

细看源码,其实可以发现只用到了两种方法,一种是 invokeSubscriber 意味着立即调用该方法,另一种是 xxxPoster.enqueue() 意味着需要使用其他线程来执行该方法。

invokeSubscriber()

源码如下:

void invokeSubscriber(Subscription subscription, Object event) {
    try {
        //纯反射
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

实在是简单粗暴直接通俗易懂,笔者佩服。

那么那些情况会使用 invokeSubscriber() 方法呢?

  • POSTING:不用说,既然和发射事件线程同一条线程执行,那么当然直接调用 invokeSubscriber() 即可。
  • MAIN:在确保发射事件线程是主线程的情况下,直接调用 invokeSubscriber()
  • MAIN_ORDERED:如果当前项目不是 Android 项目情况下(纯 Java 项目),将会直接调用 invokeSubscriber()
  • BACKGROUND:前面提到如果发射事件线程不是主线程的话,接收事件将会执行于发射事件所在的线程,所以也会直接调用 invokeSubscriber()

文中已多次提到 Android 项目和纯 Java 项目,是由于在 Java 项目中大部分情况下不需要特地区分主线程和子线程(这一点笔者也得到了女票的证实)。其实不仅是 EventBus,RxJava 也是如此,RxJava 中是没有 Schedules.mainThread() 一说的,仅有 Schedulers.trampoline() 表当前线程。

Poster#enqueue()

根据源码可以看出来分为以下三种:

这里写图片描述

HandlerPoster 源码不在此扩展了,熟悉 Android 的读者们应该都猜得到 HandlerPoster 底层实现肯定是通过 Handler 机制来实现的,HandlerPoster#enqueue() 方法的实现离不开 Hanlder#sendMessage(),而处理方式肯定就是在 Hanlder#handleMessage() 中去调用 invokeSubscriber()

BackgroundPoster 源码也不在此扩展了,前面提到 EventBus 会维护单一线程去执行接收事件方法,所以肯定会在 Runnable#run() 中去调用 invokeSubscriber()

AsyncPoster 的底层实现实际上与 BackgroundPoster 大同小异,但是有读者会疑惑了,BackgroundPoster 底层维护的是「单一」线程,而 AsyncPoster 肯定不是这样的啊。这里的细节留到设计技巧一节再来细说。

sticky

什么叫做 sticky 事件笔者此处就不做扩展了。项目中如果想要发射 sticky 事件需要通过 EventBus#postSticky() 方式,源码如下:

public void postSticky(Object event) {
    synchronized (stickyEvents) {
        stickyEvents.put(event.getClass(), event);
    }
    post(event);
}

可以看到第一步是将该事件放入 stickyEvents 中,第二步则是正常 post()。为避免多线程操作 postSticky(Object)removeStickyEvent(Class<?>) 引发的冲突,所以对 stickyEvents 对象添加了 synchronized 关键字,不得不说 EventBus 作者的设计实在是缜密啊。前文提到 EventBus#register() 中关于 sticky 事件的代码简化如下:

if (subscriberMethod.sticky) {
    Object stickyEvent = stickyEvents.get(eventType);
    if (stickyEvent != null) {
        postToSubscription(newSubscription, stickyEvent, isMainThread());
    }
}

可以看到,没有什么特殊的地方,判断当前事件是否 sticky,如果 sticky 则从 stickyEvents 拿出该事件并执行 postToSubscription() 方法。

优化操作

eventInheritance

不知道各位读者在日常使用 EventBus 中会不会在 Event 之间存在继承关系,反正笔者是没这样用过。也正是存在笔者这种不会这样使用 Event 和会使用 Event 继承的开发者之间的矛盾才会有这个字段出现。全局搜索该字段可以发现两处使用,都是用于发射事件的时候判断是否需要发射父类事件,由于该字段默认为 true,所以如果各位读者和笔者一样在项目开发中 Event 不存在继承关系的话,可以将该字段设为 false 以提高性能。

APT

EventBus 内部使用了大量的反射去寻找接收事件方法,实际上有经验的小伙伴知道可以使用 APT 来优化。这也就是 EventBus 3.0 引入的技术,此处的使用便不在此处扩展了,代码中通过 ignoreGeneratedIndex 来判断是否使用生成的 APT 代码去优化寻找接收事件的过程,如果开启了的话,那么将会通过 subscriberInfoIndexes 来快速得到接收事件方法的相关信息。所以各位读者如果没有在项目中接入 EventBus 的 APT,那么可以将 ignoreGeneratedIndex 设为 false 提高性能。

设计技巧

反射方法

EventBus 在获取接收事件方法的信息中,通过 getDeclaredMethods() 来获取类中所有方法而并不是通过 getMethods(),由于前者只反射当前类的方法(不包括隐式继承的父类方法),所以前者的效率较后者更高些。

FindState

以下代码是 FindState 的获取:

private FindState prepareFindState() {
    synchronized (FIND_STATE_POOL) {
        for (int i = 0; i < POOL_SIZE; i++) {
            FindState state = FIND_STATE_POOL[i];
            if (state != null) {
                FIND_STATE_POOL[i] = null;
                return state;
            }
        }
    }
    return new FindState();
}

以下代码是 FindState 的回收复用:

private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
    List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
    findState.recycle();
    synchronized (FIND_STATE_POOL) {
        for (int i = 0; i < POOL_SIZE; i++) {
            if (FIND_STATE_POOL[i] == null) {
                FIND_STATE_POOL[i] = findState;
                break;
            }
        }
    }
    return subscriberMethods;
}

可以看到,EventBus 使用 FindState 并不是简单的 new,由于 FindState 在注册流程中使用频繁且创建耗费资源,故创建 FindState 池复用 FindState 对象,与此相同的还有 PendingPost,它用于反射调用接收事件方法,具体不在此扩展。

AsyncPoster、BackgroundPoster

前面提到 AsyncPosterBackgroundPoster 的底层实现是一样的,但是有读者会疑惑了,BackgroundPoster 底层维护的是「单一」线程,而 AsyncPoster 肯定不是这样的啊——笔者也是读了源码之后才发现被 EventBus 作者摆了一道——在默认情况下实际上两者底层维护的都是 Executors.newCachedThreadPool(),这是一个有则用、无则创建、无数量上限的线程池。而 BackgroundPoster 是如何控制「单一」的呢?其在 Executor#execute() 上添加了 synchronized 并设立 flag,保证任一时间只且仅能有一个任务会被线程池执行;而 AsyncPoster 只需无脑地将传来的任务塞入线程池即可。

后记

EventBus 源码虽简单,但是当中的很多设计技巧是非常值得学习的,例如前文提到的复用池,以及遍布 EventBus 源码各处的 synchronized 关键字。希望各位读者也能够深入到其中去探索一番,寻找到笔者未找到的宝藏。

猜你喜欢

转载自blog.csdn.net/ziwang_/article/details/79953550