最简单的案例:
[email protected]:n_zhe/rabbitMQ-demo.git
通过简单的例子分析mq是怎样发送和拉取消息的:
public Connection newConnection() throws IOException, TimeoutException { return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort()))); }
最终调用:
参数executor默认为null,addrs为mq的地址+端口,clientProvideName为null
继续观察:
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
其实这个FrameHandlerFactory就是对SocketFactory进行了一次封装:
最终返回的为SocketFrameHandler类,该类是对Socket进行了封装:
继续向下走,会使用之前的返回的FrameHander对象new一个AMQConnection对象:
然后会调用AMQConnection的start()方法:
下边这个for循环,第一次启动后就直接return??
③返回AMQConnection对象
通过Connectin来创建Channel,返回ChannelN对象
④通过ChannelN对象的basicPublish方法发布消息
先通过exchange,routingKey等参数构建一个Publish,进而构建一个AMQCommand对象
最终:
流程:
消费端:
开始也是获取ConnectionFactory、Connection、Channel,然后通过Channel来操作(不管是生产端还是消费端,mq都是通过channel来进行操作的)
①消费端会声明一个消费队列
// 5、常见一个消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
②通过Channel将消费这和消息队列关联
队列消费者,用于监听队列中的消息。调用nextDelivery方法时,内部实现就是调用队列的take方法。该方法的作用:获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。说白了就是如果没有消息,就处于阻塞状态。
运行结果:
消费端:
等待获取消息======
生产端:
已经发送消息了
当生产端发送消息之后,消费端输出:
等待获取消息====== 消费端hello rabbit-mq 消费端hello rabbit-mq 消费端hello rabbit-mq 消费端hello rabbit-mq 消费端hello rabbit-mq
以上代码没有指定exchange,因此rabbitmq server会自动通过默认的exchange(即default exchange)取转发消息,如果生产者的routingkey和消费端的队列名称相同的话,则能够转发成功,否则失败