EventBus for Android 源码分析

上文实例讲解EventBus for Android介绍了EventBus的基本用法,本文将介绍EventBus的实现原理。EventBus的实现主要围绕两个函数registerpost,下面分别介绍之。

1 register(Object subscriber)

功能
注册subscriber中以onEvent开头的方法
代码:

private synchronized void register(Object subscriber, boolean sticky, int priority) {
    //构造出方法列表
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriber.getClass());
    for (SubscriberMethod subscriberMethod : subscriberMethods) {
        //注册每个方法
        subscribe(subscriber, subscriberMethod, sticky, priority);
    }
}

重要数据结构:
SubscriberMethod:存储方法、线程模型和Event事件处理的参数类型
subscriber函数用两个Map:subscriptionByEventType和typesBySubscriber来维护事件订阅者和事件类型的关系,如代码所示

// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod, boolean sticky, int priority) {
    Class<?> eventType = subscriberMethod.eventType;
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod, priority);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<Subscription>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                    + eventType);
        }
    }

    // Starting with EventBus 2.2 we enforced methods to be public (might change with annotations again)
    // subscriberMethod.method.setAccessible(true);

    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        //以优先级顺序来插入队列中
        if (i == size || newSubscription.priority > subscriptions.get(i).priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }

    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<Class<?>>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);

    ...
}

这个函数中用到了CopyOnWriteArrayList,下次研究一下。

2 post(Object event)

先看源码:

public void post(Object event) {
    //postingState是一个ThreadLocal型变量, 表示发送事件的进程状态
    PostingThreadState postingState = currentPostingThreadState.get();
    //将事件加如到队列中
    List<Object> eventQueue = postingState.eventQueue;
    eventQueue.add(event);
    //发送Event
    if (!postingState.isPosting) {
        postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            //执行队列中的任务
            while (!eventQueue.isEmpty()) {
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}

这里遇到一个类:PostingThreadState,用来记录当前线程的状态和任务队列,EventBus用postSingleEvent来处理任务队列中的每个任务,代码如下:

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    if (eventInheritance) {
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        //获取事件类型,包括基类和接口
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            Log.d(TAG, "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}

这个函数的主要功能是获取Event的类型,如果eventInheritance为True,则会获取Event的父类,和接口,然后对每一种类型调用一次 postSingleEventForEventType(Object event, PostingThreadState postingState, Class

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        //获取相应事件类型为eventClass的订阅者
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        for (Subscription subscription : subscriptions) {
            //初始化发送事件线程的状态
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted = false;
            try {
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}

在获取到订阅者,方法和发送事件的线程类型之后,EventBus调用 postToSubscription(Subscription subscription, Object event, boolean isMainThread)来通知订阅者进行事件处理,该函数代码如下:

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case PostThread:
                //直接调用Event处理事件
                invokeSubscriber(subscription, event);
                break;
            case MainThread:
                if (isMainThread) {
                    //发送线程是主线程,直接调用
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case BackgroundThread:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    //发送线程是非主线程,直接调用
                    invokeSubscriber(subscription, event);
                }
                break;
            case Async:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

该函数根据不同的线程模型来采用不同的方法来通知订阅者,下面分别讨论之:

1 PostThread模型

直接调用invokeSubscriber(Subscription subscription, Object event),该方法利用反射原理来调用接受者的事件处理方法

2 MainThread模型

若发送线程为主线程,则直接调用订阅者事件处理方法。
若发送线程为非主线程,则将事件发送到mainThreadPoster中进行处理。mainThreadPoster的类型为HandlerPoster,继承自Handler。HandlerPoster内部维护一个队列(PendingPostQueue)来存取订阅者和对应的事件响应方法。每次往HandlerPoster中插入数据时, HandlerPoster便发一个消息,告知Handler来从PendingPostQueue中取出数据,然后调用订阅者的事件处理方法,代码如下:

void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        queue.enqueue(pendingPost);
        if (!handlerActive) {
            handlerActive = true;
            //若looper不在运行,则发消息通知它
            if (!sendMessage(obtainMessage())) {
                throw new EventBusException("Could not send handler message");
            }
        }
    }
}

@Override
public void handleMessage(Message msg) {
    boolean rescheduled = false;
    try {
        long started = SystemClock.uptimeMillis();
        while (true) {
            PendingPost pendingPost = queue.poll();
            if (pendingPost == null) {
                synchronized (this) {
                    // Check again, this time in synchronized
                    pendingPost = queue.poll();
                    if (pendingPost == null) {
                        handlerActive = false;
                        return;
                    }
                }
            }
            //取出队列中的数据,调用订阅者的方法
            eventBus.invokeSubscriber(pendingPost);
            long timeInMethod = SystemClock.uptimeMillis() - started;
            if (timeInMethod >= maxMillisInsideHandleMessage) {
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
                rescheduled = true;
                return;
            }
        }
    } finally {
        handlerActive = rescheduled;
    }
}

3 BackgroundThread模型

若发送线程为非主线程,则直接调用事件响应函数。
若发送线程为主线程,则将消息发送到backgroundPoster中。backgroundPoster的类型为BackgroundPoster,继承自Runnable。和HandlerPoster类型,该对象也维护一个事件队列:PendingPosteQueue,每次插入数据时,BackgroundPoster会将自己放入到EventBus的线程池中等待调度,完整代码如下:

public void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        queue.enqueue(pendingPost);
        //若线程池中的任务已经运行,则立即返回,
        if (!executorRunning) {
            executorRunning = true;
            //获取到EventBus的线程池
            eventBus.getExecutorService().execute(this);
        }
    }
}

@Override
public void run() {
    try {
        try {
            while (true) {
                //等待一秒后取出任务
                PendingPost pendingPost = queue.poll(1000);
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            executorRunning = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
            }
        } catch (InterruptedException e) {
            Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
        }
    } finally {
        executorRunning = false;
    }
}

4 Async模型

该模型直接将事件发送到asyncPoster中进行处理。asyncPoster的类型为AsyncPoster,继承自Runnable。与BackgroundPoster不同的是,AsyncPoster将数据插入队列后,直接将自己放入线程池中处理,完成代码如下:

public void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    queue.enqueue(pendingPost);
    eventBus.getExecutorService().execute(this);
}

@Override
public void run() {
    PendingPost pendingPost = queue.poll();
    if(pendingPost == null) {
        throw new IllegalStateException("No pending post available");
    }
    eventBus.invokeSubscriber(pendingPost);
}

Async与BackgroundPoster的不同之处为前者每次加入订阅者和事件后,便立即将自己放进线程池中,确保每一个任务都处于不同的线程中。而BackgroundPoster会根据线程池中的BackgroundPoster类型的任务是否处于运行状态来往线程池中加任务,这样就确保所有处于BackgroundThread模型中的事件处理任务都在同一个后台线程中运行。

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。