导入Guava 包:
<!-- Guava: provides the com.google.common.eventbus classes (EventBus / AsyncEventBus). -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>19.0</version>
</dependency>
Spring 整合 Guava EventBus 的事件配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

    <!-- Scan the package below and register its beans, skipping @Controller classes. -->
    <context:component-scan base-package="demo.dcn">
        <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller" />
    </context:component-scan>

    <!-- Thread pool that executes the asynchronous event handlers. -->
    <!-- NOTE(review): a ThreadPoolExecutor only creates threads beyond corePoolSize once its
         queue is full, so with a 1,000,000-slot queue the pool will in practice stay at 20
         threads and the rejection handler will rarely be reached. Confirm these sizes are intended. -->
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="20" />
        <property name="maxPoolSize" value="200" />
        <property name="queueCapacity" value="1000000" />
        <property name="keepAliveSeconds" value="600" />
        <property name="rejectedExecutionHandler">
            <!-- AbortPolicy: throws java.util.concurrent.RejectedExecutionException immediately. -->
            <!-- CallerRunsPolicy: the submitting thread runs the task itself before it can submit
                 the next one, which effectively slows down the rate of submissions to the pool. -->
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
        </property>
    </bean>

    <bean id="userFeedRedisSubscriber" class="demo.dcn.guava.eventBus.subscriber.UserFeedRedisSubscriber" />

    <!-- Event bus: dispatches events to subscribers on the task executor. -->
    <bean id="eventBus" class="com.google.common.eventbus.AsyncEventBus">
        <constructor-arg ref="taskExecutor" />
    </bean>

    <!-- Registers the listed subscriber beans with the event bus. -->
    <bean id="eventBuilder" class="demo.dcn.guava.eventBus.AsynEventBusBuilder">
        <property name="eventBus" ref="eventBus" />
        <property name="handlers">
            <set>
                <!-- <ref bean="msgNoticeSubscriber"/> <ref bean="userActivitySubcriber"/> -->
                <ref bean="userFeedRedisSubscriber" />
            </set>
        </property>
    </bean>
</beans>
event 事件:
package demo.dcn.guava.eventBus.events; import java.io.Serializable; /** * 用户喜欢关注,移除关注,触发的交互事件 * @author [email protected] * * */ public class FollowEvent implements Serializable{ private static final long serialVersionUID = 1L; // 用户ID private Long lookerId; // 类型 private FriendEventType type; private Long friendId; public FollowEvent(Long lookerId, FriendEventType type, Long friendId) { super(); this.lookerId = lookerId; this.type = type; this.friendId = friendId; } public FollowEvent() { super(); } public enum FriendEventType{ ADD_FOLLOW_FRIEND(1L, "增加关注好友"),ADD_FANS_FRIEND(2L, "增加粉丝好友"),DEL_FOLLOW_FRIEND(3L, "删除关注好友"),DEL_FANS_FRIEND(4L, "删除粉丝好友"); private Long id; private String desc; private FriendEventType(Long id, String desc) { this.id = id; this.desc = desc; } public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } } }
事件注入中心:
package demo.dcn.guava.eventBus; import java.util.Set; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import com.google.common.eventbus.AsyncEventBus; /** * 事件注入中心 * @author [email protected] * * */ public class AsynEventBusBuilder implements InitializingBean,DisposableBean { private AsyncEventBus eventBus; private Set<Object> handlers; @Override public void destroy() throws Exception { } @Override public void afterPropertiesSet() throws Exception { for (Object handler : handlers) { eventBus.register(handler); } } /** * @return the eventBus */ public AsyncEventBus getEventBus() { return eventBus; } /** * @param eventBus the eventBus to set */ public void setEventBus(AsyncEventBus eventBus) { this.eventBus = eventBus; } /** * @return the handlers */ public Set<Object> getHandlers() { return handlers; } /** * @param handlers the handlers to set */ public void setHandlers(Set<Object> handlers) { this.handlers = handlers; } }
异步处理事件:通过 @Subscribe 注解标记订阅方法:
package demo.dcn.guava.eventBus.subscriber; import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; import demo.dcn.guava.eventBus.events.FollowEvent; /** * 用户活动订阅事件处理 * @author [email protected] * * */ public class UserFeedRedisSubscriber { /** * 处理喜欢关注,移除关注 * @param event */ @Subscribe @AllowConcurrentEvents//线程安全 public void followInteractionHandel(FollowEvent event){ if(event!=null){ System.out.println("用户关注了你");//接送消息系统 } } }
异步使用订阅事件:
@Override public ResultMapper addUserFollow(Long lookerId, Long hosterId) { ResultMapper result = new ResultMapper(); if(lookerId==null||hosterId==null||hosterId==0l||lookerId==0l){ result.setCode(ResultMap.FAILURE.getCode()); result.setMsg(ResultMap.FAILURE.getDesc()); }if(lookerId != null && hosterId != null&&lookerId.equals(hosterId)){ result.setCode(ResultMap.REPETOPER.getCode()); result.setMsg(ResultMap.REPETOPER.getDesc()); return result; } try{ boolean flag = userFollowDAO.addFollow(lookerId, hosterId); if(!flag){ result.setCode(ResultMap.SUCCESS.getCode()); result.setMsg(ResultMap.SUCCESS.getDesc()); } FollowEvent event = new FollowEvent(lookerId,FriendEventType.ADD_FOLLOW_FRIEND,hosterId); eventBus.post(event);//异步处理关联订阅事件 }catch(Exception e){ logger.error("userfeed addFollower error,Long userId={}, Long follerId={}", lookerId, hosterId, e); result.setCode(ResultMap.FAILURE.getCode()); result.setMsg(ResultMap.FAILURE.getDesc()); } return result; }