RRiBbit学习笔记

RRiBbit可以作为事件总线(EventBus)使用,能够让组件之间进行双向通讯,并支持远程调用、失败恢复、负载均衡以及SSL/TLS等特性,因此也被称为请求-响应总线(Request-Response-Bus)。

所有事情都是从HelloWorld开始
import org.rribbit.Listener;
import org.rribbit.RRiBbitUtil;
import org.rribbit.RequestResponseBus;
import org.rribbit.creation.InstantiatingClassBasedListenerObjectCreator;
import org.rribbit.creation.ObjectBasedListenerObjectCreator;

/**
 * Minimal RRiBbit example: builds a local request-response bus for this class and
 * sends a single request with the hint {@code "say"}.
 * <p>
 * Both {@code @Listener(hint="say")} methods below are registered for that hint.
 * The {@link InstantiatingClassBasedListenerObjectCreator} constructs its own
 * {@code HelloWorld} instance to invoke the listeners on, so they do not run on the
 * instance created in {@link #main(String[])}.
 */
public class HelloWorld {

    /** The bus used by {@link #sayHello()}; assigned once in {@link #main(String[])}. */
    private static RequestResponseBus rrb;

    /**
     * Wires up the bus and triggers the greeting.
     *
     * @param args unused
     * @throws Exception declared for convenience; nothing in the body throws a checked exception explicitly
     */
    public static final void main(String[] args) throws Exception {

        // Scan HelloWorld for @Listener methods; "false" means the bus is not stored in the static RRB holder.
        ObjectBasedListenerObjectCreator listenerCreator =
                new InstantiatingClassBasedListenerObjectCreator(HelloWorld.class);
        rrb = RRiBbitUtil.createRequestResponseBusForLocalUse(listenerCreator, false);

        new HelloWorld().sayHello();
    }

    /** Sends the text "Hello World" to every listener registered under the hint "say". */
    public void sayHello() {
        rrb.send("say", "Hello World");
    }

    /**
     * Listener that prints the message unchanged.
     *
     * @param message the text sent over the bus
     */
    @Listener(hint="say")
    public void say(String message) {
        System.out.println(message);
    }

    /**
     * Listener that prints the message followed by a smiley.
     *
     * @param message the text sent over the bus
     */
    @Listener(hint="say")
    public void say2(String message) {
        final String decorated = message + " : )";
        System.out.println(decorated);
    }
}




看看InstantiatingClassBasedListenerObjectCreator都做了什么
public class InstantiatingClassBasedListenerObjectCreator extends AbstractClassBasedListenerObjectCreator {

	private static final Logger log = LoggerFactory.getLogger(InstantiatingClassBasedListenerObjectCreator.class);

	/**
	 * Creates a creator for the given classes by delegating directly to the superclass constructor.
	 *
	 * @see AbstractClassBasedListenerObjectCreator#AbstractClassBasedListenerObjectCreator(Class...)
	 *
	 * @param classes the classes that are passed on to the superclass constructor
	 */
	public InstantiatingClassBasedListenerObjectCreator(Class<?>... classes) {
		super(classes);
	}

public abstract class AbstractClassBasedListenerObjectCreator extends ObjectBasedListenerObjectCreator {

	private static final Logger log = LoggerFactory.getLogger(AbstractClassBasedListenerObjectCreator.class);

	protected Collection<Class<?>> excludedClasses;

	/**
	 * Initialises the collection of excluded classes as empty and then registers each
	 * given class, in the order supplied, through {@link #addClass(Class)}.
	 *
	 * @param classes the classes to register
	 */
	public AbstractClassBasedListenerObjectCreator(Class<?>... classes) {

		this.excludedClasses = new ArrayList<Class<?>>();
		// NOTE(review): addClass is public and non-final, so this calls overridable code from a constructor.
		for(int index = 0; index < classes.length; index++) {
			addClass(classes[index]);
		}
	}

        /**
         * Registers the {@code @Listener} methods of the given class.
         * <p>
         * A target object is requested for the class first. If none is available the class is
         * skipped: nothing is added and observers are not notified. Otherwise every listener
         * found on the class is bound to that target object, added to {@code listenerObjects},
         * and observers are notified that the class was added.
         *
         * @param clazz the class to inspect
         */
        public void addClass(Class<?> clazz) {

		log.debug("Processing class '" + clazz.getName() + "'");
		Object target = this.getTargetObjectForClass(clazz);
		if(target == null) {
			// No instance to invoke the listeners on, so there is nothing to register.
			return;
		}

		log.debug("Found target object for class '" + clazz.getName() + "', getting Listener methods");
		Collection<ListenerObject> discovered = this.getIncompleteListenerObjectsFromClass(clazz);
		for(ListenerObject incomplete : discovered) {
			// The discovered objects carry hint, method and return type; the target completes them.
			incomplete.setTarget(target);
		}
		listenerObjects.addAll(discovered);
		this.notifyObserversOnClassAdded(clazz);
	}

