安装和启动好了RabbitMQ后,我们下面来通过编写一个Java程序来学习第一个队列知识。
第一篇我们提到过,RabbitMQ一共有六种队列模式:
分别是“HelloWorld”简单队列模式、“Work queues”工作队列模式、“Publish/Subscribe”发布/订阅模式、“Routing”路由模式、“Topics”通配符模式、“RPC”远程服务调用。
那么我们首先从最基本的“HelloWorld”简单队列讲解。关于“HelloWorld”简单队列的队列模式图如下所示:
其中“P”代表消息的生产者(Producter),由它来产生消息。而中间红色的框就是队列,即是生产者产生消息之后,将消息发送到队列。后面的“C”代表消息的消费者(consumer),即是将生产者发送至消息队列中的消息进行消费(取出)。
这就是最简单的一个队列模式。
我们下面使用Java连接RabbitMQ,并实现简单队列。
首先创建一个maven工程:
上面我们的账号信息使用了一开始创建的jack账号的信息。
然后创建消息的生产者和消费者的模拟类。新建名为“Send”的类,作为生产者:
然后使用channel对象的“queueDeclare”方法声明(创建)了一个“队列”,然后在参数中指定队列的名称(这里指定为“test_queue”)。
然后创建一个消息,通过channel对象的“basicPublish”方法将消息发送至“test_queue”队列。
最后关闭通道和连接。
运行该生产者:
然后我们可以在RabbitMQ的管理工具的Queues模块中查看队列信息:
点击队列名称,然后选择“Get messages”选择,点击按钮,可以查看队列中信息的详细内容:
图4.10.png
然后创建一个名为“Recv”的类,作为消息消费者:
然后定义队列的消费者QueueingConsumer,然后使用“basicConsume”来监听队列。后面我们写了一个死循环,用于一致监听队列并获取消息。在循环块中使用consumer对象的nextDelivery()方法来进行消息的接收和消费。
这里要注意的是,在该模式下,消费者接收的消息一旦被消费,则队列中就不再有此消息(相当于“阅后即焚”)。在运行消费者之前,观察队列的情况:
运行消费者之后,在控制台可以看到消费者接收到的信息:
并且在管理工具中可以看到队列中的消息已经被消费而不存在:
第一篇我们提到过,RabbitMQ一共有六种队列模式:
分别是“HelloWorld”简单队列模式、“Work queues”工作队列模式、“Publish/Subscribe”发布/订阅模式、“Routing”路由模式、“Topics”通配符模式、“RPC”远程服务调用。
那么我们首先从最基本的“HelloWorld”简单队列讲解。关于“HelloWorld”简单队列的队列模式图如下所示:
其中“P”代表消息的生产者(Producter),由它来产生消息。而中间红色的框就是队列,即是生产者产生消息之后,将消息发送到队列。后面的“C”代表消息的消费者(consumer),即是将生产者发送至消息队列中的消息进行消费(取出)。
这就是最简单的一个队列模式。
我们下面使用Java连接RabbitMQ,并实现简单队列。
首先创建一个maven工程:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.jack.rabbitmq</groupId> <artifactId>RabbitMQ_Test_project</artifactId> <version>0.0.1-SNAPSHOT</version> <build/> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.3.2</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.0.RELEASE</version> </dependency> </dependencies> </project>然后在src/mian/java中创建一个连接工厂ConnectionUtil,用于连接RabbitMQ:
package cn.jack.rabbitmq.connection; import java.io.IOException; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtil { public static Connection getConnection() throws IOException{ //定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); //定义连接地址 factory.setHost("localHost"); //定义端口 factory.setPort(5672); //设置账号信息,用户名、密码、vhost factory.setVirtualHost("/jack"); factory.setUsername("jack"); factory.setPassword("jack"); // 通过工厂获取连接 Connection connection = factory.newConnection(); return connection; } }在该类中首先定义了一个连接工厂“ConnectionFactory”,然后设置其服务地址、端口号、账号信息(用户名、密码、vhost),最后通过连接工厂获取一个RabbitMQ的连接对象。
上面我们的账号信息使用了一开始创建的jack账号的信息。
然后创建消息的生产者和消费者的模拟类。新建名为“Send”的类,作为生产者:
package cn.jack.rabbitmq.simple; import java.io.IOException; import cn.jack.rabbitmq.connection.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String QUEUE_NAME="test_queue"; public static void main(String[] args) throws IOException { //获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); //从连接中创建通道 Channel channel = connection.createChannel(); //声明(创建)队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //消息内容 String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("[product] Send '"+ message +"'"); //关闭通道和连接 channel.close(); connection.close(); } }该类首先通过连接工具类获取到了RabbitMQ的连接对象,类似JDBC连接数据库的连接对象。然后通过连接对象获取Channel通道,这个就相当于获取JDBC的Statement对象。通过连接对象创建Channel对象,相当于创建了一个与RabbitMQ的“通道”,通过“通道”可以做一系列与RabbitMQ交互的操作。
然后使用channel对象的“queueDeclare”方法声明(创建)了一个“队列”,然后在参数中指定队列的名称(这里指定为“test_queue”)。
然后创建一个消息,通过channel对象的“basicPublish”方法将消息发送至“test_queue”队列。
最后关闭通道和连接。
运行该生产者:
然后我们可以在RabbitMQ的管理工具的Queues模块中查看队列信息:
点击队列名称,然后选择“Get messages”选择,点击按钮,可以查看队列中信息的详细内容:
图4.10.png
然后创建一个名为“Recv”的类,作为消息消费者:
package cn.jack.rabbitmq.simple; import cn.jack.rabbitmq.connection.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws Exception { //获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); //监听队列 channel.basicConsume(QUEUE_NAME, true,consumer); //获取消息 while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("[Consumer] Received '"+ message +"'"); } } }与生产者一样,首先获取连接对象和创建channel通道对象。我们这个消费者消费的是生产者的队列,上面的生产者已经创建好了改队列,该理说不需要再创建队列,但是为了防止消费者消费的队列不存在这种异常的发送,还是同样声明生产者一样的队列,以防万一(如果已存在,该次创建无效,不会影响已存在的相同的队列)。如果开发者明确改队列百分之百存在,则可以在消费者代码中忽略声明队列这一步。
然后定义队列的消费者QueueingConsumer,然后使用“basicConsume”来监听队列。后面我们写了一个死循环,用于一致监听队列并获取消息。在循环块中使用consumer对象的nextDelivery()方法来进行消息的接收和消费。
这里要注意的是,在该模式下,消费者接收的消息一旦被消费,则队列中就不再有此消息(相当于“阅后即焚”)。在运行消费者之前,观察队列的情况:
运行消费者之后,在控制台可以看到消费者接收到的信息:
并且在管理工具中可以看到队列中的消息已经被消费而不存在:
以上就是RabbitMQ中最简单的消息队列模式,即是生产者生产消息并发送至消息队列,消费者监听队列并从队列中获取消息。
转载请注明出处:http://blog.csdn.net/acmman/article/details/79438196