package com.sun.rabbitMQ; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //指定队列持久化 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = "我又出发了"; //指定消息持久化 channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
package com.sun.rabbitMQ; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException, TimeoutException { /* 这里怎么打开连接和信道,以及声明用于接收消息的队列,这些步骤与发送端基本上是一样的 */ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /* 确保这里的队列是存在的 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); /* * 这里用到了额外的类QueueingConsumer来缓存服务器将要推过来的消息。我们通知服务器向接收端推送消息, * 然后服务器将会向客户端异步推送消息,这里提供了一个可以回调的对象来缓存消息,直到我们做好准备来使用 它,这个类就是QueueingConsumer */ QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
package com.sun.rabbitMQ; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException, TimeoutException { /*连接可以抽象为socket连接,为我们维护协议版本信息和协议证书等。这里我们连接 上了本机的消息服务器实体(localhost)。如果我们想连接其它主机上的RabbitMQ服务,只需要修改一下主机名或是IP就可以了*/ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //创建一个已经生成名字的、排他性的且会自动删除的Queue System.out.println(channel.queueDeclare().getQueue()); /*接下创建channel(信道),这是绝大多数API都能用到的。为了发送消息,你必须要声明一个消息消息队列,然后向该队列里推送消息*/ channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "这是第四条数据!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent11111 '" + message + "'"); /*声明一个幂等的队列(只有在该队列不存在时,才会被创建)。消息的上下文是一个 字节数组,你可以指定它的编码。*/ channel.close(); connection.close(); } }
package com.sun.rabbitMQ; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //指定队列持久化 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //指定该消费者同时只接收一条消息 channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); //打开消息应答机制 channel.basicConsume(TASK_QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); // doWork(message); System.out.println(" [x] Done" ); //返回接收到消息的确认信息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }