所有事情都是从HelloWorld开始
import org.rribbit.Listener;
import org.rribbit.RRiBbitUtil;
import org.rribbit.RequestResponseBus;
import org.rribbit.creation.InstantiatingClassBasedListenerObjectCreator;
import org.rribbit.creation.ObjectBasedListenerObjectCreator;

/**
 * Smallest possible RRiBbit example: the class offers itself as a source of listener methods,
 * builds a request-response bus for local use and sends one request with the hint "say".
 */
public class HelloWorld {

    /** Bus used by {@link #sayHello()}; assigned in {@link #main(String[])}. */
    private static RequestResponseBus rrb;

    /**
     * Creates the bus and triggers a single greeting.
     *
     * @param args command line arguments, not used
     * @throws Exception anything propagated by the bus setup or by the send
     */
    public static final void main(String[] args) throws Exception {
        ObjectBasedListenerObjectCreator listenerCreator =
                new InstantiatingClassBasedListenerObjectCreator(HelloWorld.class);
        rrb = RRiBbitUtil.createRequestResponseBusForLocalUse(listenerCreator, false);

        new HelloWorld().sayHello();
    }

    /**
     * Sends the text "Hello World" over the bus with the hint "say".
     * Presumably this reaches every listener method annotated with that hint - confirm against
     * the RequestResponseBus documentation.
     */
    public void sayHello() {
        rrb.send("say", "Hello World");
    }

    /**
     * Listener for the hint "say": prints the message unchanged.
     *
     * @param message text to print
     */
    @Listener(hint="say")
    public void say(String message) {
        System.out.println(message);
    }

    /**
     * Second listener for the hint "say": prints the message followed by " : )".
     *
     * @param message text to print
     */
    @Listener(hint="say")
    public void say2(String message) {
        System.out.println(message + " : )");
    }
}
看看InstantiatingClassBasedListenerObjectCreator都做了什么
/**
 * Class-based listener object creator whose constructor simply hands all given classes to
 * {@link AbstractClassBasedListenerObjectCreator}.
 * NOTE(review): only the constructor is visible in this excerpt; judging by the name, this class
 * supplies the target objects by instantiating the classes - confirm against the full source.
 */
public class InstantiatingClassBasedListenerObjectCreator extends AbstractClassBasedListenerObjectCreator {

	private static final Logger log = LoggerFactory.getLogger(InstantiatingClassBasedListenerObjectCreator.class);

	/**
	 * Passes the given classes on to the superclass constructor.
	 *
	 * @see AbstractClassBasedListenerObjectCreator#AbstractClassBasedListenerObjectCreator(Class...)
	 *
	 * @param classes the classes to hand to the superclass constructor
	 */
	public InstantiatingClassBasedListenerObjectCreator(Class<?>... classes) {
		super(classes);
	}
/**
 * Base for listener object creators that are fed with classes: for every class a target object
 * is obtained, the class is scanned for listener methods, and the resulting
 * {@link ListenerObject}s are bound to that target.
 */
public abstract class AbstractClassBasedListenerObjectCreator extends ObjectBasedListenerObjectCreator {

	private static final Logger log = LoggerFactory.getLogger(AbstractClassBasedListenerObjectCreator.class);

	/** Initialised to an empty list by the constructor; not read by the code shown in this excerpt. */
	protected Collection<Class<?>> excludedClasses;

	/**
	 * Calls {@link #addClass(Class)} on each given class, in the order given.
	 * Note that {@code addClass} is overridable and runs before subclass constructors finish.
	 *
	 * @param classes the classes to process
	 */
	public AbstractClassBasedListenerObjectCreator(Class<?>... classes) {
		excludedClasses = new ArrayList<Class<?>>();
		for(int i = 0; i < classes.length; i++) {
			this.addClass(classes[i]);
		}
	}

