先安装apache-activemq-5.11.1 ,然后启动activemq.bat
启动activeMQ的控制台:http://localhost:8161/admin/,展示图如下:
解释:Number Of Pending Messages:等待消费的消息数,即当前尚未出队列的消息数量;Number Of Consumers:消费者数量;Messages Enqueued:进入队列的消息总数(这个数量只增不减,重启ActiveMQ后会清零);Messages Dequeued:出了队列的消息数,可以理解为被消费者消费掉的数量(重启ActiveMQ后会清零)。
二,项目源码的解析
1.消息消费者:
mail.properties
# SMTP server used to send the mails.
mail.host=smtp.qq.com
# NOTE(review): the mailSender bean in spring-mail.xml does not reference mail.port,
# so this value appears to be unused - confirm before relying on it.
mail.port=25
# Sender account (the mailbox the messages are sent from).
mail.username=your_account@qq.com
# Authorization code of the mailbox.
mail.password=邮箱的授权码
mail.debug=true
mail.smtp.auth=true
mail.smtp.ssl.enable=true
# Default "From" address (the sender). It must be identical to mail.username,
# otherwise sending fails.
mail.default.from=your_account@qq.com
注:邮箱的授权码的获取:邮箱授权码的获取方法
spring-mq.xml
<!-- The ConnectionFactory that actually creates connections; supplied by the JMS vendor. -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <!-- ActiveMQ broker address -->
    <property name="brokerURL" value="${mq.brokerURL}"/>
    <property name="userName" value="${mq.userName}"/>
    <property name="password" value="${mq.password}"/>
</bean>

<!--
    ActiveMQ's PooledConnectionFactory wraps an ActiveMQConnectionFactory and pools
    Connection, Session and MessageProducer instances to reduce resource consumption.
    Requires the activemq-pool artifact.
-->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="connectionFactory" ref="targetConnectionFactory"/>
    <property name="maxConnections" value="${mq.pool.maxConnections}"/>
</bean>

<!-- Spring-side ConnectionFactory that manages the real ConnectionFactory. -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <!-- The target is the real factory able to create JMS connections. -->
    <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>

<!-- Spring's JMS helper for sending and receiving messages (queue template). -->
<bean id="activeMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <!-- Refers to the Spring-provided ConnectionFactory defined above. -->
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="defaultDestinationName" value="${queueName}"/>
</bean>

<!-- Destination (queue) consumed by the session-aware listener. -->
<bean id="sessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="${queueName}"/>
</bean>

<!-- MessageListener that also receives the JMS Session. -->
<bean id="consumerSessionAwareMessageListener"
      class="wusc.edu.demo.mqtest.listener.ConsumerSessionAwareMessageListener"/>

<!-- Listener container binding the listener to the queue. -->
<bean id="sessionAwareListenerContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="sessionAwareQueue"/>
    <property name="messageListener" ref="consumerSessionAwareMessageListener"/>
</bean>
spring-mail.xml:
<!-- Spring's high-level abstraction for sending e-mail. -->
<bean id="mailSender" class="org.springframework.mail.javamail.JavaMailSenderImpl">
    <property name="host" value="${mail.host}"/>
    <property name="username" value="${mail.username}"/>
    <property name="password" value="${mail.password}"/>
    <property name="defaultEncoding" value="UTF-8"/>
    <property name="javaMailProperties">
        <props>
            <prop key="mail.smtp.auth">${mail.smtp.auth}</prop>
            <!-- Debug mode -->
            <prop key="mail.debug">${mail.debug}</prop>
            <!-- Use SSL -->
            <prop key="mail.smtp.ssl.enable">${mail.smtp.ssl.enable}</prop>
            <!-- MailSSLSocketFactory setting.
                 NOTE(review): the value is the literal string "mailSSLSocketFactory",
                 not a bean reference - confirm it has the intended effect. -->
            <prop key="mail.smtp.ssl.socketFactory">mailSSLSocketFactory</prop>
        </props>
    </property>
</bean>

<!-- Message template that carries the default sender address. -->
<bean id="simpleMailMessage" class="org.springframework.mail.SimpleMailMessage">
    <property name="from" value="${mail.default.from}"/>
</bean>

<!-- Thread pool used for sending mails. -->
<bean id="threadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <!-- Minimum number of threads kept in the pool -->
    <property name="corePoolSize" value="5"/>
    <!-- Idle time allowed for pool threads.
         NOTE(review): 30000 seconds is more than 8 hours - confirm that seconds,
         not milliseconds, were intended. -->
    <property name="keepAliveSeconds" value="30000"/>
    <!-- Maximum number of threads in the pool -->
    <property name="maxPoolSize" value="50"/>
    <!-- Capacity of the buffer queue used by the pool -->
    <property name="queueCapacity" value="100"/>
