近期项目需要集群,缓存集群是自己实现的,需要在缓存发生变动后,需要发生消息给各个节点更新缓存。所以就做了个远程监听功能。远程监听用rmi协议,事件发布前都动态查询出活动的节点,事件发布后会被活动节点上的listener监听到。上代码
1.定义event和listener
2、定义远程监听配置
监听管理类,用于事件注册,更新远程监听,发布事件
配置文件
1.定义event和listener
- public class BaseEvent extends EventObject {
- private static final long serialVersionUID = 1L;
- /** System time when the event happened */
- private final long timestamp;
- public BaseEvent(Object source) {
- super(source);
- this.timestamp = System.currentTimeMillis();
- }
- /**
- * Return the system time in milliseconds when the event happened.
- */
- public final long getTimestamp() {
- return this.timestamp;
- }
- }
- public interface EventLisenter<T extends BaseEvent>{
- /**
- * 事件处理
- * @param baseEvent
- */
- void onEvent(T t);
- }
2、定义远程监听配置
- public class RemoteLisenter{
- private Class eventClass;
- private Class serviceInterface;
- private String serviceName;
- private String registryPort;
- public Class getServiceInterface() {
- return serviceInterface;
- }
- public void setServiceInterface(Class serviceInterface) {
- this.serviceInterface = serviceInterface;
- }
- public String getServiceName() {
- return serviceName;
- }
- public void setServiceName(String serviceName) {
- this.serviceName = serviceName;
- }
监听管理类,用于事件注册,更新远程监听,发布事件
- public class RemoteLisenterConfig {
- private List<RemoteLisenter> remoteLisenters =new ArrayList<RemoteLisenter>();
- public List<RemoteLisenter> getRemoteLisenters() {
- return remoteLisenters;
- }
- public void setRemoteLisenters(List<RemoteLisenter> remoteLisenters) {
- this.remoteLisenters = remoteLisenters;
- }
- }
- @Service
- public class ListennerManagement {
- protected Logger logger = Logger.getLogger(getClass());
- @Autowired
- private HeartbeatService heartbeatService;
- @Autowired
- RemoteLisenterConfig remoteLisenterConfig;
- /**
- * 本地监听
- */
- private Map<String, List<EventLisenter>> localListeners = new LinkedHashMap<String, List<EventLisenter>>();
- /**
- * 远程监听
- */
- private Map<String, List<EventLisenter>> remoteListeners = new LinkedHashMap<String, List<EventLisenter>>();
- /**
- * 扫瞄所有bean,进行队本地事件进行事件监听
- *
- * @throws Exception
- */
- @SuppressWarnings("rawtypes")
- public void registryListener(ApplicationContext ctx) throws Exception {
- // 取得容器中所有监听
- Map<String, EventLisenter> beans = ctx
- .getBeansOfType(EventLisenter.class);
- if (beans == null || beans.size() == 0) {
- return;
- }
- Collection<EventLisenter> list = beans.values();
- for (EventLisenter listener : list) {
- Class listenercls = AopTargetUtils.getTarget(listener).getClass();
- Class eventCls = GenericTypeResolver.resolveTypeArgument(
- listenercls, EventLisenter.class);
- try {
- if (localListeners.containsKey(eventCls.getName())) {
- localListeners.get(eventCls.getName()).add(listener);
- } else {
- List<EventLisenter> l = new ArrayList<EventLisenter>();
- l.add(listener);
- localListeners.put(eventCls.getName(), l);
- }
- } catch (Exception e) {
- throw new Exception("初始化事件监听器时出错:", e);
- }
- }
- }
- private void refreshRemoteListeners(){
- //查询出集群服务器的IP(此处从数据库配置中查询)
- List<String> ipList=heartbeatService.getAliveHostsExcludeSelf();
- List<RemoteLisenter> RemoteLisenterList=remoteLisenterConfig.getRemoteLisenters();
- remoteListeners = new LinkedHashMap<String, List<EventLisenter>>();
- if(RemoteLisenterList!=null){
- for (RemoteLisenter remoteLisenter : RemoteLisenterList) {
- String eventClsName=remoteLisenter.getEventClass().getName();
- Class listenerCls=remoteLisenter.getServiceInterface();
- String port=remoteLisenter.getRegistryPort();
- String serviceName=remoteLisenter.getServiceName();
- if(ipList!=null){
- for (String ip : ipList) {
- EhCacheService ehCacheService=null;
- EventLisenter listener = buildRemotListener(listenerCls, port,serviceName, ip);
- if(listener!=null){
- if (remoteListeners.containsKey(eventClsName)) {
- remoteListeners.get(eventClsName).add(listener);
- } else {
- List<EventLisenter> l = new ArrayList<EventLisenter>();
- l.add(listener);
- remoteListeners.put(eventClsName, l);
- }
- }
- }
- }
- }
- }
- }
- private EventLisenter buildRemotListener(Class listenerCls, String port,
- String serviceName, String ip) {
- try {
- RmiProxyFactoryBean rmiProxyFactoryBean = new RmiProxyFactoryBean();
- rmiProxyFactoryBean.setServiceInterface(listenerCls);
- rmiProxyFactoryBean.setServiceUrl("rmi://"+ip+":"+port+"/"+serviceName);
- rmiProxyFactoryBean.afterPropertiesSet();
- if (rmiProxyFactoryBean.getObject() instanceof EventLisenter) {
- EventLisenter listener=(EventLisenter)rmiProxyFactoryBean.getObject();
- return listener;
- }else{
- return null;
- }
- } catch (Exception e) {
- logger.error("获取远程监听bean错误[listenerClass="+listenerCls+";port="+port+";ip="+ip+";serviceName="+serviceName+"]", e);
- return null;
- }
- }
- /**
- * 发布事件
- *
- * @throws Exception
- */
- @SuppressWarnings("rawtypes")
- public void publishEvent(BaseEvent event) {
- //本地监控
- List<EventLisenter> localList = localListeners.get(event.getClass().getName());
- if (localList != null) {
- for (EventLisenter listener : localList) {
- try {
- listener.onEvent(event);
- } catch (Exception e) {
- logger.error(e.getMessage());
- }
- }
- }
- //远程监控
- Class eventClass=event.getClass();
- if(needRemoteListenre(eventClass)){
- //刷新远程监听者
- refreshRemoteListeners();
- List<EventLisenter> remoteList = remoteListeners.get(event.getClass().getName());
- if (remoteList != null) {
- for (EventLisenter listener : remoteList) {
- try {
- listener.onEvent(event);
- } catch (Exception e) {
- logger.error(e.getMessage());
- }
- }
- }
- }
- }
- /**
- * 判断本事件是否需要远程监听
- * @param eventClass
- * @return
- */
- private boolean needRemoteListenre(Class eventClass) {
- List<RemoteLisenter> RemoteLisenterList=remoteLisenterConfig.getRemoteLisenters();
- if(RemoteLisenterList!=null){
- for (RemoteLisenter remoteLisenter : RemoteLisenterList) {
- Class eventCls=remoteLisenter.getEventClass();
- if(eventCls.equals(eventCls))
- return true;
- }
- }
- return false;
- }
- public Map<String, List<EventLisenter>> getLocalListeners() {
- return localListeners;
- }
- public Map<String, List<EventLisenter>> getRemoteListeners() {
- return remoteListeners;
- }
配置文件
- <!-- 配置远程监听配置对象 -->
- <bean id="remoteLisenterConfig" class="com.ejintai.cbs_policy_registry.base.event.RemoteLisenterConfig">
- <property name="remoteLisenters">
- <list>
- <bean id="remoteLisenter1" class="com.ejintai.cbs_policy_registry.base.event.RemoteLisenter">
- <property name="eventClass" value="com.ejintai.cbs_policy_registry.base.event.biz.EhCacheUpdateEvent" />
- <property name="serviceName" value="ehCacheUpdateEventListener" />
- <property name="serviceInterface" value="com.ejintai.cbs_policy_registry.base.event.EventLisenter"/>
- <property name="registryPort" value="${rmi.port}"/>
- </bean>
- </list>
- </property>
- </bean>
- <bean id="remoteEhCacheUpdateEventListener" class="org.springframework.remoting.rmi.RmiServiceExporter" >
- <property name="serviceName" value="ehCacheUpdateEventListener" />
- <property name="service" ref="ehCacheUpdateEventListener"/>
- <property name="serviceInterface" value="com.ejintai.cbs_policy_registry.base.event.EventLisenter"/>
- <property name="registryPort" value="${rmi.port}"/>
- </bean>