一 .概述
我们不从开始就讲述基本的概念,尤其是在Rabbitmq之中有些概念确实比较难以理解,我们首先做的就是将光放提供的消息模型
进行实现,然后再总结一下Rabbitmq之中的基本概念.
二 .基础的工具类
我们想使用Rabbitmq,那么首先需要的就是一个连接,本部分我们首先就说一下获取连接的方式.
我们首先搭建我们的测试环境,首先需要做的就是添加我们的依赖包.
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.0</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
然后我们需要做的就是编写我们的连接工具类.
通过下面的代码,我们知道我们这个仅仅只是一个我们测试环境下的一个工具类,很多东西都是方便我们使用的而已.
public class ConnectionUtils {
public static Connection getConnection() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
return factory.newConnection();
}
}
在Rabbimtq之中,获取连接的方式时比较容易的.主要分成下面的三个部分:
[1]创建连接工厂
[2]为连接工厂设置属性
[3]从连接工厂之中获取连接.
三,简单队列
上面的图中表示的就是简单队列的模型.
其中:P表示的就是消息的生产者,红色的部分表示的就是消息队列,C表示的就是消息的消费者.
上面的模型是十分的简单的,这也是我们首次使用Rabbitmq的最好的一个模型.
看下面最简单的代码:
public class Send {
public static void main(String[] args) throws Exception{
// 获取一个连接对象
Connection connection = ConnectionUtils.getConnection();
//从连接之中获取一个Channel对象
Channel channel = connection.createChannel();
// 队列的名称
String queueName = "simple_queue";
// 声明一个队列
channel.queueDeclare(queueName, false, false, false, null);
// 然后向队列之中发送消息
channel.basicPublish("", queueName, null, "trek".getBytes());
//关闭资源
channel.close();
connection.close();
}
}
当我们发送了一条消息之后,我们到admin之中看看情况.
我们从admin之中看到我们确实成功的发送了一条消息.
然后下面我们需要编写我们的消费端的内容.
public class Resv {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 创建一个消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("消费者获取到的消息是=="+new String(body));
}
};
channel.basicConsume("simple_queue",true, consumer); // 注意这个参数true,如果我们设置为false,我们就需要手动确认消息的消费,否则这一条消息无法在中间件之中清除掉.
}
}
现在我们看看我们的admin之中的内容,我们发现现在在admin之中的内容如下:
从生产者之中发送的消息确实被消费掉了.