	/**
	 * Processes one class: when a target object is available for it, all of its listener methods
	 * are bound to that target, stored, and the observers are notified. When no target object is
	 * available the class is skipped without further action.
	 *
	 * @param clazz the class to process
	 */
	public void addClass(Class<?> clazz) {
		log.debug("Processing class '" + clazz.getName() + "'");
		Object target = this.getTargetObjectForClass(clazz);
		if(target == null) {
			return;
		}

		log.debug("Found target object for class '" + clazz.getName() + "', getting Listener methods");
		Collection<ListenerObject> found = this.getIncompleteListenerObjectsFromClass(clazz);
		for(ListenerObject candidate : found) {
			candidate.setTarget(target);
		}
		listenerObjects.addAll(found);
		this.notifyObserversOnClassAdded(clazz);
	}

	/**
	 * Creates the target object by invoking the public no-argument constructor of the class.
	 * NOTE(review): given the {@code @Override}, this excerpt appears to originate from a concrete
	 * subclass rather than from this abstract class - confirm.
	 *
	 * @param clazz the class to instantiate
	 * @return a new instance, or {@code null} when the instance could not be constructed for any
	 *         reason other than the constructor itself throwing
	 * @throws RuntimeException wrapping the original cause when the constructor throws
	 */
	@Override
	protected Object getTargetObjectForClass(Class<?> clazz) {
		try {
			log.debug("Attempting to construct instance of class '" + clazz.getName() + "'");
			Object instance = clazz.getConstructor().newInstance();
			return instance;
		} catch(InvocationTargetException constructorFailure) {
			throw new RuntimeException("Exception thrown from constructor of Listener", constructorFailure.getCause());
		} catch(Exception unusable) {
			// Deliberately lenient: a class that cannot be instantiated is just not registered.
			log.debug("No suitable constructor found, returning null");
			return null;
		}
	}
/**
 * Scans the public methods of the given class (as returned by {@link Class#getMethods()}) and
 * builds one {@link ListenerObject} for every method carrying a {@link Listener} annotation.
 * The objects are "incomplete" because no target is set on them here.
 *
 * @param clazz the class to scan
 * @return the listener objects found, with hint, method and return type filled in; empty when
 *         the class has no annotated methods
 */
protected Collection<ListenerObject> getIncompleteListenerObjectsFromClass(Class<?> clazz) {
	Collection<ListenerObject> found = new ArrayList<ListenerObject>();
	Method[] candidates = clazz.getMethods();
	for(int i = 0; i < candidates.length; i++) {
		Method candidate = candidates[i];
		Listener annotation = candidate.getAnnotation(Listener.class);
		if(annotation == null) {
			continue; // not a listener method
		}

		log.debug("Listener annotation found for method \"" + candidate + "\", instantiating ListenerObject");
		ListenerObject incomplete = new ListenerObject();
		incomplete.setHint(annotation.hint());
		incomplete.setMethod(candidate);
		incomplete.setReturnType(candidate.getReturnType());
		found.add(incomplete);
	}
	return found;
}
rrb = RRiBbitUtil.createRequestResponseBusForLocalUse(creator, false);
/**
 * Assembles a {@link RequestResponseBus} from the local building blocks: a caching retriever on
 * top of the given creator, a multi-threaded executor, a local request processor, a local
 * dispatcher and finally the default bus implementation.
 *
 * @param listenerObjectCreator the source of listener objects for the new bus
 * @param setInRRB when {@code true}, the new bus is also registered through
 *                 {@code RRB.setRequestResponseBus}
 * @return the newly created bus
 */
public static RequestResponseBus createRequestResponseBusForLocalUse(ListenerObjectCreator listenerObjectCreator, boolean setInRRB) {
	ListenerObjectRetriever retriever = new CachedListenerObjectRetriever(listenerObjectCreator);
	ListenerObjectExecutor executor = new MultiThreadedListenerObjectExecutor();
	LocalRequestProcessor processor = new LocalRequestProcessor(retriever, executor);
	RequestDispatcher dispatcher = new LocalRequestDispatcher(processor);
	RequestResponseBus bus = new DefaultRequestResponseBus(dispatcher);

	if(!setInRRB) {
		return bus;
	}
	RRB.setRequestResponseBus(bus);
	return bus;
}
rrb.send("say", "Hello World");
/**
 * Sends a request that carries only a hint (no desired return type) and returns a single result.
 *
 * @param hint the hint placed in the request
 * @param parameters the parameters placed in the request
 * @return the first return value of the response, or {@code null} when the response contains
 *         no return values
 */
@Override
public <T> T sendForSingleWithHint(String hint, Object... parameters) {
	log.info("Sending request for single object with hint '" + hint + "'");

	log.debug("Creating Request object");
	Request outgoing = new Request(null, hint, parameters);

	log.debug("Dispatching Request");
	Response<T> reply = requestDispatcher.dispatchRequest(outgoing);

	log.debug("Processing Response");
	// Presumably rethrows anything the listeners threw - confirm in processThrowables.
	this.processThrowables(reply.getThrowables());

	if(reply.getReturnValues().isEmpty()) {
		return null;
	}
	return reply.getReturnValues().iterator().next();
}
requestDispatcher.dispatchRequest(request);
/**
 * Dispatches the request by handing it straight to the local request processor.
 *
 * @param request the request to dispatch
 * @return the response produced by the local request processor
 */
public <T> Response<T> dispatchRequest(Request request) {
	log.info("Dispatching Request");
	final Response<T> processed = localRequestProcessor.processRequest(request);

	log.info("Returning Response");
	return processed;
}
localRequestProcessor.processRequest(request);
/**
 * Looks up the listener objects that match the request and executes them with the request's
 * parameters. Which retriever lookup is used depends on which of desired return type and hint
 * are present ({@code null} means "not specified"):
 * neither - all listeners; hint only - by hint; type only - by type; both - by type and hint.
 *
 * @param request the request to process
 * @return the response produced by the listener object executor
 */
public <T> Response<T> processRequest(Request request) {
	log.info("Processing Request");

	log.debug("Getting ListenerObjects");
	final boolean anyReturnType = request.getDesiredReturnType() == null;
	final boolean anyHint = request.getHint() == null;
	final Collection<ListenerObject> matching;
	if(anyReturnType && anyHint) {
		matching = listenerObjectRetriever.getListenerObjects();
	} else if(anyReturnType) {
		matching = listenerObjectRetriever.getListenerObjects(request.getHint());
	} else if(anyHint) {
		matching = listenerObjectRetriever.getListenerObjects(request.getDesiredReturnType());
	} else {
		matching = listenerObjectRetriever.getListenerObjects(request.getDesiredReturnType(), request.getHint());
	}

	log.debug("Executing ListenerObjects");
	Response<T> outcome = listenerObjectExecutor.executeListeners(matching, request.getParameters());

	log.info("Returning Response");
	return outcome;
}
listenerObjects = listenerObjectRetriever.getListenerObjects(request.getHint());
/**
 * Caching lookup by hint: answers from the cache when an entry exists for this hint, otherwise
 * delegates to the superclass lookup and stores its result in the cache before returning it.
 *
 * @param hint the hint to look up; validated by {@code checkHint}
 * @return the listener objects for the hint (the same collection instance that is cached)
 */
@Override
public Collection<ListenerObject> getListenerObjects(String hint) {
	this.checkHint(hint);

	log.debug("Inspecting cache for matches");
	RetrievalRequest cacheKey = new RetrievalRequest(hint, null);
	Collection<ListenerObject> cached = cache.get(cacheKey);
	if(cached != null) {
		return cached;
	}

	log.debug("No match found, retrieving ListenerObject from DefaultRequestResponseBus and storing in cache");
	Collection<ListenerObject> fresh = super.getListenerObjects(hint);
	cache.put(cacheKey, fresh);
	return fresh;
}

/**
 * Uncached lookup by hint: asks the ListenerObjectCreator for all listener objects and keeps
 * those accepted by {@link #matchesHint(ListenerObject, String)}.
 * NOTE(review): same signature as the method above, so this excerpt presumably comes from the
 * superclass of the caching retriever - confirm.
 *
 * @param hint the hint to match
 * @return a new collection holding the matching listener objects, possibly empty
 */
@Override
public Collection<ListenerObject> getListenerObjects(String hint) {
	Collection<ListenerObject> matches = new ArrayList<ListenerObject>();

	log.debug("Getting all ListenerObjects from the ListenerObjectCreator");
	for(ListenerObject candidate : listenerObjectCreator.getListenerObjects()) {
		if(!this.matchesHint(candidate, hint)) {
			log.trace(candidate + " did not match hint '" + hint + "'");
			continue;
		}
		log.trace(candidate + " matched hint '" + hint + "'");
		matches.add(candidate);
	}
	return matches;
}

/**
 * Tells whether the hint of the listener object equals the given hint.
 *
 * @param listenerObject the listener object to test
 * @param hint the hint to compare with, must not be {@code null}
 * @return {@code true} when both hints are equal
 * @throws IllegalArgumentException when {@code hint} is {@code null}
 */
protected boolean matchesHint(ListenerObject listenerObject, String hint) {
	if(hint != null) {
		return listenerObject.getHint().equals(hint);
	}
	throw new IllegalArgumentException("hint cannot be null!");
}
原始实现在这里有一个bug:cache不应使用HashMap,而应使用ConcurrentHashMap。HashMap不是线程安全的,多个线程同时读写cache时,并发扩容可能使Entry链表出现循环引用,导致后续的get()陷入死循环。
/**
 * Listener object retriever that remembers the results of earlier lookups in {@link #cache}.
 * The cache is a concurrent map so that lookups arriving on different threads cannot corrupt it.
 */
public class CachedListenerObjectRetriever extends DefaultListenerObjectRetriever implements ListenerObjectCreationObserver {