        /**
         * Creates the target object for the given class by calling its public no-argument constructor.
         *
         * @param clazz the class to instantiate
         * @return a new instance of {@code clazz}, or {@code null} when the instance could not be created
         *         for any reason other than the constructor itself throwing
         * @throws RuntimeException when the constructor threw; the cause is the constructor's own exception
         */
        @Override
	protected Object getTargetObjectForClass(Class<?> clazz) {

		try {
			log.debug("Attempting to construct instance of class '" + clazz.getName() + "'");
			return clazz.getConstructor().newInstance();
		} catch(InvocationTargetException e) {
			throw new RuntimeException("Exception thrown from constructor of Listener", e.getCause());
		} catch(Exception e) {
			// Deliberately best-effort: a class that cannot be instantiated is skipped by the caller.
			// The exception is logged because this branch covers more than a missing constructor,
			// and without it the real reason would be lost.
			log.debug("No suitable constructor found, returning null", e);
			return null;
		}
	}


/**
 * Collects one {@link ListenerObject} for every method of the given class that carries a
 * {@link Listener} annotation. Methods are obtained with {@link Class#getMethods()}.
 * <p>
 * The returned objects are "incomplete": hint, method and return type are set, but the
 * target object is not. The caller is expected to set it.
 *
 * @param clazz the class whose methods are inspected
 * @return the incomplete listener objects; empty when no method is annotated
 */
protected Collection<ListenerObject> getIncompleteListenerObjectsFromClass(Class<?> clazz) {

		Collection<ListenerObject> found = new ArrayList<ListenerObject>();
		for(Method candidate : clazz.getMethods()) {
			Listener annotation = candidate.getAnnotation(Listener.class);
			if(annotation == null) {
				// Not a listener method, skip it.
				continue;
			}

			log.debug("Listener annotation found for method \"" + candidate + "\", instantiating ListenerObject");
			ListenerObject incomplete = new ListenerObject();
			incomplete.setHint(annotation.hint());
			incomplete.setMethod(candidate);
			incomplete.setReturnType(candidate.getReturnType());
			found.add(incomplete);
		}
		return found;
	}


rrb = RRiBbitUtil.createRequestResponseBusForLocalUse(creator, false);
/**
 * Assembles a {@link RequestResponseBus} that works entirely inside the current JVM.
 * <p>
 * The chain built here is: a caching retriever on top of the given creator, a
 * multi-threaded executor, a local request processor combining the two, a local
 * dispatcher in front of the processor, and finally the default bus.
 *
 * @param listenerObjectCreator the source of the listener objects
 * @param setInRRB when {@code true}, the new bus is also registered through {@code RRB.setRequestResponseBus}
 * @return the newly created bus
 */
public static RequestResponseBus createRequestResponseBusForLocalUse(ListenerObjectCreator listenerObjectCreator, boolean setInRRB) {

		// Lookup side and execution side of the processor.
		ListenerObjectRetriever retriever = new CachedListenerObjectRetriever(listenerObjectCreator);
		ListenerObjectExecutor executor = new MultiThreadedListenerObjectExecutor();

		LocalRequestProcessor processor = new LocalRequestProcessor(retriever, executor);
		RequestDispatcher dispatcher = new LocalRequestDispatcher(processor);
		RequestResponseBus bus = new DefaultRequestResponseBus(dispatcher);

		if(!setInRRB) {
			return bus;
		}
		RRB.setRequestResponseBus(bus);
		return bus;
	}


rrb.send("say", "Hello World");
/**
 * Sends a request that is matched by hint only (no desired return type) and returns a
 * single return value.
 * <p>
 * Throwables carried by the response are handed to {@code processThrowables} before any
 * value is returned.
 *
 * @param hint the hint to send the request with
 * @param parameters the parameters passed along with the request
 * @return the first return value in the response, or {@code null} when there is none
 */
@Override
	public <T> T sendForSingleWithHint(String hint, Object... parameters) {

		log.info("Sending request for single object with hint '" + hint + "'");

		log.debug("Creating Request object");
		// The first argument is null: no desired return type is given.
		Request hintOnlyRequest = new Request(null, hint, parameters);

		log.debug("Dispatching Request");
		Response<T> response = requestDispatcher.dispatchRequest(hintOnlyRequest);

		log.debug("Processing Response");
		this.processThrowables(response.getThrowables());
		if(response.getReturnValues().isEmpty()) {
			return null;
		}
		return response.getReturnValues().iterator().next();
	}


requestDispatcher.dispatchRequest(request);
	/**
	 * Hands the request to the local request processor and returns its response unchanged,
	 * logging before and after.
	 *
	 * @param request the request to dispatch
	 * @return the response produced by the local request processor
	 */
	public <T> Response<T> dispatchRequest(Request request) {

		log.info("Dispatching Request");
		final Response<T> processed = localRequestProcessor.processRequest(request);

		log.info("Returning Response");
		return processed;
	}


localRequestProcessor.processRequest(request);
/**
 * Processes a request in two steps: select the matching listener objects, then execute
 * them with the request's parameters.
 * <p>
 * Which retriever lookup is used depends on which of the desired return type and the
 * hint are present on the request (neither, hint only, return type only, or both).
 *
 * @param request the request to process
 * @return the response produced by the listener object executor
 */
public <T> Response<T> processRequest(Request request) {

		log.info("Processing Request");

		log.debug("Getting ListenerObjects");
		boolean anyReturnType = request.getDesiredReturnType() == null;
		boolean anyHint = request.getHint() == null;

		Collection<ListenerObject> matches;
		if(anyReturnType && anyHint) {
			matches = listenerObjectRetriever.getListenerObjects();
		} else if(anyReturnType) {
			matches = listenerObjectRetriever.getListenerObjects(request.getHint());
		} else if(anyHint) {
			matches = listenerObjectRetriever.getListenerObjects(request.getDesiredReturnType());
		} else {
			matches = listenerObjectRetriever.getListenerObjects(request.getDesiredReturnType(), request.getHint());
		}

		log.debug("Executing ListenerObjects");
		Response<T> executed = listenerObjectExecutor.executeListeners(matches, request.getParameters());

		log.info("Returning Response");
		return executed;
	}


listenerObjects = listenerObjectRetriever.getListenerObjects(request.getHint());
/**
 * Returns the listener objects for the given hint, consulting the cache first.
 * <p>
 * The cache key is a {@link RetrievalRequest} built from the hint with {@code null} as its
 * second argument. On a miss, the result of the superclass lookup is stored under that
 * key and returned.
 *
 * @param hint the hint to look up; passed to {@code checkHint} before anything else happens
 * @return the cached or freshly retrieved listener objects
 */
@Override
	public Collection<ListenerObject> getListenerObjects(String hint) {

		this.checkHint(hint);

		log.debug("Inspecting cache for matches");
		RetrievalRequest cacheKey = new RetrievalRequest(hint, null);
		Collection<ListenerObject> cached = cache.get(cacheKey);
		if(cached != null) {
			return cached;
		}

		log.debug("No match found, retrieving ListenerObject from DefaultRequestResponseBus and storing in cache");
		Collection<ListenerObject> retrieved = super.getListenerObjects(hint);
		cache.put(cacheKey, retrieved);
		return retrieved;
	}

