【FastDev4Android框架开发】消息总线EventBus源码分析以及与Otto框架对比(二十一)

jerry Android 2015年11月26日 收藏

().前言:   

        上一篇我们对EventBus的简介和基本使用做了说明,今天我们主要深入的使用EventBus,同时会从源码的角度对于订阅和发送消息做分析,以及和另外的消息总线框架Otto在性能等方面做一个对比分析 

       FastDev4Android框架项目地址:https://github.com/jiangqqlmj/FastDev4Android

().框架简单说明:   

       通过上一篇文章的介绍,EventBus的使用步骤如下

  •  定义一个事件,用于EventBus的分发。
  •  定义订阅者,把该订阅者加入到EventBus中。
  •  通过EventBus.post来进行分发事件,告诉订阅者有事情发生了。订阅者接收到信息进行相应处理。
  •  使用完成之后,订阅者需要反注册取消订阅。

   具体原理图如下:

           订阅者接收到通知的时候会调用相应的函数进行处理事件,在EventBus中一般有以下四种方法来让我们进行处理:

  1. onEvent
  2. onEventMainThread
  3. onEventBackground
  4. onEventAsync

 这四个订阅方法有很多的相似之处,但是功能上面还是有点不同的,EventBus会通过调用post方法来进行分发消息,让订阅者进行接收,订阅者接收到事件消息是通过上面几个方法来进行接收和处理的。下面我们来对这四个方法的具体使用场景做一个介绍:

  • onEvent:使用该方法作为订阅函数表示post消息事件和接收消息事件在同一个线程中。
  • onEventMainThread: 该方法会在UI  Main线程中运行,接收事件同时会在UI线程中运行,这样我们就可以在该方法中直接更新UI
  • onEventBackground:使用该方法,如果事件是在UI Main线程发出来,该方法会在子线程中执行,如果事件是从子线程中发出来,该onEventBackground方法会在子线程中执行。
  • onEventAsync:使用该方法,会在创建新的子线程中执行onEventAsync

   那么现在订阅的函数方法有四个,我们怎么会知道具体调用哪个方法呢?OK我们看一篇文章:我们会先创建一个事件类,然后进行post发送该对象,在订阅方法中接收,注入哪个函数的参数就是该发送过来的对象。这样我们应该清楚了吧,那是根据传进来的事件对象参数来进行判断的。具体我们看实例:

 ().调用实例:   

         3.1.实现需求:我们现在创建三个Event事件类,第二个Activity中进行发送,在订阅者Activity中进行接收订阅方法如下:            


/**
     * 收到消息 进行相关处理
     * @param event
     */
    public voidonEventMainThread(TestEventFirst event) {
       Log.d("zttjiangqq","onEventMainThread收到消息:"+event.getMsg());
        textView_one.setText(event.getMsg());
        //showToastMsgShort(event.getMsg());
    }
    /**
     * 收到消息 进行相关处理
     * @param event
     */
    public voidonEventMainThread(TestEventSecond event) {
 
       Log.d("zttjiangqq","onEventMainThread收到消息:"+event.getMsg());
        textView_two.setText(event.getMsg());
        //showToastMsgShort(event.getMsg());
    }
    /**
     * 收到消息 进行相关处理
     * @param event
     */
    public voidonEventMainThread(TestEventThird event) {
 
       Log.d("zttjiangqq","onEventMainThread收到消息:"+event.getMsg());
        textView_third.setText(event.getMsg());
        //showToastMsgShort(event.getMsg());
    }



   3.2.演示效果如下:

       

 

 ().源码解析

           以上主要为EventBus的主要使用,现在开始我们对于EventBus的注册和发送两个模块从源码的角度来走一下。

            4.1.EventBus对象获取:我们一般使用单例模式获取。保证对象唯一性。      


/**
     * 采用单例模式获取EventBus实例对象  一般我们获取EventBus对象 就是采用这种方式,不建议直接new
     * @return
     */
    public static EventBus getDefault() {
        if (defaultInstance == null) {
            synchronized (EventBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = newEventBus();
                }
            }
        }
        return defaultInstance;
    }


         4.2.订阅模块:入口,进行订阅注册


 public void register(Object subscriber) {
        register(subscriber, false, 0);
    }


  •   subscriber:需要注册的订阅者,
  •  sticky:是否为粘性,这边默认为false,
  •   priority:事件的优先级,默认为0

     下面我们来具体看一下register(subscriber, false, 0)方法具体实现的功能:


