public class Product {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("10.211.55.3");
connectionFactory.setPort(5672);
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123");
connectionFactory.setVirtualHost("ems");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//1.队列名(如果没有这个队列,会自动创建)
//2.是否要进行队列的持久化
//3.是否是独占队列,只有相同的用户才能消费
//4.在消费后是否自动删除队列
//5.其他的一些参数
channel.queueDeclare("are you ok",true,false,false,null);
//1.交换机
//2.队列名
//3.其他的一些参数
//4.传输的内容
channel.basicPublish("","are you ok",null,"我想睡觉".getBytes());
channel.close();
connection.close();
}
}
执行完生产者后,会生成一个队列,并放入一条我想睡觉的消息。
此页面默认五秒刷新一次,可以右上角进行设置。
然后我们使用消费者来进行消费
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("10.211.55.3");
connectionFactory.setPort(5672);
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123");
connectionFactory.setVirtualHost("ems");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//1.队列名(如果没有这个队列,会自动创建)
//2.是否要进行队列的持久化
//3.是否是独占队列,只有相同的用户才能消费
//4.在消费后是否自动删除队列
//5.其他的一些参数
channel.queueDeclare("are you ok",true,false,false,null);
channel.basicConsume("are you ok",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new java.lang.String(body));
}
});
}
打印出来的数据代表消费成功。此时的生产者channel管道并没有被关闭,所以会一直监听队列中的消息。