        /**
         * Returns every listener object supplied by the {@code ListenerObjectCreator} that
         * matches the given hint, as decided by {@link #matchesHint(ListenerObject, String)}.
         *
         * @param hint the hint to filter on
         * @return a new collection with the matching listener objects; empty when none match
         */
        @Override
	public Collection<ListenerObject> getListenerObjects(String hint) {

		Collection<ListenerObject> matching = new ArrayList<ListenerObject>();
		log.debug("Getting all ListenerObjects from the ListenerObjectCreator");
		for(ListenerObject candidate : listenerObjectCreator.getListenerObjects()) {
			boolean matched = this.matchesHint(candidate, hint);
			if(!matched) {
				log.trace(candidate + " did not match hint '" + hint + "'");
				continue;
			}
			log.trace(candidate + " matched hint '" + hint + "'");
			matching.add(candidate);
		}
		return matching;
	}

	/**
	 * Tells whether the given listener object was registered under exactly the given hint.
	 *
	 * @param listenerObject the listener object to test
	 * @param hint the requested hint; must not be {@code null}
	 * @return {@code true} when the listener object's hint equals {@code hint};
	 *         {@code false} otherwise, including when the listener object has no hint
	 * @throws IllegalArgumentException when {@code hint} is {@code null}
	 */
	protected boolean matchesHint(ListenerObject listenerObject, String hint) {

		if(hint == null) {
			throw new IllegalArgumentException("hint cannot be null!");
		}
		// Compare from the side that is known to be non-null, so a listener object whose
		// hint was never set simply does not match instead of throwing a NullPointerException.
		return hint.equals(listenerObject.getHint());
	}


这里有一个bug:cache不能用HashMap,要用ConcurrentHashMap。HashMap不是线程安全的,在高并发下扩容时(JDK 7及更早版本)Entry链表可能形成循环引用,导致get()陷入死循环。
public class CachedListenerObjectRetriever extends DefaultListenerObjectRetriever implements ListenerObjectCreationObserver {

