Events:需要被处理的数据
Event Handlers:处理Event的方式方法
Event Loop:维护Events和Event Handler之间的交互流程,一般采用queue代替
Events属性
类型和数据
同步的EDA框架实现
/**
 * A message that reports the class identifying its own type.
 */
public interface Message {

    /**
     * Returns the class that identifies the type of this message.
     *
     * @return the message type; always {@code Message} or a subtype of it
     */
    Class<? extends Message> getType();
}
/**
 * A destination that accepts messages of a given type.
 *
 * @param <E> the type of message this channel accepts
 */
public interface Channel<E> {

    /**
     * Hands the given message to this channel.
     *
     * @param message the message to process
     */
    void dispatch(E message);
}
/**
 * Associates message types with channels and dispatches messages.
 *
 * @param <E> the base message type accepted by this router
 */
public interface DynamicRouter<E extends Message> {

    /**
     * Registers a channel for the given message type.
     *
     * @param messageType the message class the channel is registered under
     * @param channel     the channel to register for that type
     */
    void RegisterChannel(Class<? extends Message> messageType, Channel<? extends E> channel);

    /**
     * Dispatches the given message.
     *
     * @param message the message to dispatch
     * @throws Exception if the implementation cannot dispatch the message
     */
    void dispatch(E message) throws Exception;
}
/**
 * Basic {@link Message} implementation that reports its own runtime class as its type.
 */
public class Event implements Message {

    /**
     * Returns the runtime class of this event.
     *
     * @return the concrete class of this instance
     */
    @Override
    public Class<? extends Message> getType() {
        return this.getClass();
    }
}
/**
 * Synchronous {@link DynamicRouter} that forwards each message to the channel
 * registered for the message's type.
 *
 * <p>Not thread-safe: the routing table is a plain {@link HashMap} accessed
 * without any synchronization.
 */
public class EventDispatcher implements DynamicRouter<Message> {

    /** Routing table mapping a message type to the channel registered for it. */
    private final Map<Class<? extends Message>, Channel<? extends Message>> routerTable;

    /** Creates a dispatcher with an empty routing table. */
    public EventDispatcher() {
        this.routerTable = new HashMap<>();
    }

    /**
     * Registers a channel for a message type, replacing any channel
     * previously registered for the same type.
     *
     * @param messageType the message class used as the routing key
     * @param channel     the channel that receives messages of that type
     */
    @Override
    public void RegisterChannel(Class<? extends Message> messageType, Channel<? extends Message> channel) {
        this.routerTable.put(messageType, channel);
    }

    /**
     * Looks up the channel registered for {@code message.getType()} and passes
     * the message to it on the calling thread.
     *
     * @param message the message to route
     * @throws Exception if no channel is registered for the message's type
     */
    @Override
    public void dispatch(Message message) throws Exception {
        // Resolve the type once so the lookup and the error message agree.
        final Class<? extends Message> type = message.getType();

        // The table cannot express "channel for exactly this key's type", so the
        // cast is unchecked; it is sound as long as channels are registered under
        // the message class they actually accept.
        @SuppressWarnings("unchecked")
        final Channel<Message> channel = (Channel<Message>) routerTable.get(type);

        if (channel == null) {
            throw new Exception("No channel registered for message type: " + type);
        }
        channel.dispatch(message);
    }
}
/**
 * Example of synchronous dispatching: an {@code InputEvent} is routed to a handler
 * that adds its two operands and dispatches a {@code ResultEvent}, which a second
 * handler prints to standard output.
 */
public class EventDispatcherExample {

    /** Event carrying two integer operands. */
    static class InputEvent extends Event {
        private final int x;
        private final int y;

        public InputEvent(int x, int y) {
            this.x = x;
            this.y = y;
        }

        /** @return the first operand */
        public int getX() {
            return this.x;
        }

        /** @return the second operand */
        public int getY() {
            return this.y;
        }
    }

    /** Event carrying a single integer result. */
    static class ResultEvent extends Event {
        private final int result;

        public ResultEvent(int result) {
            this.result = result;
        }

        /**
         * Returns the stored result. The accessor name keeps its original spelling
         * so existing callers continue to compile.
         *
         * @return the result value
         */
        public int getReult() {
            return this.result;
        }
    }

    /** Prints the value carried by a {@link ResultEvent}. */
    static class ResultEventHandler implements Channel<ResultEvent> {
        @Override
        public void dispatch(ResultEvent message) {
            final int value = message.getReult();
            System.out.println("the result is " + value);
        }
    }

    /** Adds the operands of an {@link InputEvent} and dispatches the sum as a {@link ResultEvent}. */
    static class InputEventHandler implements Channel<InputEvent> {
        private final EventDispatcher dispatcher;

        public InputEventHandler(EventDispatcher dispatcher) {
            this.dispatcher = dispatcher;
        }

        @Override
        public void dispatch(InputEvent message) {
            final ResultEvent outcome = new ResultEvent(message.getX() + message.getY());
            try {
                this.dispatcher.dispatch(outcome);
            } catch (Exception e) {
                // Channel.dispatch declares no checked exceptions, so the failure is reported here.
                e.printStackTrace();
            }
        }
    }

    /**
     * Registers both handlers on one dispatcher and dispatches a single input event.
     *
     * @param a command-line arguments (unused)
     * @throws Exception if the dispatcher cannot route the input event
     */
    public static void main(String[] a) throws Exception {
        final EventDispatcher router = new EventDispatcher();
        router.RegisterChannel(InputEvent.class, new InputEventHandler(router));
        router.RegisterChannel(ResultEvent.class, new ResultEventHandler());

        final InputEvent input = new InputEvent(12, 23);
        router.dispatch(input);
    }
}
同步的事件驱动缺点:如果其中一个Channel耗时太长,会影响其他Channel
异步的Channel实现
public abstract class AsyncChannel implements Channel<Event>{
private final ExecutorService executorService;
public AsyncChannel(){
this(Executors.newFixedThreadPool(Runtime.getRuntime().avaliableProcessors()*2)));
}
public AsynChannel(ExecutorService executorService ){
this.executorService =executorService;
}
public final void dispatch(Event message){
executorService.submit(() -> this.handle(message));
}
protected abstract void hanlde(Event message);
void stop(){
if(null != executorService && executorService.isShutDown())
executorService.shutDown();
}
}