新建发送者Send.java
package com.luo.rabbit.test.one;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
//队列名称
private final static String QUEUE_NAME = "queue";
public static void main(String[] argv) throws java.io.IOException
{
/**
* 创建连接连接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
//设置MabbitMQ所在主机ip或者主机名
factory.setHost("127.0.0.1");
//创建一个连接
Connection connection = factory.newConnection();
//创建一个频道
Channel channel = connection.createChannel();
//指定一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送的消息
String message = "hello world!";
//往队列中发出一条消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("Sent '" + message + "'");
//关闭频道和连接
channel.close();
connection.close();
}
}
新建接收者Recv.java
package com.luo.rabbit.test.one;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Recv {
//队列名称
private final static String QUEUE_NAME = "queue";
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException
{
//打开连接和创建频道,与发送端一样
ConnectionFactory factory = new ConnectionFactory();
//设置MabbitMQ所在主机ip或者主机名
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("Waiting for messages. To exit press CTRL+C");
//创建队列消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定消费队列
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true)
{
//nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("Received '" + message + "'");
}
}
}
例子一可能通俗易懂,但是并不是很规范,而且有些缺陷,比如我要发送一个对象过去呢?下面看另外一个例子:
首先建一个连接类,因为发送者和接收者的连接代码都是一样的,之后让二者继承这个连接类即可。
连接类代码BaseConnector.java:
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class BaseConnector {
protected Channel channel;
protected Connection connection;
protected String queueName;
public BaseConnector(String queueName) throws IOException{
this.queueName = queueName;
//打开连接和创建频道
ConnectionFactory factory = new ConnectionFactory();
//设置MabbitMQ所在主机ip或者主机名 127.0.0.1即localhost
factory.setHost("127.0.0.1");
//创建连接
connection = factory.newConnection();
//创建频道
channel = connection.createChannel();
//声明创建队列
channel.queueDeclare(queueName, false, false, false, null);
}
}
发送者Sender.java:
import java.io.IOException;
import java.io.Serializable;
import org.apache.commons.lang.SerializationUtils;
public class Sender extends BaseConnector {
public Sender(String queueName) throws IOException {
super(queueName);
}
public void sendMessage(Serializable object) throws IOException {
channel.basicPublish("",queueName, null, SerializationUtils.serialize(object));
}
}
接收者代码Receiver.java:
import java.io.IOException;
import org.apache.commons.lang.SerializationUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.ShutdownSignalException;
public class Receiver extends BaseConnector implements Runnable, Consumer {
public Receiver(String queueName) throws IOException {
super(queueName);
}
//实现Runnable的run方法
public void run() {
try {
channel.basicConsume(queueName, true,this);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 下面这些方法都是实现Consumer接口的
**/
//当消费者注册完成自动调用
public void handleConsumeOk(String consumerTag) {
System.out.println("Consumer "+consumerTag +" registered");
}
//当消费者接收到消息会自动调用
public void handleDelivery(String consumerTag, Envelope env,
BasicProperties props, byte[] body) throws IOException {
MessageInfo messageInfo = (MessageInfo)SerializationUtils.deserialize(body);
System.out.println("Message ( "
+ "channel : " + messageInfo.getChannel()
+ " , content : " + messageInfo.getContent()
+ " ) received.");
}
//下面这些方法可以暂时不用理会
public void handleCancelOk(String consumerTag) {
}
public void handleCancel(String consumerTag) throws IOException {
}
public void handleShutdownSignal(String consumerTag,
ShutdownSignalException sig) {
}
public void handleRecoverOk(String consumerTag) {
}
}
---------------------