线程选择
/**
 * Delivers {@code event} to one subscriber, choosing the delivery thread from the
 * thread mode declared on the subscriber method.
 *
 * @param subscription the subscriber/method pair to deliver to
 * @param event        the event object being posted
 * @param isMainThread whether the posting thread is the main thread
 * @throws IllegalStateException if the thread mode is not one of the handled values
 */
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    final boolean offMainThread = !isMainThread;
    switch (subscription.subscriberMethod.threadMode) {
        case ASYNC:
            // Always handed to the async poster, regardless of the posting thread.
            asyncPoster.enqueue(subscription, event);
            return;
        case BACKGROUND:
            // Already off the main thread: call directly; otherwise hand over to the background poster.
            if (offMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                backgroundPoster.enqueue(subscription, event);
            }
            return;
        case MAIN:
            // Must run on the main thread: queue it when posted from elsewhere.
            if (offMainThread) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            return;
        case POSTING:
            // Runs synchronously on whatever thread posted the event.
            invokeSubscriber(subscription, event);
            return;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}
其中,isMainThread 是在发送事件时,根据当前线程的 Looper 是否为主线程 Looper 判断得到的:
// The posting thread counts as the main thread when its Looper is the main Looper.
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postToSubscription(subscription, event, postingState.isMainThread);
// Next, look at HandlerPoster.
// The three posters are all created together when the EventBus is constructed.
EventBus(EventBusBuilder builder) {
//....
// Bound to the main Looper; the last argument (10) is HandlerPoster's maxMillisInsideHandleMessage.
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
//....
}
HandlerPoster
/**
 * A Handler that delivers queued events on the thread of the Looper passed to its constructor.
 * Posts are buffered in a PendingPostQueue and drained inside handleMessage(); the Message
 * that is sent carries no data and only serves as a wake-up signal.
 */
final class HandlerPoster extends Handler {
private final PendingPostQueue queue;// linked queue of posts waiting to be delivered
// Time budget, in milliseconds, for a single handleMessage() pass before it reschedules itself.
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
// True while a wake-up message is outstanding or being processed; enqueue() checks it under synchronized (this).
private boolean handlerActive;
/**
 * @param eventBus                     bus used to invoke subscribers for dequeued posts
 * @param looper                       Looper whose thread runs handleMessage()
 * @param maxMillisInsideHandleMessage time budget in milliseconds for one handleMessage() pass
 */
HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
/**
 * Queues an event for the given subscription and, if no wake-up message is outstanding, sends one.
 *
 * @throws EventBusException if the wake-up message could not be sent
 */
void enqueue(Subscription subscription, Object event) {
// Take a PendingPost from the pool (or a new one); it plays a role similar to a Message.
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
// Send an empty message to signal that there is work in the queue.
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
/**
 * Drains the queue, invoking the subscriber for each post. Once the time spent in this pass
 * reaches maxMillisInsideHandleMessage, a new message is sent and the method returns, so the
 * remaining posts are handled in a later pass.
 */
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
// Start time of this pass; used to cap the time spent here at maxMillisInsideHandleMessage milliseconds (not seconds).
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
// Queue is empty while holding the lock: mark inactive so the next enqueue() sends a new message.
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
// Budget used up: schedule another pass and return.
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
// Stays true only when another pass was scheduled above; this write is not under synchronized (this).
handlerActive = rescheduled;
}
}
}
}
/**
 * Delivers a queued post: copies out its event and subscription, recycles the post,
 * then invokes the subscriber if the subscription is still active.
 *
 * @param pendingPost the queued post to deliver; it is released back to the pool by this call
 */
void invokeSubscriber(PendingPost pendingPost) {
    // Copy the payload out first: releasing the post clears its fields.
    final Subscription target = pendingPost.subscription;
    final Object payload = pendingPost.event;
    // Hand the instance back to the pool so a later enqueue can reuse it instead of allocating.
    PendingPost.releasePendingPost(pendingPost);
    if (!target.active) {
        return;
    }
    invokeSubscriber(target, payload);
}
/**
 * Final step of delivery: calls the subscriber method reflectively with the event.
 *
 * @param subscription the subscriber/method pair to invoke
 * @param event        the event passed as the method's single argument
 * @throws IllegalStateException if the subscriber method is not accessible
 */
void invokeSubscriber(Subscription subscription, Object event) {
    try {
        final Object receiver = subscription.subscriber;
        subscription.subscriberMethod.method.invoke(receiver, event);
    } catch (IllegalAccessException accessDenied) {
        throw new IllegalStateException("Unexpected exception", accessDenied);
    } catch (InvocationTargetException thrownBySubscriber) {
        // The subscriber itself threw: pass the underlying cause to the bus's exception handling.
        handleSubscriberException(subscription, event, thrownBySubscriber.getCause());
    }
}
其中涉及到两个类:PendingPostQueue(待处理事件的队列)和 PendingPost(队列中的节点)
/**
 * A minimal singly linked FIFO queue of {@code PendingPost} nodes.
 * Every method is synchronized on the queue instance.
 */
final class PendingPostQueue {
    private PendingPost head;
    private PendingPost tail;