	private static final Logger log = LoggerFactory.getLogger(CachedListenerObjectRetriever.class);

	/**
	 * The cache of {@link Collection}s of {@link ListenerObject}s. Created as a
	 * {@link java.util.concurrent.ConcurrentHashMap}, which accepts neither {@code null} keys nor
	 * {@code null} values.
	 */
	protected Map<RetrievalRequest, Collection<ListenerObject>> cache;

	/**
	 * Whenever you use this constructor, be sure to set the {@link ListenerObjectCreator} with the setter provided by this class.
	 * If you don't, runtime {@link NullPointerException}s will occur.
	 */
	public CachedListenerObjectRetriever() {
		// The cache is read and filled without any locking while requests are being served, so a
		// plain HashMap could be corrupted by concurrent puts. The fully qualified name avoids
		// needing an extra import.
		cache = new java.util.concurrent.ConcurrentHashMap<RetrievalRequest, Collection<ListenerObject>>();
	}
Response<T> response = listenerObjectExecutor.executeListeners(listenerObjects, request.getParameters());
/**
 * Common part of all listener object executors: subclasses decide how the listeners are run
 * (via {@code doExecuteListeners}), this class sorts the outcomes into return values and
 * throwables.
 */
public abstract class AbstractListenerObjectExecutor implements ListenerObjectExecutor {

