一直听说MQ这个东西,也一直没有去尝试下,刚好部门开会又提到了这个东西,这几天闲点,抽个空体验下。
分享下自己的学习经验吧:
1.纠结的搭建过程
2.具体组件以及相关组件说明
3.应用场景和代码实现
1.纠结的搭建过程
说起搭建过程,真是过程曲折啊,暂时还没有搞好,所以就先使用测试环境的MQ了。
a)RabbitMQ依赖erlang的包,erlang的官网包很难下,基本上点击进去就访问不了,最后终于知乎上看到一个可用连接了,分享下:http://erlang-users.jp/ 日本的一个镜像,虽然是日语翻译一下将就看吧,基本可以下载到erlang的源码包。
b)便宜erlang又报了个"No curses library functions found" 我用一下yum命令发现虚拟机连不上公网,改了网络模式也没有生效,本地先不搭了吧,先测试环境将就着用吧。
yum list|grep ncurses yum -y install ncurses-devel yum install ncurses-devel
2.具体组件以及相关组件说明
a) exchange
exchange有以下几种模式:
default 默认的exchange,所有的队列都会注册该exchange,exchangeName为空,routingKey为队列名称。
direct模式,指定exchangeName和routingKey即可绑定指定的队列。
fanout模式,广播模式,忽略routingKey属性,向所有绑定该exchange的队列发送消息。
topic模式,感觉和direct类似,只是支持多维度的routingKey和* #匹配。
header模式,根据消息头的部分属性来绑定消息队列。
b) queue
Queue有以下几个属性:
Durable(消息代理重启后,队列依旧存在)
Exclusive(只被一个连接(connection)使用,而且当连接关闭后队列即被删除)
Auto-delete(当最后一个消费者退订后即被删除)
Arguments(一些消息代理用他来完成类似与TTL的某些额外功能)
c) Consumer
消息确认模式:消费者任务做完,或者收到消息发送ack。
拒绝消息:消费者发现消息过期或者消息处理失败,同时告诉MQ这条消息该如何处理,重新放入队列还是丢弃。
预取消息:一个消费者可以一次从队列中获取多条消息。
d) Channel
一个连接多个通道,每个通道有自己的chanelID,主要用于连接复用。类似NIO中的channel。
3.应用场景和代码实现
应用场景主要参考官网的入门文档:https://www.rabbitmq.com/getstarted.html
看完应用场景觉得这个文档特别贴心,专门为小白用户写的,由浅入深,逐步深入,点赞。
下面说说我对这几个场景的理解吧,也便于自己梳理,新手的话还是建议参考官网文档。
a) Hello World场景:
这是最简单的场景,生产者丢消息到队列中,消费中从队列中取消息。
Send.java
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setPort(QUEUE_PORT); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; // 发布消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); }
Consumer.java
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setPort(QUEUE_PORT); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; // 测试发现这是个阻塞任务,处理完消息就会等待队列的下一条消息 channel.basicConsume(QUEUE_NAME, true, consumer); }
b) Work Queue模式
队列主要存放比较耗费资源(时间,内存,CPU)的任务,生产者提交任务到队列,多个消费者一起来完成任务。
Send.java
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setPort(QUEUE_PORT); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 第二个参数标记是否持久化队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = getMessage(args); channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } /** * 生产消息 * @param strings * @return */ public static String getMessage(String[] strings){ /** * 随机数用于测试同一条消息只被一个消费者处理还是多个消费者同时处理 * 测试证明,如果是这种策略的话只会被一个消费者接受和处理 */ if (strings.length < 1) return "Hello World!..." + (int)(Math.random()*100); return joinStrings(strings, " "); } public static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }
Consumer.java
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setPort(QUEUE_PORT); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { try { doWork(message); } catch (InterruptedException e) { e.printStackTrace(); } } finally { System.out.println(" [x] Done"); } } }; channel.basicConsume(QUEUE_NAME, true, consumer); } private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { // 模拟耗时任务 if (ch == '.') Thread.sleep(1000); } }
c) 发布订阅模式
终于进入了Rabbit MQ的核心组件Exchange,官方给出的例子是日志系统,生成的日志写入两个队列,一个队列的消费者用于打印在控制台,另一个队列的消费者写入日志文件。这个主要使用的是fanout模式,意思是该exchange的消息会同步分发给所有绑定它的队列。和上面两种模式很明显的区别是:上面两种模式一条消息只能被一个消费者接收,而这里一条消息给多个队列从而给多个消费者使用。
Send.java
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setPort(QUEUE_PORT); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = getMessage(args); // 发布消息 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); }
Consumer.java
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setPort(QUEUE_PORT); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 生成名字随机的队列,绑定exchange来接受信息 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
d) Routing模式
这个说一个明显的需求就能看出和上面的区别。一个消费者需要将error级别的日志打印到文件,error和别的级别都打印到控制台。这里使用的是direct模式,只需要指定哪个exchange,使用哪个routingkey即可根据该规则过滤到指定的队列。
Send.java
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setPort(QUEUE_PORT); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(args); String message = Send.getMessage(args); // 主要关注这里的前两个参数一个是exchangeName 一个是routingKey channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); channel.close(); connection.close(); } private static String getSeverity(String[] strings) { if ((int)(Math.random()*10) > 5 ) return "info"; return "error"; }
Consumer.java
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setPort(QUEUE_PORT); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 生成系统指定名称的队列,方便回收 String queueName = channel.queueDeclare().getQueue(); /** * 这里启动多个Receive进行测试 * 一个是args = new String[1];args[0] = "error"; * 一个是args = new String[2];args[0] = "error";args[1] = "info"; */ args = new String[1]; args[0] = "error"; for(String severity : args){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); }
e) Topics模式
其实只要看需求就能理解Topics模式,跟上面的需求对比下,如果我需要过滤定时任务的错误日志,我不想要系统内核的错误日志,怎么办呢?上面的direct只能解决一维的问题,但是topics可以解决多维的问题。同时这里的这两句需要关注下。”* (star) can substitute for exactly one word.“ ”# (hash) can substitute for zero or more words.“ 这里就不贴代码,跟上面的类似,只需要注意两个参数routingKey 和 the exchange type。
f) RPC模式
主要是RPC调用请求发送到exchange,exchange绑定到相应的请求队列,这里的消费者即服务端,服务端处理完成后,同时将结果封装到Response队列中,客户端通过判断是否是自己的correlationId,如果是结束调用,或者下一次调用。
Client.java
public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setPort(QUEUE_PORT); connection = factory.newConnection(); channel = connection.createChannel(); // 请求相应的消息队列 replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { String response = null; String corrId = UUID.randomUUID().toString(); // 封装请求的属性 BasicProperties props = new BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); // 判断和自己的请求ID是否相同 if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody(),"UTF-8"); break; } } return response; } public void close() throws Exception { connection.close(); } public static void main(String[] argv) { RPCClient fibonacciRpc = null; String response = null; try { fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (fibonacciRpc!= null) { try { fibonacciRpc.close(); } catch (Exception ignore) {} } } }
RPCServer.java
private static int fib(int n) { if (n ==0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); } public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setPort(QUEUE_PORT); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(RPC_REQUEST_QUEUE_NAME, false, false, false, null); // 设置每个消费这最多处理一个任务 channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_REQUEST_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { String response = null; // Main application-side API: wait for the next message delivery and return it. QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties requestProps = delivery.getProperties(); // 相应只要指定请求ID即可 BasicProperties replyProps = new BasicProperties .Builder() .correlationId(requestProps.getCorrelationId()) .build(); try { String message = new String(delivery.getBody(),"UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response = "" + fib(n); } catch (Exception e){ System.out.println(" [.] " + e.toString()); response = ""; } finally { // 写入response队列 channel.basicPublish( "", requestProps.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) {} } } }