	private static final Logger log = LoggerFactory.getLogger(CachedListenerObjectRetriever.class);

	/**
	 * The cache of {@link Collection}s of {@link ListenerObject}s.
	 */
	protected Map<RetrievalRequest, Collection<ListenerObject>> cache;

	/**
	 * Creates a retriever with an empty cache.
	 * <p>
	 * Whenever you use this constructor, be sure to set the {@link ListenerObjectCreator} with the setter provided by this class.
	 * If you don't, runtime {@link NullPointerException}s will occur.
	 * <p>
	 * The cache is backed by a {@link java.util.concurrent.ConcurrentHashMap}. The lookup methods
	 * read and write the cache without any synchronisation, so a plain {@code HashMap} is not
	 * safe once several threads send requests through the same retriever. As a consequence,
	 * {@code null} keys and {@code null} values must not be stored in the cache.
	 */
	public CachedListenerObjectRetriever() {

		// Fully qualified so that this change does not depend on a new import.
		cache = new java.util.concurrent.ConcurrentHashMap<RetrievalRequest, Collection<ListenerObject>>();
	}


Response<T> response = listenerObjectExecutor.executeListeners(listenerObjects, request.getParameters());
public abstract class AbstractListenerObjectExecutor implements ListenerObjectExecutor {

	private static final Logger log = LoggerFactory.getLogger(AbstractListenerObjectExecutor.class);

	/**
	 * Executes the given listener objects through {@code doExecuteListeners} and sorts the
	 * outcomes into a {@link Response}.
	 * <p>
	 * {@code null} outcomes and {@link VoidResult}s are dropped. A {@link ThrowableResult}
	 * contributes its throwable, and any other outcome is treated as an {@link ObjectResult}
	 * contributing its result value.
	 *
	 * @param listenerObjects the listener objects to execute
	 * @param parameters the parameters to execute them with
	 * @return a response holding the collected return values and throwables
	 */
	@SuppressWarnings("unchecked") //We assume that the caller has correctly specified the type of objects that he wants back from the listeners
	@Override
	public <T> Response<T> executeListeners(Collection<ListenerObject> listenerObjects, Object... parameters) {

		Collection<T> returnValues = new ArrayList<T>();
		Collection<Throwable> thrown = new ArrayList<Throwable>();

		log.debug("Executing ListenerObjects");
		for(ExecutionResult outcome : this.doExecuteListeners(listenerObjects, parameters)) {
			if(outcome == null || outcome instanceof VoidResult) {
				// Nothing to report for this listener.
				continue;
			}
			if(outcome instanceof ThrowableResult) {
				ThrowableResult failure = (ThrowableResult) outcome;
				thrown.add(failure.getThrowable());
			} else {
				ObjectResult success = (ObjectResult) outcome;
				returnValues.add((T) success.getResult());
			}
		}
		return new Response<T>(returnValues, thrown);
	}


MultiThreadedListenerObjectExecutor.doExecuteListeners()
/**
 * Executor that runs every listener object in its own thread and waits for all of them
 * to finish before returning. A single listener object is executed directly in the
 * calling thread instead.
 */
public class MultiThreadedListenerObjectExecutor extends AbstractListenerObjectExecutor {

	private static final Logger log = LoggerFactory.getLogger(MultiThreadedListenerObjectExecutor.class);

