rabbitMQ 测试例子

package com.sun.rabbitMQ;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) 
                      throws java.io.IOException, TimeoutException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    
    //指定队列持久化
    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = "我又出发了";

    //指定消息持久化
    channel.basicPublish( "", TASK_QUEUE_NAME, 
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }      
}
package com.sun.rabbitMQ;

import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

   private final static String QUEUE_NAME = "hello";

   public static void main(String[] argv)
         throws java.io.IOException, java.lang.InterruptedException, TimeoutException {
      /* 这里怎么打开连接和信道,以及声明用于接收消息的队列,这些步骤与发送端基本上是一样的 */
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();

      /* 确保这里的队列是存在的 */
      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

      /*
       * 这里用到了额外的类QueueingConsumer来缓存服务器将要推过来的消息。我们通知服务器向接收端推送消息,
       * 然后服务器将会向客户端异步推送消息,这里提供了一个可以回调的对象来缓存消息,直到我们做好准备来使用 它,这个类就是QueueingConsumer
       */
      QueueingConsumer consumer = new QueueingConsumer(channel);
      channel.basicConsume(QUEUE_NAME, true, consumer);

      while (true) {
         QueueingConsumer.Delivery delivery = consumer.nextDelivery();
         String message = new String(delivery.getBody());
         System.out.println(" [x] Received '" + message + "'");
      }
   }
}
package com.sun.rabbitMQ;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws java.io.IOException, TimeoutException {
    /*连接可以抽象为socket连接,为我们维护协议版本信息和协议证书等。这里我们连接
    上了本机的消息服务器实体(localhost)。如果我们想连接其它主机上的RabbitMQ服务,只需要修改一下主机名或是IP就可以了*/
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    //创建一个已经生成名字的、排他性的且会自动删除的Queue
    System.out.println(channel.queueDeclare().getQueue());
    /*接下创建channel(信道),这是绝大多数API都能用到的。为了发送消息,你必须要声明一个消息消息队列,然后向该队列里推送消息*/
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "这是第四条数据!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent11111 '" + message + "'");
    
    /*声明一个幂等的队列(只有在该队列不存在时,才会被创建)。消息的上下文是一个
      字节数组,你可以指定它的编码。*/
    channel.close();
    connection.close();
  }
}
package com.sun.rabbitMQ;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv)
                      throws java.io.IOException,
                      java.lang.InterruptedException, TimeoutException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    //指定队列持久化
    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
    //指定该消费者同时只接收一条消息
    channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);

//打开消息应答机制
    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());

      System.out.println(" [x] Received '" + message + "'");   
//      doWork(message); 
      System.out.println(" [x] Done" );
    
      //返回接收到消息的确认信息
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
  }
}

猜你喜欢

转载自blog.csdn.net/weixin_41825468/article/details/90771288