本章内容
- 解耦思维
- 实例解析
- 实现RPC功能
在项目开发中,我们谈论最多的应该就是项目的可扩展性,解耦项目中的各模块就是解决扩展性的一种实现方式。为什么要解耦?什么样的场景需要解耦呢?
解耦思维
项目开发使用的技术就好像是一个项目的发动机一样,主要是为了业务提供充足的动力。随时间的推移,公司业务会不断的发生变化。就需要技术进行支持。所以就要求在项目的开发过程中,预留一定的扩展性,而要扩展就需要按某种规则拆分项目为小的模块,而模块与模块之间的耦合性越低,相互影响就越小,扩展时系统稳定性就越好。同时,也利于日常的维护。至于拆分的粒度也要充分讨论,如果拆分的太细,在之后需要开发新需求时就需要更多的组共同进行协作,沟通成本会很大。如果太粗又达不到解耦的目的。应用程序和存储之间直接耦合通常就是导致程序被淘汰的主要原因。这也正是MQ能够帮我们解决的问题所在。
面向消息的设计主要是考虑如何把一个比较耗时的程序移出,单独处理而使主程序继续它的工作。本质就是异步思维考虑解耦请求和具体的操作(与生产者和消息者思想相同),很像(AIO的做法)。这也比较符合现实世界生活,例如:我们去餐厅单餐时,前台接待我们后把订单交给后厨,之后又接收其它客户,而我们在点餐完成后也没有一直在那里等待而是玩着手机等待叫号。
面向消息的程序还有一个特点就是 你关心的是完成任务,但并不是实时完成的,无须应答请求。而当我们完成任务的动作无法跟上请求的速度时,我们还可以利用自动轮询的模式,把MQ充当负载均衡器来使用,使得程序有更好的扩展性。
发后即忘模型
匹配这种模型的几种一般类型的任务:
- 批处理,针对大型数据集合的工作。这种类型的工作一般可以构建为单一的任务请求,或者多个任务对数据集合的独立部分进行操作。
- 通知,对发生事情的描述。其内容可以是某些日志,或者是一个报告需要发送给另一个或多个程序。
- 并行处理,对相互独立的程序有统一的触发点。例如:客户下单后(触发点),给客户添加积分、通知仓库准备发货、通知财务开相应的开票等等 这些操作并没有明显的串行关系。
实例解析
实例一
星期天,你在家躺在沙发上嗑着瓜子、看着电视(也有可能是你老婆,哈哈)。中午到了,你命令你媳妇做饭,糖醋排骨,你媳妇就去忙活了。5分钟后你又想吃鸡了,于是又命令你媳妇再做一个土豆烧鸡。
特点:你自己就可以看到是一个生产者(命令的发出者),而你媳妇就是一个消费者(命令的接收执行者)。你与你媳妇之间只有一个队列通道,接收到命令后也是按顺序做的饭,所以可以使用direct类型的交换器。
首先,就是你媳妇接收你的命令
public void entertainYou(String need){ Connection connection = ConnectionPool.getConnection(); try { Channel channel = connection.createChannel(); channel.exchangeDeclare(Consts.EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true); channel.queueDeclare(Consts.QUEUE_NAME,true,false,false,null); channel.queueBind(Consts.QUEUE_NAME,Consts.EXCHANGE_NAME,"command.make_lunch"); Map<String,String> content = new HashMap<String, String>(); content.put("id", UUID.randomUUID().toString()); content.put("need",need); content.put("time", Calculator.getCurrTime()); String str = new JSONWriter().write(content); channel.basicPublish(Consts.EXCHANGE_NAME,"command.make_lunch",getBasicProperties(), str.getBytes("UTF-8")); ConnectionPool.closeChannel(channel); ConnectionPool.closeConnection(connection); } catch (IOException e) { e.printStackTrace(); } } private AMQP.BasicProperties getBasicProperties(){ return basicProperties==null? basicProperties = new AMQP.BasicProperties.Builder() .contentType(Consts.ContentType.JSON) .contentEncoding("UTF-8") .build():basicProperties; }
然后去厨房做饭:
public void makeLunch(){ Connection connection = ConnectionPool.getConnection(); try { final Channel channel = connection.createChannel(); channel.exchangeDeclare(Consts.EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true); channel.queueDeclare(Consts.QUEUE_NAME,true,false,false,null); channel.queueBind(Consts.QUEUE_NAME,Consts.EXCHANGE_NAME,"command.make_lunch"); channel.basicConsume(Consts.QUEUE_NAME,false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String contentType = properties.getContentType(); //System.out.println(contentType); Map<String,String> content = (Map<String,String>)new JSONReader().read(new String(body)); System.out.println("你媳妇接受了你的请求:"+content); getChannel().basicAck(envelope.getDeliveryTag(),false); } }); } catch (IOException e) { e.printStackTrace(); } }
测试你的命令是否可用呢?
public static void main(String args[]){ //我发出命令 System.out.println("我命令媳妇去做饭!!"); cashier.entertainYou("我要吃糖醋排骨"); System.out.println("我继续嗑着瓜子、看着电视"); //媳妇接收命令后就去做饭了 kitchen.makeLunch(); //我发出命令 System.out.println("我又想吃其它的了!!"); cashier.entertainYou("我还要吃土豆烧鸡"); }
测试的结果:
我命令媳妇去做饭!! 我继续嗑着瓜子、看着电视 我又想吃其它的了!! 你媳妇正在做:{need=我要吃糖醋排骨, id=652e8542-bd9c-4e78-9703-459b3d885b0c, time=2018-12-06 15:28:13} 你媳妇正在做:{need=我还要吃土豆烧鸡, id=0a1dfb75-b20f-4b93-8da7-67178b1cd6aa, time=2018-12-06 15:28:13}
实例二
小明在你的购物网站上花2000块钱买一部手机,确认下单并且已经付款后,你的网站需要做如下操作:1.给小明的积分加20,2.把订单发给仓库准备出货处理,3.通知财务开一张发票。
首先接收订单:
private static final String ORDER_EXCHANGE = "exchange.order"; private static final String ORDER_QUEUE = "queue.order"; private static final String ORDER_ROUTING = "order"; public void acceptOrder(Map<String,String> orderMap){ Connection connection = ConnectionPool.getConnection(); try { Channel channel = connection.createChannel(); channel.exchangeDeclare(ORDER_EXCHANGE, BuiltinExchangeType.FANOUT,true); channel.queueDeclare(ORDER_QUEUE+".jf",true,false,false,null); channel.queueDeclare(ORDER_QUEUE+".ck",true,false,false,null); channel.queueDeclare(ORDER_QUEUE+".cw",true,false,false,null); channel.queueBind(ORDER_QUEUE+".jf",ORDER_EXCHANGE,ORDER_ROUTING); channel.queueBind(ORDER_QUEUE+".ck",ORDER_EXCHANGE,ORDER_ROUTING); channel.queueBind(ORDER_QUEUE+".cw",ORDER_EXCHANGE,ORDER_ROUTING); Map<String,Object> content = new HashMap<String, Object>(); Map<String,String> body = new HashMap<String, String>(); content.put("id", UUID.randomUUID().toString()); content.put("time", Calculator.getCurrTime()); content.put("orderContent",body); body.putAll(orderMap); String str = new JSONWriter().write(content); channel.basicPublish(ORDER_EXCHANGE,ORDER_ROUTING,getBasicProperties(), str.getBytes("UTF-8")); ConnectionPool.closeChannel(channel); ConnectionPool.closeConnection(connection); } catch (IOException e) { e.printStackTrace(); } }
然后,三个消费者:
public void jf(){ Connection connection = ConnectionPool.getConnection(); try { Channel channel = connection.createChannel(); channel.exchangeDeclare(ORDER_EXCHANGE, BuiltinExchangeType.FANOUT,true); channel.queueDeclare(ORDER_QUEUE+".jf",true,false,false,null); channel.queueBind(ORDER_QUEUE+".jf",ORDER_EXCHANGE,ORDER_ROUTING); channel.basicConsume(ORDER_QUEUE+".jf",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String contentType = properties.getContentType(); //System.out.println(contentType); Map<String,String> content = (Map<String,String>)new JSONReader().read(new String(body)); System.out.println("积分系统收到了订单:"+content); getChannel().basicAck(envelope.getDeliveryTag(),false); } }); } catch (IOException e) { e.printStackTrace(); } } public void ck(){ Connection connection = ConnectionPool.getConnection(); try { final Channel channel = connection.createChannel(); channel.exchangeDeclare(ORDER_EXCHANGE, BuiltinExchangeType.FANOUT,true); channel.queueDeclare(ORDER_QUEUE+".ck",true,false,false,null); channel.queueBind(ORDER_QUEUE+".ck",ORDER_EXCHANGE,ORDER_ROUTING); channel.basicConsume(ORDER_QUEUE+".ck",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String contentType = properties.getContentType(); //System.out.println(contentType); Map<String,String> content = (Map<String,String>)new JSONReader().read(new String(body)); System.out.println("仓库系统收到了订单:"+content); getChannel().basicAck(envelope.getDeliveryTag(),false); } }); } catch (IOException e) { e.printStackTrace(); } } public void cw(){ Connection connection = ConnectionPool.getConnection(); try { final Channel channel = connection.createChannel(); channel.exchangeDeclare(ORDER_EXCHANGE, BuiltinExchangeType.FANOUT,true); channel.queueDeclare(ORDER_QUEUE+".cw",true,false,false,null); channel.queueBind(ORDER_QUEUE+".cw",ORDER_EXCHANGE,ORDER_ROUTING); channel.basicConsume(ORDER_QUEUE+".cw",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String contentType = properties.getContentType(); //System.out.println(contentType); Map<String,String> content = (Map<String,String>)new JSONReader().read(new String(body)); System.out.println("财务系统收到了订单:"+content); getChannel().basicAck(envelope.getDeliveryTag(),false); } }); } catch (IOException e) { e.printStackTrace(); } }
测试:
public static void main(String args[]){ //小明已经下单,产生订单信息 Map<String,String> order = new HashMap<String, String>(); order.put("customerId","12008384"); order.put("goodsId","8030999"); order.put("goods","手机"); order.put("money","2000"); //接收订单进行分发 cashier.acceptOrder(order); //三个系统级消费者并行处理各自的业务 cashier.jf(); cashier.ck(); cashier.cw(); }
结果:
积分系统收到了订单:{id=f897e922-4605-4602-a50e-c0fe863018b7, time=2018-12-06 16:02:05, orderContent={money=2000, goodsId=8030999, customerId=12008384, goods=手机}} 仓库系统收到了订单:{id=f897e922-4605-4602-a50e-c0fe863018b7, time=2018-12-06 16:02:05, orderContent={money=2000, goodsId=8030999, customerId=12008384, goods=手机}} 财务系统收到了订单:{id=f897e922-4605-4602-a50e-c0fe863018b7, time=2018-12-06 16:02:05, orderContent={money=2000, goodsId=8030999, customerId=12008384, goods=手机}}
实例三
公司有一个已经上线的业务系统,主要可以分成两个模块M1,M2,但这个系统可能会出现问题,按CTO要求在出现问题的时候需要及时的处理,否则扣工资。你的上级负责整个系统,需要知道整个系统是否出现问题,你主要负责M1模块,只要此模块不出问题你就不会扣工资。另一位同事小明负责M2。
特点:这就是一个告警通知的实例,不同的就是你的上级需要接收M1和M2两个模块的告警,两位负责人只负责各自模块即可。由此分析可以使用topic类型的交换器。
产生告警后发送出去:
private static final String ALARM_EXCHANGE = "exchange.alarm1"; private static final String ALARM_QUEUE = "queue.alarm1"; public void acceptAlarm(Map<String,String> orderMap,String routing){ Connection connection = ConnectionPool.getConnection(); try { Channel channel = connection.createChannel(); channel.exchangeDeclare(ALARM_EXCHANGE, BuiltinExchangeType.TOPIC,true); channel.queueDeclare(ALARM_QUEUE+".M1",true,false,false,null); channel.queueDeclare(ALARM_QUEUE+".M2",true,false,false,null); channel.queueBind(ALARM_QUEUE+".M1",ALARM_EXCHANGE,"M1.*"); channel.queueBind(ALARM_QUEUE+".M2",ALARM_EXCHANGE,"*.M2"); Map<String,Object> content = new HashMap<String, Object>(); Map<String,String> body = new HashMap<String, String>(); content.put("id", UUID.randomUUID().toString()); content.put("time", Calculator.getCurrTime()); content.put("content",body); body.putAll(orderMap); String str = new JSONWriter().write(content); channel.basicPublish(ALARM_EXCHANGE,routing,getBasicProperties(), str.getBytes("UTF-8")); ConnectionPool.closeChannel(channel); ConnectionPool.closeConnection(connection); } catch (IOException e) { e.printStackTrace(); } }
接收告警信息
public void alarm_M1(){ Connection connection = ConnectionPool.getConnection(); try { Channel channel = connection.createChannel(); channel.exchangeDeclare(ALARM_EXCHANGE, BuiltinExchangeType.TOPIC,true); channel.queueDeclare(ALARM_QUEUE+".M1",true,false,false,null); channel.queueBind(ALARM_QUEUE+".M1",ALARM_EXCHANGE,"M1.*"); channel.basicConsume(ALARM_QUEUE+".M1",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String contentType = properties.getContentType(); //System.out.println(contentType); Map<String,String> content = (Map<String,String>)new JSONReader().read(new String(body)); System.out.println("收到了M1系统的告警并发送给我:"+content); System.out.println("收到了M1系统的告警并发送给领导:"+content); getChannel().basicAck(envelope.getDeliveryTag(),false); } }); } catch (IOException e) { e.printStackTrace(); } } public void alarm_M2(){ Connection connection = ConnectionPool.getConnection(); try { Channel channel = connection.createChannel(); channel.exchangeDeclare(ALARM_EXCHANGE, BuiltinExchangeType.TOPIC,true); channel.queueDeclare(ALARM_QUEUE+".M2",true,false,false,null); channel.queueBind(ALARM_QUEUE+".M2",ALARM_EXCHANGE,"*.M2"); channel.basicConsume(ALARM_QUEUE+".M2",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String contentType = properties.getContentType(); //System.out.println(contentType); Map<String,String> content = (Map<String,String>)new JSONReader().read(new String(body)); System.out.println("收到了M2系统的告警并发送给小明:"+content); System.out.println("收到了M2系统的告警并发送给领导:"+content); getChannel().basicAck(envelope.getDeliveryTag(),false); } }); } catch (IOException e) { e.printStackTrace(); } }
测试,产生告警信息后分发
public static void main(String args[]){ //M1模块出错了 Map<String,String> alarm = new HashMap<String, String>(); alarm.put("errorMsg","M1系统发现错误"); cashier.acceptAlarm(alarm,"M1.error"); //M1和M2都出错了 Map<String,String> alarm2 = new HashMap<String, String>(); alarm2.put("errorMsg","M1和M2系统都发现错误"); cashier.acceptAlarm(alarm2,"M1.M2");//这样,信息会发到M1和M2队列中 cashier.alarm_M1(); cashier.alarm_M2(); }
测试结果,总共发送6条信息,M1中有两条信息发送给自己后也会发送给领导,M2中只有一条信息:
收到了M1系统的告警并发送给我:{id=ed87c923-0302-4d1d-b729-4b061b3bb9e1, time=2018-12-06 16:52:41, content={errorMsg=M1系统发现错误}} 收到了M1系统的告警并发送给领导:{id=ed87c923-0302-4d1d-b729-4b061b3bb9e1, time=2018-12-06 16:52:41, content={errorMsg=M1系统发现错误}} 收到了M1系统的告警并发送给我:{id=e9d0ec24-3f76-492c-8e5d-b1d0e5a7cfc4, time=2018-12-06 16:52:41, content={errorMsg=M1和M2系统都发现错误}} 收到了M1系统的告警并发送给领导:{id=e9d0ec24-3f76-492c-8e5d-b1d0e5a7cfc4, time=2018-12-06 16:52:41, content={errorMsg=M1和M2系统都发现错误}} 收到了M2系统的告警并发送给小明:{id=e9d0ec24-3f76-492c-8e5d-b1d0e5a7cfc4, time=2018-12-06 16:52:41, content={errorMsg=M1和M2系统都发现错误}} 收到了M2系统的告警并发送给领导:{id=e9d0ec24-3f76-492c-8e5d-b1d0e5a7cfc4, time=2018-12-06 16:52:41, content={errorMsg=M1和M2系统都发现错误}}
实现RPC功能
RPC功能常见的有REST API和SOAP等,而这些都有自己的协议。而使用rabbitmq实现则不需要任何协议,rabbit并不关心生产者或消费者是谁,他只负责传递给定的内容即完成它的任务。rabbit会负责绑定来路的消息到达合适的队列。RPC服务器会从这些队列上消费消息。但问题在于AMQP消息是单向的,如何将应答返回给客户端呢??
由rabbit处理RPC服务器和RPC客户端中间,它并不知道是谁发送和消费的消息。rabbit有一个解决办法是使用消息来发回应答。在每个AMQP消息头里有个字段叫replyTo,消息的生产者可以通过此字段确定队列名称,并监听队列等待应答。而replyTo可以使用queue的默认名称(rabbit会产生唯一的ID),因为此队列只是临时应答使用,所以用完就要销毁且是RPC客户端线程的私有队列。
具体实现,RPC服务器
private static final String RPC_EXCHANGE = "exchange.rpc2"; private static final String RPC_QUEUE = "queue.rpc2"; private static final String RPC_ROUTING = "queue.rpc2"; public void rpcServer(){ Connection connection = ConnectionPool.getConnection(); try { Channel channel = connection.createChannel(); channel.exchangeDeclare(RPC_EXCHANGE, BuiltinExchangeType.DIRECT,true); channel.queueDeclare(RPC_QUEUE,true,false,false,null); channel.queueBind(RPC_QUEUE,RPC_EXCHANGE,RPC_ROUTING); //订阅RPC队列,如果有消息就消费 channel.basicConsume(RPC_QUEUE,false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //确认接收 getChannel().basicAck(envelope.getDeliveryTag(),false); //取内容并且包装返回结果 String content = new String(body); Map<String,String> alarm = new HashMap<String, String>(); alarm.put("content",content); alarm.put("id", UUID.randomUUID().toString()); alarm.put("time", Calculator.getCurrTime()); String str = new JSONWriter().write(alarm); System.out.println(properties.getReplyTo()); //交换器为空,表示以队列名称为路由,且把返回的内容放到 客户端传过来的临时队列 getChannel().basicPublish("",properties.getReplyTo(),getBasicProperties(), str.getBytes("UTF-8")); } }); } catch (IOException e) { e.printStackTrace(); } }
客户端
public void rpcClient(String str1){ Connection connection = ConnectionPool.getConnection(); try { Channel channel = connection.createChannel(); channel.exchangeDeclare(RPC_EXCHANGE, BuiltinExchangeType.DIRECT,true); channel.queueDeclare(RPC_QUEUE,true,false,false,null); //重新建立一个临时的队列,队列名称由rabbit分配,并且是私有 自动删除 AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(); channel.queueBind(RPC_QUEUE,RPC_EXCHANGE,RPC_ROUTING); AMQP.BasicProperties bp = new AMQP.BasicProperties.Builder() .contentType(Consts.ContentType.JSON) .contentEncoding("UTF-8") //replyTo记录临时队列的名称,传给服务器 .replyTo(declareOk.getQueue()) .build(); channel.basicPublish(RPC_EXCHANGE,RPC_ROUTING,bp, str1.getBytes("UTF-8")); //订阅临时队列,如果有内容就取出来 channel.basicConsume(declareOk.getQueue(),false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { getChannel().basicAck(envelope.getDeliveryTag(),false); String str = new String(body); System.out.println("返回的内容是:"+consumerTag+str); //如何取消订阅??? getChannel().basicCancel(consumerTag); //完成访问,关闭信道,同时也关闭连接(按需要,可以考虑不关闭AMQP连接Connection) ConnectionPool.closeChannel(getChannel()); ConnectionPool.closeConnection(getChannel().getConnection()); } }); //System.out.println("declareOk:"+declareOk.getQueue()); //GetResponse response = channel.basicGet(declareOk.getQueue(),false); // String str = new String(response.getBody()); //System.out.println("返回的内容是:"+str); } catch (IOException e) { e.printStackTrace(); } }
测试,先启动服务器,然后启动客户端
public static void main(String args[]){ //cashier.rpcServer(); cashier.rpcClient("入参"); }
测试结果:
返回的内容是:amq.ctag-OaoaqOQUAHGyXcqAaPT6lw{"id":"fc243322-30fe-4f1e-be5d-e7cd8406349d","time":"2018-12-06 18:06:13","content":"入参"}
总结
本章主要从实践的角度结合前两章学习的内容进行编码。给出一般接到项目时一般的思路,在编码时如何选择交换器的类型,如何考虑项目的扩展性。时开发前必须要思考的问题。