	private static final Logger log = LoggerFactory.getLogger(AbstractListenerObjectExecutor.class);

	/**
	 * Runs the given listener objects and collects what they produced. {@code null} outcomes and
	 * {@code VoidResult}s are dropped, {@code ThrowableResult}s contribute their throwable, and
	 * every other outcome is treated as an {@code ObjectResult} contributing its result.
	 *
	 * @param listenerObjects the listener objects to execute
	 * @param parameters the parameters to pass to each listener
	 * @return a response holding the collected return values and throwables
	 */
	@SuppressWarnings("unchecked") //We assume that the caller has correctly specified the type of objects that he wants back from the listeners
	@Override
	public <T> Response<T> executeListeners(Collection<ListenerObject> listenerObjects, Object... parameters) {
		Collection<T> returnValues = new ArrayList<T>();
		Collection<Throwable> failures = new ArrayList<Throwable>();

		log.debug("Executing ListenerObjects");
		for(ExecutionResult outcome : this.doExecuteListeners(listenerObjects, parameters)) {
			if(outcome == null || outcome instanceof VoidResult) {
				continue; // nothing to hand back for this listener
			}
			if(outcome instanceof ThrowableResult) {
				ThrowableResult failed = (ThrowableResult) outcome;
				failures.add(failed.getThrowable());
			} else {
				ObjectResult succeeded = (ObjectResult) outcome;
				returnValues.add((T) succeeded.getResult());
			}
		}
		return new Response<T>(returnValues, failures);
	}
MultiThreadedListenerObjectExecutor.doExecuteListeners()
public class MultiThreadedListenerObjectExecutor extends AbstractListenerObjectExecutor { private static final Logger log = LoggerFactory.getLogger(MultiThreadedListenerObjectExecutor.class); @Override protected Collection<ExecutionResult> doExecuteListeners(final Collection<ListenerObject> listenerObjects, final Object... parameters) { final Collection<ExecutionResult> executionResults = new Vector<ExecutionResult>(); //Vector is Thread-safe if(listenerObjects.size() == 1) { //There is only one, don't spawn a new Thread, but do it in this Thread log.debug("There is only one ListenerObject, not creating new Thread, executing it in this Thread"); executionResults.add(this.executeSingleListenerObject(listenerObjects.iterator().next(), parameters)); return executionResults; } Collection<Thread> threads = new ArrayList<Thread>(); log.debug("Creating Threads for each ListenerObject"); for(final ListenerObject listenerObject : listenerObjects) { log.debug("Creating Thread for ListenerObject \"" + listenerObject + "\""); Thread thread = new Thread(new Runnable() { @Override public void run() { executionResults.add(MultiThreadedListenerObjectExecutor.this.executeSingleListenerObject(listenerObject, parameters)); } }); log.debug("Starting Thread for ListenerObject \"" + listenerObject + "\""); thread.start(); threads.add(thread); } log.debug("Waiting for all Threads to finish"); for(Thread thread : threads) { try { thread.join(); } catch(InterruptedException e) { throw new RuntimeException(e); } } log.debug("All Threads have finished, returning"); return executionResults; } }
AbstractListenerObjectExecutor.executeSingleListenerObject
protected ExecutionResult executeSingleListenerObject(ListenerObject listenerObject, Object... parameters) { try { Object returnValue = listenerObject.getMethod().invoke(listenerObject.getTarget(), parameters); if(listenerObject.getMethod().getReturnType().equals(void.class)) { //Nothing to return log.debug("ListenerObject \"" + listenerObject + "\" successfully executed, return value was void"); return new VoidResult(); } else { log.debug("ListenerObject \"" + listenerObject + "\" successfully executed, return value was object"); return new ObjectResult(returnValue); } } catch(InvocationTargetException e) { log.debug("Underlying method of ListenerObject \"" + listenerObject + "\" threw Throwable"); //Caused by the underlying method throwing a Throwable. Rethrowing... return new ThrowableResult(e.getCause()); } catch(Exception e) { log.trace("Method of ListenerObject \"" + listenerObject + "\" did not match parameters, ignoring", e); //Probably caused by parameters not matching Method signature. Ignoring... return null; } }
RRiBbit相比其他框架的优点是:
1. 其他框架一般要求监听者实现特定的Listener接口,甚至必须实现onEvent()或onRequest()方法,而RRiBbit只需要在方法上标注@Listener注解即可。
2.其他框架因第一个条件需要改变源码,而RRiBbit没有必要。
3.其他框架的监听者不能将结果返回给发送者,如果非要这样做,只能用一些取巧(hack)的手段,比如修改传入的参数对象;而RRiBbit的监听者方法可以直接返回一个POJO给发送者。
4.其他框架并不支持让监听者在多个线程中执行,而RRiBbit可以配置为让监听者运行在不同的线程中,每个监听对应一个事件。
5.其他框架并不支持远程,只是工作在一个虚拟机中,RRiBbit监听者能够运行在其他机器上,而调用者无需任何变化。