private synchronizedvoid register(Object subscriber, boolean sticky, int priority) {
        List<SubscriberMethod>subscriberMethods =subscriberMethodFinder.findSubscriberMethods(subscriber.getClass());
        for (SubscriberMethod subscriberMethod: subscriberMethods) {
            subscribe(subscriber,subscriberMethod, sticky, priority);
        }
    }


     该函数中会通过findSubscriberMethods()来获取所有订阅的方法,具体主要的步骤我这边已经进行注释了


  /**
     * 进行查找订阅者中所有订阅的方法
     * @param subscriberClass
     * @return 所有订阅的方法的集合
     */
    List<SubscriberMethod>findSubscriberMethods(Class<?> subscriberClass) {
        String key = subscriberClass.getName();
        List<SubscriberMethod>subscriberMethods;
        //从缓存中获取订阅的方法,第一次使用肯定缓存中不存在
        synchronized (methodCache) {
            subscriberMethods =methodCache.get(key);
        }
        if (subscriberMethods != null) {
            return subscriberMethods;
        }
        //订阅方法的集合
        subscriberMethods = newArrayList<SubscriberMethod>();
        Class<?> clazz = subscriberClass;
        HashMap<String, Class>eventTypesFound = new HashMap<String, Class>();
        StringBuilder methodKeyBuilder = newStringBuilder();
        while (clazz != null) {
            String name = clazz.getName();
            if(name.startsWith("java.") || name.startsWith("javax.") ||name.startsWith("android.")) {
                // Skip system classes, thisjust degrades performance
                // 这边直接跳过了系统类,因为系统类中 普通开发者不会使用在系统类中使用EventBus,所以就忽略处理了,不然会降低性能
                break;
            }
 
            // Starting with EventBus 2.2 weenforced methods to be public (might change with annotations again)
            try {
                // This is faster thangetMethods, especially when subscribers a fat classes like Activities
                // 通过反射来获取当前类中的所有方法
                Method[] methods =clazz.getDeclaredMethods();
                // 正式开始查询所有订阅的方法
               filterSubscriberMethods(subscriberMethods, eventTypesFound,methodKeyBuilder, methods);
            } catch (Throwable th) {
                th.printStackTrace();
                // Workaround forjava.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
                Method[] methods =subscriberClass.getMethods();
                subscriberMethods.clear();
                eventTypesFound.clear();
               filterSubscriberMethods(subscriberMethods, eventTypesFound,methodKeyBuilder, methods);
                break;
            }
            clazz = clazz.getSuperclass();
        }
        //抛出异常,订阅者没有实现onEvent开头的公共方法
        if (subscriberMethods.isEmpty()) {
            throw newEventBusException("Subscriber " + subscriberClass + " has nopublic methods called "
                    + ON_EVENT_METHOD_NAME);
        } else {
            //订阅的方法存在,同时加入缓存
            synchronized (methodCache) {
                methodCache.put(key,subscriberMethods);
            }
            return subscriberMethods;
        }
    }


  然后调用filterSubscriberMethods()进行过滤,把订阅方法加入到集合中