    /**
     * Appends a post to the end of the queue and wakes up any thread waiting in {@link #poll(int)}.
     *
     * @throws NullPointerException  if {@code pendingPost} is null
     * @throws IllegalStateException if the queue has a head but no tail
     */
    synchronized void enqueue(PendingPost pendingPost) {
        if (pendingPost == null) {
            throw new NullPointerException("null cannot be enqueued");
        }
        if (tail == null) {
            if (head != null) {
                throw new IllegalStateException("Head present, but no tail");
            }
            // Empty queue: the new node becomes the head as well as the tail.
            head = pendingPost;
        } else {
            tail.next = pendingPost;
        }
        tail = pendingPost;
        notifyAll();
    }

    /**
     * Removes and returns the first post, or returns null when the queue is empty.
     */
    synchronized PendingPost poll() {
        final PendingPost first = head;
        if (first == null) {
            return null;
        }
        head = first.next;
        if (head == null) {
            // The last node was removed, so the tail must be cleared too.
            tail = null;
        }
        return first;
    }

    /**
     * Like {@link #poll()}, but when the queue is empty it first waits once, for at most
     * {@code maxMillisToWait} milliseconds, for a post to be enqueued. May still return null.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
        final boolean empty = (head == null);
        if (empty) {
            wait(maxMillisToWait);
        }
        return poll();
    }
}
/**
 * Pooled carrier for one (event, subscription) pair awaiting delivery; it is also the node
 * type of the pending-post linked queue. Its role is similar to that of a Message.
 */
final class PendingPost {
    /** Shared pool of recycled instances; every access synchronizes on the list itself. */
    private static final List<org.greenrobot.eventbus.PendingPost> pendingPostPool = new ArrayList<org.greenrobot.eventbus.PendingPost>();

    Object event;
    Subscription subscription;
    org.greenrobot.eventbus.PendingPost next;

    private PendingPost(Object event, Subscription subscription) {
        this.event = event;
        this.subscription = subscription;
    }

    /**
     * Returns a post carrying the given subscription and event. A recycled instance is taken
     * from the pool when one is available; otherwise a new one is created.
     *
     * @param subscription the subscription the event is destined for
     * @param event        the event to carry
     * @return a post with {@code next} cleared and the given payload set
     */
    static org.greenrobot.eventbus.PendingPost obtainPendingPost(Subscription subscription, Object event) {
        synchronized (pendingPostPool) {
            if (!pendingPostPool.isEmpty()) {
                // Take from the end of the list so no elements need shifting.
                final int lastIndex = pendingPostPool.size() - 1;
                final org.greenrobot.eventbus.PendingPost recycled = pendingPostPool.remove(lastIndex);
                recycled.next = null;
                recycled.subscription = subscription;
                recycled.event = event;
                return recycled;
            }
        }
        return new org.greenrobot.eventbus.PendingPost(event, subscription);
    }

    /**
     * Clears everything the post carries and puts it back into the pool for reuse,
     * unless the pool already holds 10000 instances.
     *
     * @param pendingPost the post to recycle
     */
    static void releasePendingPost(org.greenrobot.eventbus.PendingPost pendingPost) {
        pendingPost.next = null;
        pendingPost.subscription = null;
        pendingPost.event = null;
        synchronized (pendingPostPool) {
            // Don't let the pool grow indefinitely
            if (pendingPostPool.size() >= 10000) {
                return;
            }
            pendingPostPool.add(pendingPost);
        }
    }
}
BackgroundPoster
/**
 * Delivers queued events on a thread of the bus's executor service. The poster itself is the
 * Runnable: at most one run() is meant to be active at a time, draining the queue in order.
 */
final class BackgroundPoster implements Runnable {
    private final PendingPostQueue queue;
    private final EventBus eventBus;
    /** True while a run() has been scheduled or is draining the queue; checked under synchronized (this) in enqueue(). */
    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    /**
     * Queues an event for the given subscription and schedules this poster on the executor
     * service if no run() is currently scheduled or active.
     */
    public void enqueue(Subscription subscription, Object event) {
        // Take a PendingPost from the pool (or a new one), same as HandlerPoster does.
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                // Hand this Runnable to the executor; when it actually starts is up to the executor.
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    /**
     * Drains the queue, waiting up to one second for new posts before giving up. The runner
     * stops once the queue is confirmed empty while holding the lock.
     */
    @Override
    public void run() {
        // Set only on the clean exit path, where executorRunning was already cleared under the lock.
        boolean drained = false;
        try {
            while (true) {
                PendingPost pendingPost = queue.poll(1000);
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            executorRunning = false;
                            drained = true;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
            }
        } catch (InterruptedException e) {
            Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
            // Restore the interrupt status so the owner of this thread can still observe it.
            Thread.currentThread().interrupt();
        } finally {
            // Only reset the flag on abnormal exit (interrupt or exception). On the clean path the
            // flag was cleared under the lock; writing it again here could overwrite the 'true'
            // set by a concurrent enqueue() that has already scheduled the next runner, which
            // would let a later enqueue() start a second runner in parallel.
            if (!drained) {
                executorRunning = false;
            }
        }
    }
}
// Following the executor back to its source: the default is a cached thread pool, which has
// no upper bound on threads and creates a new one whenever no idle thread is available.
// Presumably this is what eventBus.getExecutorService() returns unless overridden - confirm against EventBus.
public class EventBusBuilder {
// Shared default executor, created once per class load.
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
// Executor used by the builder; initialised to the shared default.
ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
}