RabbitMQ: here is a quick throwaway example first. To use it in a real project, a singleton is enough:
public class MQConstant {
    public static final String EXCHANGE_NAME = "liuExchange";
    public static final String QUEUE_NAME = "liuQueue";
    public static final String RoutingKey = "liuRoutingKey";
}
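import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Produce {
    private static final Logger log = LoggerFactory.getLogger(Produce.class);

    public static void main(String[] args) throws IOException, TimeoutException {
        Produce p = new Produce();
        p.doSend();
    }

    private void doSend() throws IOException, TimeoutException {
        Connection connection = null;
        Channel channel = null;
        try {
            // Open a connection and a channel
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();

            // Declaring the queue is optional here. If you do declare it, the arguments
            // must match the queue that already exists, otherwise the broker reports an
            // error. The second argument is "durable" (persistent); the Consumer below
            // declares the queue with durable = false, so the same value is used here.
            // channel.queueDeclare(MQConstant.QUEUE_NAME, false, false, false, null);

            // Note: this assumes the exchange already exists and is bound to the queue
            // with this routing key; nothing in this example declares them.
            String message = "Hello World!";
            channel.basicPublish(MQConstant.EXCHANGE_NAME, MQConstant.RoutingKey, null,
                    message.getBytes(StandardCharsets.UTF_8));
            System.out.println("[Produce] Sent:'" + message + "'");
        } catch (Exception e) {
            log.error("Send failed:", e);
        } finally {
            // Close the channel first, then the connection
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        }
    }
}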
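import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer {
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();

            // Declare the queue: not durable, not exclusive, not auto-deleted
            channel.queueDeclare(MQConstant.QUEUE_NAME, false, false, false, null);

            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println("[Consumer] Received:'" + message + "'");
                }
            };

            // autoAck = true: messages are acknowledged automatically. Without an
            // acknowledgement, a message can be redelivered and consumed again.
            String consumerTag = channel.basicConsume(MQConstant.QUEUE_NAME, true, consumer);
            System.out.println(consumerTag);

            // Keep the process alive for one second so that deliveries can arrive
            Thread.sleep(1000L);
        } catch (Exception e) {
            log.error("Receive failed:", e);
        } finally {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        }
    }
}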
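My own understanding:
1. Before any communication, a connection is established once, which gives you a Connection object. It can be reused or closed right away. To use it, call connection.createChannel(); if the Channel will not be reused, remember to close it when you are done.
2. The producer uses short-lived connections: the Channel is closed as soon as it has been used.
3. The consumer uses a long-lived connection: its Channel stays open and listens, whereas the producer can close its Channel once it has finished.
4. The consumer can be registered with a listener when the container starts.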
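
The "singleton is enough" remark and point 1 can be sketched roughly as below. This is only a minimal sketch under assumptions, not code from the example above: `SharedResource` and its methods are hypothetical names, and the class is written against `AutoCloseable` so that it runs without a broker. In a RabbitMQ project, `T` would presumably be `com.rabbitmq.client.Connection`, and the supplier would wrap `factory.newConnection()`, converting its checked exceptions into unchecked ones.

```java
import java.util.function.Supplier;

// Hypothetical helper (not part of the RabbitMQ client): lazily opens one
// shared instance and returns that same instance on every later call.
final class SharedResource<T extends AutoCloseable> {
    private final Supplier<T> opener; // assumption: wraps factory.newConnection() in a real project
    private T instance;               // guarded by "this"

    SharedResource(Supplier<T> opener) {
        this.opener = opener;
    }

    // Returns the shared instance, opening it on first use.
    synchronized T get() {
        if (instance == null) {
            instance = opener.get();
        }
        return instance;
    }

    // Closes the shared instance if one is open; the next get() opens a new one.
    synchronized void close() {
        if (instance == null) {
            return;
        }
        try {
            instance.close();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to close shared resource", e);
        } finally {
            instance = null;
        }
    }
}
```

With a holder like this, a producer would call `get().createChannel()` for each send and close only the Channel afterwards, while `close()` on the holder would be called once at shutdown.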