spring中实现远程监听

   近期项目需要集群,缓存集群是自己实现的,需要在缓存发生变动后,需要发生消息给各个节点更新缓存。所以就做了个远程监听功能。远程监听用rmi协议,事件发布前都动态查询出活动的节点,事件发布后会被活动节点上的listener监听到。上代码
1.定义event和listener
public class  BaseEvent  extends EventObject {


	private static final long serialVersionUID = 1L;
	
	
	/** System time when the event happened */
	private final long timestamp;
	

	public BaseEvent(Object source) {
		super(source);
		this.timestamp = System.currentTimeMillis();
		
	}
	
	/**
	 * Return the system time in milliseconds when the event happened.
	 */
	public final long getTimestamp() {
		return this.timestamp;
	}
}

public interface EventLisenter<T extends BaseEvent>{

	/**
	 * 事件处理
	 * @param baseEvent
	 */
	void onEvent(T t);
}

2、定义远程监听配置
public  class RemoteLisenter{
	
	private  Class  eventClass;
	
	private Class serviceInterface;
	
	private String serviceName;
	
	private String  registryPort;

	public Class getServiceInterface() {
		return serviceInterface;
	}

	public void setServiceInterface(Class serviceInterface) {
		this.serviceInterface = serviceInterface;
	}

	public String getServiceName() {
		return serviceName;
	}

	public void setServiceName(String serviceName) {
		this.serviceName = serviceName;
	}


监听管理类,用于事件注册,更新远程监听,发布事件
public class RemoteLisenterConfig {

	private List<RemoteLisenter> remoteLisenters =new ArrayList<RemoteLisenter>();
	
	
	public List<RemoteLisenter> getRemoteLisenters() {
		return remoteLisenters;
	}


	public void setRemoteLisenters(List<RemoteLisenter> remoteLisenters) {
		this.remoteLisenters = remoteLisenters;
	}

}


@Service
public class ListennerManagement {

	
	protected Logger logger = Logger.getLogger(getClass());

	
	@Autowired
	private HeartbeatService heartbeatService;
	
	
	@Autowired
	RemoteLisenterConfig remoteLisenterConfig;
	
	
	
	

	/**
	 * 本地监听
	 */
	private  Map<String, List<EventLisenter>> localListeners = new LinkedHashMap<String, List<EventLisenter>>();
	
	/**
	 * 远程监听
	 */
	private  Map<String, List<EventLisenter>> remoteListeners = new LinkedHashMap<String, List<EventLisenter>>();

	

	/**
	 * 扫瞄所有bean,进行队本地事件进行事件监听
	 * 
	 * @throws Exception
	 */
	@SuppressWarnings("rawtypes")
	public  void registryListener(ApplicationContext  ctx) throws Exception {
		// 取得容器中所有监听
		Map<String, EventLisenter> beans = ctx
				.getBeansOfType(EventLisenter.class);
		if (beans == null || beans.size() == 0) {
			return;
		}
		Collection<EventLisenter> list = beans.values();
		for (EventLisenter listener : list) {
			Class listenercls = AopTargetUtils.getTarget(listener).getClass();
			Class eventCls = GenericTypeResolver.resolveTypeArgument(
					listenercls, EventLisenter.class);

			try {
				if (localListeners.containsKey(eventCls.getName())) {
					localListeners.get(eventCls.getName()).add(listener);
				} else {
					List<EventLisenter> l = new ArrayList<EventLisenter>();
					l.add(listener);
					localListeners.put(eventCls.getName(), l);
				}
			} catch (Exception e) {
				throw new Exception("初始化事件监听器时出错:", e);
			}
		}

	}

	
	 private void refreshRemoteListeners(){
		//查询出集群服务器的IP(此处从数据库配置中查询)
		List<String> ipList=heartbeatService.getAliveHostsExcludeSelf();
		List<RemoteLisenter>  RemoteLisenterList=remoteLisenterConfig.getRemoteLisenters();
		remoteListeners = new LinkedHashMap<String, List<EventLisenter>>();
		if(RemoteLisenterList!=null){
			for (RemoteLisenter remoteLisenter : RemoteLisenterList) {
				
				String eventClsName=remoteLisenter.getEventClass().getName();
				Class listenerCls=remoteLisenter.getServiceInterface();
				String port=remoteLisenter.getRegistryPort();
				String serviceName=remoteLisenter.getServiceName();
				if(ipList!=null){
					for (String ip : ipList) {
						EhCacheService ehCacheService=null;
						EventLisenter listener = buildRemotListener(listenerCls, port,serviceName, ip);
						
						if(listener!=null){
							if (remoteListeners.containsKey(eventClsName)) {
								remoteListeners.get(eventClsName).add(listener);
							} else {
								List<EventLisenter> l = new ArrayList<EventLisenter>();
								l.add(listener);
								remoteListeners.put(eventClsName, l);
							}
						}
					}
				}
			
		}
			
			
			
		}
	}


