post(发送)
post(Object event)
//发送的第一个步骤
public void post(Object event) {
//发送的线程状态
PostingThreadState postingState = currentPostingThreadState.get();
//事件队列
List<Object> eventQueue = postingState.eventQueue;
//将当前发送的event添加到队列中
eventQueue.add(event);
//当前post状态不是再发送中则可以发送
if (!postingState.isPosting) {
//当前线程状态
postingState.isMainThread = isMainThread();
//发送状态改为正在发送
postingState.isPosting = true;
//当前发送状态是否已中止,中止了未重置这里抛出异常
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
//循环发送队列中的event
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
postSingleEvent()
//发送事件第二部
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
//如果是继承事件,则需要一层一层逐级发送
if (eventInheritance) {
//找到所有可以继承关系中的event.class,继承关系中需要使用当前object的class发送,而不是event的class对象去发送
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
//循环所有可以接收该event的class并向它们发送该event
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
//不是继承的class关系,则直接发送
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//如果发送不成功,则这里进行异常处理
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
postSingleEventForEventType()
//想所有可以接收event类型的已经注册了event.class的method发送消息
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//从缓存中取出subscriptions
subscriptions = subscriptionsByEventType.get(eventClass);
}
//找到所有注册了订阅者,开始循环给它们发送消息
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//这里是真正给每个订阅者发送消息的方法
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
postToSubscription()
//post到订阅者中
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//判断订阅者的线程要求
switch (subscription.subscriberMethod.threadMode) {
case POSTING://当前线程中
//调用invokeSubscriber
invokeSubscriber(subscription, event);
break;
case MAIN://主线程中
if (isMainThread) {
//如果在主线程中直接调用invokeSubscriber
invokeSubscriber(subscription, event);
} else {
//如果不再主线程中,需要将该粘性事件发送到主线程后再调用invokeSubscriber
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED://在主线程中,但是不会造成阻塞
if (mainThreadPoster != null) {
//将该订阅事件发送到主线程去处理
mainThreadPoster.enqueue(subscription, event);
} else {
//官方解释
//临时的处理:由于发送者和订阅者之间无法耦合,所以技术上是不正确的
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND://后台线程中
if (isMainThread) {
//如果是主线程,则将订阅事件发送到后台线程中
backgroundPoster.enqueue(subscription, event);
} else {
//否则在当前线程中订阅
invokeSubscriber(subscription, event);
}
break;
case ASYNC://异步耗时线程
//将订阅事件发送到异步线程中
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
invokeSubscriber()
这个方法就是post的主要方法
//调用订阅。订阅者信息,订阅者所属的object,post
void invokeSubscriber(Subscription subscription, Object event) {
try {
//这里使用了java原生的反射invoke方法
//第一个参数是订阅此event的对象object
//第二个参数是event
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
//处理订阅异常
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
post到不同线程
mainThreadPoster.enqueue
从其他线程切换到主线程再post
Poster
// 发送事件
interface Poster {
/**
* 将要发布的事件排队用于特定订阅。
*
* @param subscription 将接收事件的订阅。
* @param event 该事件将发布到订阅服务器。
*/
void enqueue(Subscription subscription, Object event);
}
getMainThreadSupport()
//EventBusBuilder中获得主线程的实现类
MainThreadSupport mainThreadSupport;
MainThreadSupport getMainThreadSupport() {
//一个eventbus中只有一个实例对象
if (mainThreadSupport != null) {
return mainThreadSupport;
}
//检查android日志是否可用
else if (AndroidLogger.isAndroidLogAvailable()) {
//获得主线程的looper
Object looperOrNull = getAndroidMainLooperOrNull();//1
//返回一个实现了MainThreadSupport接口的类
return looperOrNull == null ? null :
new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);//2
} else {
return null;
}
}
getAndroidMainLooperOrNull()
//获得Android的主线程的Looper
Object getAndroidMainLooperOrNull() {
try {
return Looper.getMainLooper();
} catch (RuntimeException e) {
// Not really a functional Android (e.g. "Stub!" maven dependencies)
return null;
}
}
MainThreadSupport
import android.os.Looper;
//主线程实现类接口
public interface MainThreadSupport {
boolean isMainThread();
Poster createPoster(EventBus eventBus);
class AndroidHandlerMainThreadSupport implements MainThreadSupport {
private final Looper looper;
public AndroidHandlerMainThreadSupport(Looper looper) {
this.looper = looper;
}
@Override
public boolean isMainThread() {
//检查是否是主线程
//这里的Looper.myLooper()在android源码中的意义就是主线程的Looper
return looper == Looper.myLooper();
}
@Override
public Poster createPoster(EventBus eventBus) {
//new一个poster的handler
return new HandlerPoster(eventBus, looper, 10);
}
}
}
HandlerPoster
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.os.SystemClock;
//poster的Handler类,作用是切换主线程
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;//这是一个消息队列
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;//是否处理活动
//初始化
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
//发送
//subscription对象和event对象
public void enqueue(Subscription subscription, Object event) {
//创建一个消息,并放入消息队列中
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
//接收消息,这里将从非ui线程的消息接收到主线程中
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
//在主线程中再调用invokeSubscriber()方法
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
//如果子线程发送消息超时,则抛出异常
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
//这里已经进入了post的Handler逻辑
//返回的是HandlerPoster对象
mainThreadPoster = mainThreadSupport.createPoster(EventBus)
backgroundPoster.enqueue()
//其他方法暂不说明,这里只分析子线程切换主线程发送消息的源码
asyncPoster.enqueue()
//其他方法暂不说明,这里只分析子线程切换主线程发送消息的源码
总结
- post的是一个object对象
- 先将object添加到消息发送队列中,排队发送
- 现判断当前object是否是含有继承关系
- 如果有继承关系在里面则要将每一层继承关系单独post每一层继承object的class对象
- 如果没有继承关系,则直接post当前event的class对象
- 从缓存中找出当前event.class的Subscription对象
- 再post之前先判断线程要求
- 如果是当前线程到当前线程,则直接post
- 如果是当前线程到主线程,则需要切换到主线程再post
- 其他线程要求同理
- post的方法主要使用了反射
- 当前subscription对象对应的subscriberMethod的method,使用反射调用该方法
- subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
- 主要使用java的反射,method.invoke(object,object);
- 第一个参数是当前方法所在的class的对象
- 第二个参数则是当前方法的传递进来的参数对象
- post的时候会将上述反射循环遍历每一个标记了可以接收该event.class的方法