RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)
该文章所举的例子实际运用中不适合,但是拿来入门就十分合适了。其中例一因为现在mqp-client 在 5.X以后因为性能原因去掉了QueueingConsumer,所以如果用的比较新的jar包(4.x之后),是不支持QueueingConsumer,所以换用其官方推荐的DefaultConsumer。故将修改后的代码呈上:
生产者:
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**发送者
* @author author:Beat IT
* @version time:2019-8-21 上午11:18:34
*
*
*/
public class Send {
//队列名称
private final static String QUEUE_NAME = "queue";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接连接到RabbitMQ
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ所在主机IP或主机名
factory.setHost("127.0.0.1");
//创建一个连接
Connection connection = factory.newConnection();
//创建一个频道
Channel channel = connection.createChannel();
//指定一个队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//指定需要发送的消息
//往队列中发出消息
final String queueName = "test01";
String msg = "Hello RabbitMQ";
for (int i = 0; i < 6; i++) {
channel.basicPublish("", queueName,null, (msg + i).getBytes());
}
System.out.println("sended something!");
//关闭频道和链接
channel.close();
connection.close();
}
}
消费者:
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
* 接收者
*
* @author author:Beat IT
* @version time:2019-8-21 上午11:30:19
*
*
*/
public class Rec {
// 队列名称
private final static String QUEUE_NAME = "queue";
public static void main(String[] args) throws IOException, TimeoutException {
//与发送端一样,需要打开连接和创建频道
//创建连接连接到RabbitMQ
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ所在主机IP或主机名
factory.setHost("127.0.0.1");
//创建一个连接
Connection connection = factory.newConnection();
//创建一个频道
Channel channel = connection.createChannel();
//申明一个队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
System.out.println("Waiting for messages. To exit press CTRL+C");
//创建队列消费者
channel.basicConsume(QUEUE_NAME, true,new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
String message = new String(body,"UTF-8");
System.out.println("Received:" + message);
}
}) ;
}
}