/**
     * 查询订阅的方法,查到方法,方法加入到subScriberMethods
     * @param subscriberMethods
     * @param eventTypesFound
     * @param methodKeyBuilder
     * @param methods
     */
    private voidfilterSubscriberMethods(List<SubscriberMethod> subscriberMethods,
                                        HashMap<String, Class> eventTypesFound, StringBuildermethodKeyBuilder,
                                        Method[] methods) {
        //遍历类中的所有方法
        for (Method method : methods) {
            String methodName =method.getName();
            //过滤onEvent开头的方法
            if(methodName.startsWith(ON_EVENT_METHOD_NAME)) {
                //返回方法修饰符 例如public,private,protected
                int modifiers =method.getModifiers();
                Class<?> methodClass =method.getDeclaringClass();
                if ((modifiers &Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                    //订阅方法必须为public类型
                    //进行获取方法的参数类型
                    Class<?>[]parameterTypes = method.getParameterTypes();
                    if (parameterTypes.length== 1) {
                        //进行获取线程模式类型
                        ThreadMode threadMode =getThreadMode(methodClass, method, methodName);
                        if (threadMode == null){
                            continue;
                        }
                        //取出当前传入的订阅者
                        Class<?>eventType = parameterTypes[0];
                        //methodKeyBuilder key="0"."methodName".">"."eventType_Name"
                       methodKeyBuilder.setLength(0);
                       methodKeyBuilder.append(methodName);
                       methodKeyBuilder.append('>').append(eventType.getName());
                        String methodKey =methodKeyBuilder.toString();
                        Class methodClassOld =eventTypesFound.put(methodKey, methodClass);
                        if (methodClassOld ==null || methodClassOld.isAssignableFrom(methodClass)) {
                            // Only add if notalready found in a sub class
                            //构建一个订阅方法的对象(里面存入方法名,线程模式类型,事件类型),加入到订阅方法集合中
                           subscriberMethods.add(new SubscriberMethod(method, threadMode,eventType));
                        } else {
                            // Revert the put,old class is further down the class hierarchy
                           eventTypesFound.put(methodKey, methodClassOld);
                        }
                    }
                } else if(!skipMethodVerificationForClasses.containsKey(methodClass)) {
                    Log.d(EventBus.TAG,"Skipping method (not public, static or abstract): " + methodClass +"."
                            + methodName);
                }
            }
        }
    }


     上面已经进行获取了所有的订阅函数,那么现在开始就可以进行订阅了,让我们来查看subscribe()方法做的功能操作


/**
     * 开始进行为订阅者 注册相关的订阅方法
     * @param subscriber  订阅者
     * @param subscriberMethod  订阅的方法
     * @param sticky    是否为粘性
     * @param priority  优先级
     */
    private void subscribe(Object subscriber,SubscriberMethod subscriberMethod, boolean sticky, int priority) {
        //通过订阅方法中进行获取订阅方法的类型
        Class<?> eventType =subscriberMethod.eventType;
        //通过订阅事件的类型 进行获取所有的订阅信息(有订阅者对象,订阅方法,优先级)
       CopyOnWriteArrayList<Subscription> subscriptions =subscriptionsByEventType.get(eventType);
        //进行创建一个订阅者
        Subscription newSubscription = newSubscription(subscriber, subscriberMethod, priority);
        if (subscriptions == null) {
            //如果当前的事件类型不存在订阅信息,那么就创建一个订阅信息集合
            subscriptions = newCopyOnWriteArrayList<Subscription>();
            //同时把当前的订阅信息加入到该订阅中
           subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if(subscriptions.contains(newSubscription)) {
                //抛出异常,该订阅者已经注册过该事件类中
                throw newEventBusException("Subscriber " + subscriber.getClass() + "already registered to event "
                        + eventType);
            }
        }
 
        // Starting with EventBus 2.2 weenforced methods to be public (might change with annotations again)
        //subscriberMethod.method.setAccessible(true);
        // 优先级判断,进行排序
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size ||newSubscription.priority > subscriptions.get(i).priority) {
                subscriptions.add(i,newSubscription);
                break;
            }
        }
 
        //将当前的事件加入到订阅者列表中
        List<Class<?>>subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = newArrayList<Class<?>>();
            typesBySubscriber.put(subscriber,subscribedEvents);
        }
        subscribedEvents.add(eventType);
 
        //是否粘性判断
        if (sticky) {
            if (eventInheritance) {
                // Existing sticky events ofall subclasses of eventType have to be considered.
                // Note: Iterating over allevents may be inefficient with lots of sticky events,
                // thus data structure shouldbe changed to allow a more efficient lookup
                // (e.g. an additional mapstoring sub classes of super classes: Class -> List<Class>).
               Set<Map.Entry<Class<?>, Object>> entries =stickyEvents.entrySet();
                for(Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?>candidateEventType = entry.getKey();
                    if(eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent =entry.getValue();
                       checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent =stickyEvents.get(eventType);
               checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }


  OK完成以上步骤,我们就大体完成了订阅注册工作,下面就是需要分析一下post的流程:

  4.3.主要先看post主函数:


/**
     * 向EventBus中发送消息事件对象
     * @param event
     */
    public void post(Object event) {
        PostingThreadState postingState =currentPostingThreadState.get();
        //把消息加入到事件队列中
        List<Object> eventQueue =postingState.eventQueue;
        eventQueue.add(event);
        if (!postingState.isPosting) {
            postingState.isMainThread =Looper.getMainLooper() == Looper.myLooper();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw newEventBusException("Internal error. Abort state was not reset");
            }
            try {
                //当消息队列不为空的时候,进行这正式发送消息,采用循环,把队列中所有的消息发送出去
                while (!eventQueue.isEmpty()) {
                   postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread =false;
            }
        }
    }
     然后进行发送功能,调用postSingleEvent()函数方法:
//消息发送:发送单个事件消息
    private void postSingleEvent(Object event,PostingThreadState postingState) throws Error {
        Class<?> eventClass =event.getClass();
        boolean subscriptionFound = false;
        if (eventInheritance) {
            List<Class<?>>eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes;h++) {
                Class<?> clazz =eventTypes.get(h);
                subscriptionFound |=postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound =postSingleEventForEventType(event, postingState, eventClass);
        }
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                Log.d(TAG, "No subscribersregistered for event " + eventClass);
            }
            if (sendNoSubscriberEvent&& eventClass != NoSubscriberEvent.class &&
                    eventClass !=SubscriberExceptionEvent.class) {
                post(newNoSubscriberEvent(this, event));
            }
        }
    }


    接着进行消息过滤postSingleEventForEventType()方法


 /**
     * 进行该特定的Event发送相应的消息
     * @param event  事件消息
     * @param postingState
     * @param eventClass
     * @return
     */
    private booleanpostSingleEventForEventType(Object event, PostingThreadState postingState,Class<?> eventClass) {
       CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            subscriptions =subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null &&!subscriptions.isEmpty()) {
            for (Subscription subscription :subscriptions) {
                postingState.event = event;
                postingState.subscription =subscription;
                boolean aborted = false;
                try {
                    //发生消息给订阅者
                   postToSubscription(subscription, event, postingState.isMainThread);
                    aborted =postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription =null;
                    postingState.canceled =false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }


  最终这边有一个核心的方法:postToSubscription()来进行post消息


 /**
     * 进行发送消息,同时根据发送过来的线程类型类型,发送消息给特定的订阅方法来进行执行
     * @param subscription  订阅者
     * @param event   执行事件
     * @param isMainThread 是否为主线程
     */
    private voidpostToSubscription(Subscription subscription, Object event, booleanisMainThread) {
        switch(subscription.subscriberMethod.threadMode) {
            case PostThread:
                //直接在本线程中调用订阅函数
                invokeSubscriber(subscription,event);
                break;
            case MainThread:
                if (isMainThread) {
                    //如果是主线程,直接调用订阅函数
                   invokeSubscriber(subscription, event);
                } else {
                    //如果不是主线程,通过handler进行处理
                   mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case BackgroundThread:
                if (isMainThread) {
                    //如果是主线程,采用runnable 中调用
                   backgroundPoster.enqueue(subscription, event);
                } else {
                    //子线程,直接调用
                   invokeSubscriber(subscription, event);
                }
                break;
            case Async:
                //加入到子线程中进行发送
               asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw newIllegalStateException("Unknown thread mode: " +subscription.subscriberMethod.threadMode);
        }
     }


      4.4.取消注册(反注册)主要查看unregister()方法

/**
     * 订阅者取消注册,反注册
     * @param subscriber
     */
    public synchronized void unregister(Object subscriber) {
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                unsubscribeByEventType(subscriber, eventType);
            }
            typesBySubscriber.remove(subscriber);
        } else {
            Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }


     OK,到这边基本上完成EventBusregisterpost的流程的讲解,关于这个核心类EventBus的注释过的类文件已经上传了大家可以通过该地址进行下载:EventBus注释过的类文件

 ().Otto消息总线框架对比:

        OttoAndroid中另外一个消息总线库,它其实是EventBus的变种,该和EventBus有一些相同的方法(例如:register,post,unregister…),但是这两者之间也有一些不同之处如下:

 

EventBus

Otto

 声明事件处理方法

命名约定

注解

事件继承

YES

YES

订阅继承

YES

NO

缓存事件

YES,sticky events

NO

事件生产

NO

YES

子线程事件传输

YES(Default)

YES

主线程事件传输

YES

NO

后台线程事件传输

YES

NO

异步线程事件传输

YES

NO

除了以上功能不同以外,这边还有性能上面的差异。为了测试性能问题,我们clone当前EventBus项目的时候,会发现有一个EventBusPerformance项目,我们可以使用的不同场景来比较。

基于下表结果显示,每一个测试方面EventBus的性能都大于Otto

 

EventBus

Otto

在Android2.3模拟器发送1000个事件

70%

 

S3Android4.0系统,发送1000个事件

110%

 

Android2.3模拟器,注册1000个订阅者

10%

 

S3 Android4.0系统,注册1000个订阅者

70%

 

Android2.3模拟器,注册订阅者冷启动

350%

 

S3 Android4.04注册订阅者冷启动

几乎一样

 

通过对比发现EventBus无论在功能上面还是性能上面,远远超过Otto消息总线框架,所以我们建议使用EventBus消息总线框架。

   

           到此我们的EventBus专题内容已经全部讲完了,相信大家在这个专题中能对EventBus会有一个比较全面的了解,同时也能够简单的了解实现的原理以及相关逻辑。

我们的项目已经配置集成了消息总线EventBus的例子.欢迎大家去Github站点进行clone或者下载浏览:https://github.com/jiangqqlmj/FastDev4Android 同时欢迎大家starfork整个开源快速开发框架项目~