我们都知道完成一个模块不可能都在一个方法里面完成所有事情,需要将代码按各个维度拆分细化出来,保证系统的松耦合,比如设计模式中的模板模式就体现了这点, 既然是独立模块,那么这时候就需要通过去调用别的类或接口的方法来完成所要完成的事。
图一:
上图是一个类的方法要调用另外一个类里面叫call的方法,接收返回结果,很简单嘛,因为都在一个进程空间,两个类都是同一个java虚拟机实例并且都帮我们解析好了,只要调用一下就可以。但是假如不在同一个进程空间呢?还能这样来达到我们要的效果吗?很明显不行。
如图二:
因为两个类分别在不同的进程空间,这时候我们不能像之前那样简单调用了,那怎么办呢?很多人可能会想到现在流行的微服务,的确微服务是可以完美的解决这种跨进程服务分别分布在不同应用上面的问题。
在当今互联网当道的环境下,分布式系统已经是被广泛接受和使用,各种分布式架构也是层出不穷,支持各种微服务、SOA架构的技术也应运而生。从最初的Spring Cloud到现在流行的阿里巴巴开源框架Dubbo都明显的说明了分布式系统的重要性。
但我现在想说的不是分布式服务框架,而是消息服务组件——RabbitMQ
通过消息组件方式也可以完成我们上面出现的问题,下面我结合一个例子来说明怎么使用rabbitMq,算是入门吧,哈哈。
直接上代码解决一切问题-_-
导入需要的maven依赖,顺便把spring和junit包也引入,用来跑单元测试
pom.xml
<properties>
<spring.version>3.2.8.RELEASE</spring.version>
<logback.version>1.1.5</logback.version>
<tomcat.version>7.0.81</tomcat.version>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.24</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>4.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>4.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>tomcat-servlet-api</artifactId>
<version>${tomcat.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
</dependencies>
因为是用spring集成,引入rabbitmq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<rabbit:connection-factory id="connectionFactory" username="test" password="test" host="192.168.xxx.xxx" port="5672"/>
<rabbit:template id="programTemplate" connection-factory="connectionFactory" exchange="program.direct.exchange"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="program.erland.queue" durable="true" auto-delete="false" exclusive="false">
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">60000</value>
</entry>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="program.java.queue" durable="true" auto-delete="false" exclusive="false">
</rabbit:queue>
<rabbit:direct-exchange name="program.direct.exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="program.erland.queue" key="erland"></rabbit:binding>
<rabbit:binding queue="program.java.queue" key="java"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<bean id="javaListener" class="rabbitmq.spring.listener.JavaListener"></bean>
<bean id="erlandListener" class="rabbitmq.spring.listener.ErlandListener"></bean>
<bean id="javaProducer" class="rabbitmq.spring.producer.JavaProducer"></bean>
<bean id="erlandProducer" class="rabbitmq.spring.producer.ErlandProducer"></bean>
<rabbit:listener-container
connection-factory="connectionFactory"
acknowledge="manual"
concurrency="20"
max-concurrency="50"
requeue-rejected="false">
<rabbit:listener ref="javaListener" queue-names="program.java.queue"/>
<rabbit:listener ref="erlandListener" queue-names="program.erland.queue"/>
</rabbit:listener-container>
</beans>
applicationContext-test.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd">
<!--<context:component-scan base-package="rabbitmq.spring"></context:component-scan>-->
<import resource="classpath*:rabbit-mq.xml"/>
</beans>
生产者
Producer
public abstract class Producer {
@Resource(name = "programTemplate")
AmqpTemplate amqpTemplate;
protected abstract String routeKey();
public void produce(Programmer log) {
try {
Message message = MessageBuilder.withBody(JSON.toJSONString(log).getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setMessageId(UUID.randomUUID().toString())
.build();
amqpTemplate.convertAndSend(routeKey(), message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
JavaProducer
@Component
public class JavaProducer extends Producer {
@Override
protected String routeKey() {
return "java";
}
}
UML图
还差监听,一鼓作气搞定它
ProgramListener
public abstract class ProgramListener implements ChannelAwareMessageListener {
final Logger logger = LoggerFactory.getLogger(this.getClass());
public void onMessage(Message message, Channel channel) throws Exception {
try {
logger.info("Receive message, message body -> {}", new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} catch (Exception e) {
logger.error("exception", e);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
JavaListener
public class JavaListener extends ProgramListener {
}
到这一步就已经完成了rabbitmq的配置和监听了,配置是采用了了rabbitmq中的direct模式
写个单元测试来验证下
RabbitMqMain
package com;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import rabbitmq.spring.entity.Programmer;
import rabbitmq.spring.producer.JavaProducer;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "/applicationContext-test.xml")
//@TestExecutionListeners({/*ServletTestExecutionListener.class, */DependencyInjectionTestExecutionListener.class/*,
// DirtiesContextTestExecutionListener.class*/})
public class RabbitMqMain {
@Autowired
JavaProducer javaProducer;
@Test
public void testSend() {
Programmer programmer = new Programmer();
programmer.setMessage("i am java, do u know me?");
javaProducer.produce(programmer);
}
}
可以正常跑起来。
附上rabbitmq控制台信息: