EventBus3.0源码解析

1.概述

在EventBus3.0中 和 2.0 有这不小的变化。
先看看如何在项目中使用EvetBus3.0吧。
一般的基本使用大都这样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override
public void onStart() {
super.onStart();
EventBus.getDefault().register(this);
}

@Override
public void onStop() {
EventBus.getDefault().unregister(this);
super.onStop();
}

@Subscribe(sticky = true, threadMode = ThreadMode.MAIN) //在ui线程执行
public void onMainEvent(UserDetail userInfo) {

}

@Subscribe(threadMode = ThreadMode.ASYNC) //异步执行
public void onAsynclEvent(UserDetail userInfo) {

}

@Subscribe( threadMode = ThreadMode.POSTING) //直接执行
public void onPostingEvent(UserDetail userInfo) {

}

@Subscribe(threadMode = ThreadMode.BACKGROUND) //后台执行
public void onBackgroundEvent(UserDetail userInfo) {

}

在3.0版本的EventBus上,不再限制函数名字,只需在你想要接收函数上方加上注解@Subscribe即可,再根据需求添加相应的threadMode(里面的标志将决定该方法运行在哪个线程),如果希望可以延迟接收,则可以加上sticky = true的标志。

再说注册后,如何发送event。

1
2
3
EventBus.getDefault().postSticky(mUserDetail);
//或者
EventBus.getDefault().post(mUserDetail);

发出去之后,数据就可以在接收函数里处理了。

2.register

要了解该框架,肯定要从最开始的地方开始解读,那就是从EventBus.getDefault().register(this);开始吧。

1
2
3
4
5
6
7
8
9
10
11
/** Convenience singleton for apps using a process-wide EventBus instance. */
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}

这里的getDefault()是一个单例模式,防止反复创建浪费资源。

register()从2.0的4个精简到现在3.0的一个:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Registers the given subscriber to receive events. Subscribers must call {@link #unregister(Object)} once they
* are no longer interested in receiving events.
* <p/>
* Subscribers have event handling methods that must be annotated by {@link Subscribe}.
* The {@link Subscribe} annotation also allows configuration like {@link
* ThreadMode} and priority.
*/

public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}

从变量的命名就可以猜出来,EventBus的实现是采用了观察者模式。
第一行代码findSubscriberMethods(subscriberClass)是根据传进来的subscriber(也就是传进来的activity)来寻找函数,也就是在使用时加了注解@Subscribe的函数。得到一个list,再将每个函数与subscriber进行订阅。

我们再到这个函数内部去看个究竟:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}

if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}

其中的METHOD_CACHE的本质是一个ConcurrentHashMap,里面存放的是订阅者(就是activity) 和其对应的函数列表。如果该activity对应的函数列表已经存在,则返回,如果不存在则根据标志位ignoreGeneratedIndex来获取相应的函数列表。找到后,加入METHOD_CACHE这个ConcurrentHashMap中。
ignoreGeneratedIndex这个标志位的默认值为false,如果你需要自定义一个EventBus实例,可以用EventBusBuilder来自定义从而修改该值。但是一般情况下,我们调用的默认实例这个值是false。
接下来就进入findUsingInfo(subscriberClass)这个函数里面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}

这个函数也是比较好理解的,一开始是先初始化,再用getSubscriberInfo(findState)去获取订阅的信息,如果存在就放入subscriberMethods这个队列中,不存在则使用findUsingReflectionInSingleClass(findState)去获取。

那我们进去看下getSubscriberInfo(findState)的内部实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private SubscriberInfo getSubscriberInfo(FindState findState) {
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
if (findState.clazz == superclassInfo.getSubscriberClass()) {
return superclassInfo;
}
}
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
return info;
}
}
}
return null;
}

这个函数先去寻找父类寻找订阅信息,如果找不到,再根据subscriberInfoIndexes的节点寻找。但是这个subscriberInfoIndexes的默认值是null,也就是说如果你不打算用EventBusBuilder来自定义个EventBus的实例的话,最终都会调到findUsingReflectionInSingleClass(FindState findState)里面去。
再前面的findSubscriberMethods方法中的findUsingReflectionInSingleClass(FindState findState);内部的实现也是调用到了findUsingReflectionInSingleClass(FindState findState)
那就来看下这里面的具体情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}

这里就是调用了反射。
13行代码,过滤掉了 非public、静态 和 抽象方法。
15行则把参数大于1的函数排除在外。
16~23行,就是根据注解中threadMode,把订阅的函数存放到subscriberMethods中。

再一次回到register中:

1
2
3
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}

再会回到这里时,我们已经拿到在activity中所有加了订阅函数。
接下来就会调用subscribe方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}

int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}

List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);

if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}

subscriberMethod 中包含了method、threadMode、eventType、priority、sticky等信息。
根据eventType来去查找CopyOnWriteArrayList<Subscription>,再根据这个值去subscriptionsByEventType中查找,如果不存在则保存进去。而这个subscriptionsByEventType才是正在存放方法的地方,本质为Map<Class<?>, CopyOnWriteArrayList<Subscription>>

16~22行的意思根据优先级排序,把高优先级的方法放在前面。
24~29行存储activity 订阅方法中的所有eventType。
31~48行 看看是否存在stickyEnent,存在看看该activity是否有满足条件的订阅函数,存在则立即发送。

register全部结束,大致意思就是把activity中所有的订阅函数的各种参数(比如method、eventType、threadMode等等)全部存放起来。同时看看有没有存在stickyType,符合条件则发送。

3.post

register把我们的方法全部存储起来,post则是开始调用我们的方法了。
直接到post内部看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
    /** Posts the given event to the event bus. */
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);

if (!postingState.isPosting) {
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
/** For ThreadLocal, much faster to set (and get multiple values). */
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<Object>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}

PostingThreadState里面有一个队列和一些标志位。这个类的目的看注释的意思是为了提高运行效率。
把事件放去队列,再调用postSingleEvent去发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}

核心代码的意思就是根据event获取到类,再从里面去获取该类的所有eventType,一一匹配,如果匹配到了,就调用postSingleEventForEventType发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}

在这个函数中则是继续调用了postToSubscription来发送event。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

在这个函数中,目的就比较明显了。根据在threadMode在做相应的处理。
如果是POSTING,则直接调用invokeSubscriber,它就是直接利用反射调用订阅函数。

1
2
3
4
5
6
7
8
9
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}

如果是MAIN,则先判断当前的线程是否为主线程,如果是主线程了则直接发送,不是的话则放进mainThreadPoster。而mainThreadPoster本质是一个hanlder,里面传入了主线程的looper,所以会在主线程执行。

1
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);

如果是BACKGROUND,刚好和Main相反,如果当前已经是主线程了,则放入backgroundPoster,它也是封装了一层的handler,内部传入了本地的looper。

如果是ASYNC,则入队异步执行。他内部的线程池用的就是Eventbus的线程池:

1
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();

该线程池是非核心线程池,只要有空闲的线程都会及时回收掉。而当整个线程池处于闲置状态时,所有里面的线程都会停止,所以newCachedThreadPool占用的系统资源是十分小的。

4. 结束

到这里,EventBus3.0基本流程已经基本过了一遍,还有一些方法,比如PostSticky(),也就是将事件先存放在StickyMap里面,再调用前面说的post方法而已。
这里只是把大致流程过了一遍,里面的一些细节就需要你自己去慢慢把握了。

优秀的开源项目里有很多东西值得我们去学习,希望以后也可以写出这么优美的代码。

本人水平有限,文中若有偏差,请多多指导!