	private EventLisenter buildRemotListener(Class listenerCls, String port,
			String serviceName, String ip) {
		
		try {
			RmiProxyFactoryBean rmiProxyFactoryBean = new RmiProxyFactoryBean();
			rmiProxyFactoryBean.setServiceInterface(listenerCls); 
			rmiProxyFactoryBean.setServiceUrl("rmi://"+ip+":"+port+"/"+serviceName);
			rmiProxyFactoryBean.afterPropertiesSet();
			
			if (rmiProxyFactoryBean.getObject() instanceof EventLisenter) { 
				EventLisenter  listener=(EventLisenter)rmiProxyFactoryBean.getObject();
				return listener;
			}else{
				return null;
			}
		} catch (Exception e) {
			logger.error("获取远程监听bean错误[listenerClass="+listenerCls+";port="+port+";ip="+ip+";serviceName="+serviceName+"]", e);
			return null;
		}
		
		
		

		
	}
	
	
	/**
	 * 发布事件
	 * 
	 * @throws Exception
	 */
	@SuppressWarnings("rawtypes")
	public  void publishEvent(BaseEvent event)  {
		//本地监控
		List<EventLisenter> localList = localListeners.get(event.getClass().getName());
		if (localList != null) {
			for (EventLisenter listener : localList) {
				try {
					listener.onEvent(event);
				} catch (Exception e) {
					logger.error(e.getMessage());
				}
			}
		}
		
		//远程监控
		Class eventClass=event.getClass();
		if(needRemoteListenre(eventClass)){
			//刷新远程监听者
			refreshRemoteListeners();
			
			List<EventLisenter> remoteList = remoteListeners.get(event.getClass().getName());
			if (remoteList != null) {
				for (EventLisenter listener : remoteList) {
					try {
						listener.onEvent(event);
					} catch (Exception e) {
						logger.error(e.getMessage());
					}
				}
			}
		}
	}


    /**
     * 判断本事件是否需要远程监听
     * @param eventClass
     * @return
     */
	private boolean needRemoteListenre(Class eventClass) {
		
		List<RemoteLisenter>  RemoteLisenterList=remoteLisenterConfig.getRemoteLisenters();
		if(RemoteLisenterList!=null){
			for (RemoteLisenter remoteLisenter : RemoteLisenterList) {			
			     Class eventCls=remoteLisenter.getEventClass();
			     if(eventCls.equals(eventCls))
			    	 return true;
			}
		}
		return false;	
			
	}


	public Map<String, List<EventLisenter>> getLocalListeners() {
		return localListeners;
	}


	public Map<String, List<EventLisenter>> getRemoteListeners() {
		return remoteListeners;
	}
	


配置文件
<!-- 配置远程监听配置对象 -->
	<bean id="remoteLisenterConfig"  class="com.ejintai.cbs_policy_registry.base.event.RemoteLisenterConfig">
	  <property name="remoteLisenters">
	     <list>
	     <bean id="remoteLisenter1"  class="com.ejintai.cbs_policy_registry.base.event.RemoteLisenter">
	          <property name="eventClass"      value="com.ejintai.cbs_policy_registry.base.event.biz.EhCacheUpdateEvent" />  
	          <property name="serviceName"         value="ehCacheUpdateEventListener" />  
              <property name="serviceInterface"    value="com.ejintai.cbs_policy_registry.base.event.EventLisenter"/>  
      		  <property name="registryPort"        value="${rmi.port}"/>  
	      </bean>
	     </list>
	  </property>
	</bean>

 <bean id="remoteEhCacheUpdateEventListener" class="org.springframework.remoting.rmi.RmiServiceExporter" >  
        <property name="serviceName"         value="ehCacheUpdateEventListener" />  
        <property name="service"             ref="ehCacheUpdateEventListener"/>  
        <property name="serviceInterface"    value="com.ejintai.cbs_policy_registry.base.event.EventLisenter"/>  
        <property name="registryPort"        value="${rmi.port}"/>  
    </bean>

猜你喜欢

转载自qingshizhi.iteye.com/blog/2378790