</bean>
邮件业务处理类
@Component("mailBiz") public class MailBiz { @Autowired private JavaMailSender mailSender;// spring配置中定义 @Autowired private SimpleMailMessage simpleMailMessage;// spring配置中定义 @Autowired private ThreadPoolTaskExecutor threadPool; /** * 发送模板邮件 * * @param mailParamTemp需要设置四个参数 * templateName,toMail,subject,mapModel * @throws Exception * */ public void mailSend(final MailParam mailParam) { threadPool.execute(new Runnable() { public void run() { try { simpleMailMessage.setFrom(simpleMailMessage.getFrom()); // 发送人,从配置文件中取得 simpleMailMessage.setTo(mailParam.getTo()); // 接收人 simpleMailMessage.setSubject(mailParam.getSubject()); simpleMailMessage.setText(mailParam.getContent()); mailSender.send(simpleMailMessage); } catch (MailException e) { throw e; } } }); } }
队列监听器(ConsumerSessionAwareMessageListener)
@Component public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener<Message> { private static final Log log = LogFactory.getLog(ConsumerSessionAwareMessageListener.class); @Autowired private JmsTemplate activeMqJmsTemplate; @Autowired private Destination sessionAwareQueue; @Autowired private MailBiz bailBiz; public synchronized void onMessage(Message message, Session session) { try { ActiveMQTextMessage msg = (ActiveMQTextMessage) message; final String ms = msg.getText(); log.info("==>receive message:" + ms); MailParam mailParam = JSONObject.parseObject(ms, MailParam.class);// 转换成相应的对象 if (mailParam == null) { return; } try { bailBiz.mailSend(mailParam); } catch (Exception e) { // 发送异常,重新放回队列 // activeMqJmsTemplate.send(sessionAwareQueue, new MessageCreator() { // public Message createMessage(Session session) throws JMSException { // return session.createTextMessage(ms); // } // }); log.error("==>MailException:", e); } } catch (Exception e) { log.error("==>", e); } } }
注:监听器中“发送异常后将消息重新放回队列”的处理逻辑目前存在问题,会出现死循环,所以暂时注释掉了。
邮件参数封装类 (MailParam)
/**
 * Parameter object describing one mail: sender, recipient, subject and content.
 */
public class MailParam {

    /** Sender. */
    private String from;

    /** Recipient. */
    private String to;

    /** Subject. */
    private String subject;

    /** Mail content. */
    private String content;

    /** Creates an instance with all properties unset. */
    public MailParam() {
    }

    /**
     * Creates an instance with recipient, subject and content; the sender stays unset.
     *
     * @param to      recipient
     * @param subject subject
     * @param content mail content
     */
    public MailParam(String to, String subject, String content) {
        this.to = to;
        this.subject = subject;
        this.content = content;
    }

    /** @return the sender */
    public String getFrom() {
        return this.from;
    }

    /** @param from the sender */
    public void setFrom(String from) {
        this.from = from;
    }

    /** @return the recipient */
    public String getTo() {
        return this.to;
    }

    /** @param to the recipient */
    public void setTo(String to) {
        this.to = to;
    }

    /** @return the subject */
    public String getSubject() {
        return this.subject;
    }

    /** @param subject the subject */
    public void setSubject(String subject) {
        this.subject = subject;
    }

    /** @return the mail content */
    public String getContent() {
        return this.content;
    }

    /** @param content the mail content */
    public void setContent(String content) {
        this.content = content;
    }
}
ActiveMQ测试启动类(MQConsumer)
/**
 * Start-up class of the consumer side: loads the Spring context and starts it.
 */
public class MQConsumer {

    private static final Log log = LogFactory.getLog(MQConsumer.class);

    /**
     * Loads {@code classpath:spring/spring-context.xml} and starts the context.
     * If start-up fails, the error is logged and the JVM exits with status 1.
     *
     * @param args not used
     */
    public static void main(String[] args) {
        try {
            ClassPathXmlApplicationContext context =
                    new ClassPathXmlApplicationContext("classpath:spring/spring-context.xml");
            // The context is never closed explicitly here, so let the JVM close it
            // on shutdown.
            context.registerShutdownHook();
            context.start();
        } catch (Exception e) {
            log.error("==>MQ context start error:", e);
            // Non-zero status: start-up failed.
            System.exit(1);
        }
    }
}
项目结构图如下:
2.消息生产者:
mq.properties
## MQ broker connection
mq.brokerURL=tcp\://127.0.0.1\:61616
mq.userName=admin
mq.password=admin
mq.pool.maxConnections=10

## Name of the queue
queueName=firstTest
spring-mq.xml:
<!-- The ConnectionFactory that actually creates connections; supplied by the JMS vendor. -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <!-- ActiveMQ broker address -->
    <property name="brokerURL" value="${mq.brokerURL}"/>
    <property name="userName" value="${mq.userName}"/>
    <property name="password" value="${mq.password}"/>
</bean>

<!--
    ActiveMQ's PooledConnectionFactory wraps an ActiveMQConnectionFactory and pools
    Connection, Session and MessageProducer instances to reduce resource consumption.
    Requires the activemq-pool artifact.
-->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="connectionFactory" ref="targetConnectionFactory"/>
    <property name="maxConnections" value="${mq.pool.maxConnections}"/>
</bean>

<!-- Spring-side ConnectionFactory that manages the real ConnectionFactory. -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <!-- The target is the real factory able to create JMS connections. -->
    <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>

<!-- Spring's JMS helper for sending and receiving messages (queue template). -->
<bean id="activeMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <!-- Refers to the Spring-provided ConnectionFactory defined above. -->
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="defaultDestinationName" value="${queueName}"/>
</bean>
ActiveMQ测试启动类
public class MQProducerTest { private static final Log log = LogFactory.getLog(MQProducerTest.class); public static void main(String[] args) { try { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/spring-context.xml"); context.start(); MQProducer mqProducer = (MQProducer) context.getBean("mqProducer"); // 邮件发送 MailParam mail = new MailParam(); mail.setTo("你要接收的邮箱"); mail.setSubject("ActiveMQ测试"); mail.setContent("通过ActiveMQ异步发送邮件!"); mqProducer.sendMessage(mail); context.stop(); } catch (Exception e) { log.error("==>MQ context start error:", e); System.exit(0); } finally { log.info("===>System.exit"); System.exit(0); } } }
项目结构图:
这样就可以了。