	/**
	 * Executes the given listener objects, one thread per listener object, and collects the results.
	 *
	 * @param listenerObjects the listener objects to execute
	 * @param parameters the parameters every listener object is executed with
	 * @return the execution results, in the order in which the listeners finished
	 * @throws RuntimeException wrapping the {@link InterruptedException} when the calling thread is
	 *         interrupted while waiting; the thread's interrupt flag is set again before throwing
	 */
	@Override
	protected Collection<ExecutionResult> doExecuteListeners(final Collection<ListenerObject> listenerObjects, final Object... parameters) {

		final Collection<ExecutionResult> executionResults = new Vector<ExecutionResult>(); //Vector is Thread-safe

		if(listenerObjects.size() == 1) { //There is only one, don't spawn a new Thread, but do it in this Thread
			log.debug("There is only one ListenerObject, not creating new Thread, executing it in this Thread");
			executionResults.add(this.executeSingleListenerObject(listenerObjects.iterator().next(), parameters));
			return executionResults;
		}

		Collection<Thread> threads = new ArrayList<Thread>();
		log.debug("Creating Threads for each ListenerObject");
		for(final ListenerObject listenerObject : listenerObjects) {
			log.debug("Creating Thread for ListenerObject \"" + listenerObject + "\"");
			Thread thread = new Thread(new Runnable() {
				@Override
				public void run() {
					executionResults.add(MultiThreadedListenerObjectExecutor.this.executeSingleListenerObject(listenerObject, parameters));
				}
			});
			log.debug("Starting Thread for ListenerObject \"" + listenerObject + "\"");
			thread.start();
			threads.add(thread);
		}
		log.debug("Waiting for all Threads to finish");
		for(Thread thread : threads) {
			try {
				thread.join();
			} catch(InterruptedException e) {
				// join() clears the interrupt flag when it throws. Set it again so that code
				// further up the stack can still see that this thread was interrupted.
				Thread.currentThread().interrupt();
				throw new RuntimeException(e);
			}
		}
		log.debug("All Threads have finished, returning");
		return executionResults;
	}
}


AbstractListenerObjectExecutor.executeSingleListenerObject
/**
 * Invokes the method of a single listener object on its target with the given parameters
 * and wraps the outcome.
 *
 * @param listenerObject the listener object to execute
 * @param parameters the parameters to invoke its method with
 * @return a {@link VoidResult} when the method's return type is {@code void}, an
 *         {@link ObjectResult} holding the return value otherwise, a {@link ThrowableResult}
 *         holding the cause when the invoked method threw, or {@code null} when the
 *         invocation failed with any other exception
 */
protected ExecutionResult executeSingleListenerObject(ListenerObject listenerObject, Object... parameters) {

		try {
			Object returnValue = listenerObject.getMethod().invoke(listenerObject.getTarget(), parameters);
			boolean returnsVoid = listenerObject.getMethod().getReturnType().equals(void.class);
			if(!returnsVoid) {
				log.debug("ListenerObject \"" + listenerObject + "\" successfully executed, return value was object");
				return new ObjectResult(returnValue);
			}
			//Nothing to return
			log.debug("ListenerObject \"" + listenerObject + "\" successfully executed, return value was void");
			return new VoidResult();
		} catch(InvocationTargetException thrownByListener) {
			log.debug("Underlying method of ListenerObject \"" + listenerObject + "\" threw Throwable");
			//The listener method itself threw; report the cause to the caller as a result
			return new ThrowableResult(thrownByListener.getCause());
		} catch(Exception mismatch) {
			log.trace("Method of ListenerObject \"" + listenerObject + "\" did not match parameters, ignoring", mismatch);
			//Probably the parameters did not match the method signature; this listener is skipped
			return null;
		}
	}



RRiBbit相比其他框架的优点是:
1. 其他框架一般要求监听者实现特定的Listener接口,甚至必须实现onEvent()或onRequest()之类的方法,而RRiBbit只需要在方法上标注@Listener注解即可。

2.其他框架因第一个条件需要改变源码,而RRiBbit没有必要。

3. 其他框架的监听者不能将结果返回给发送者,如果非要这样做,就得用一些hack手段,比如修改参数对象等,而RRiBbit的监听者方法可以直接返回一个POJO给发送者。

4. 其他框架并不支持让监听者运行在多个线程中,而RRiBbit可以配置为多线程执行:同一个请求的每个监听者各自在独立的线程中运行。

5.其他框架并不支持远程,只是工作在一个虚拟机中,RRiBbit监听者能够运行在其他机器上,而调用者无需任何变化。

猜你喜欢

转载自woming66.iteye.com/blog/1596848