13-1mq

什么是消息中间件?

消息中间件(MQ)的定义

其实并没有标准定义。一般认为,消息中间件属于分布式系统中一个子系统,关注于数据的发送和接收,利用高效可靠的异步消息传递机制对分布式系统中的其余各个子系统进行集成。  

rpc本质是同步调用,调用方要等被调用方的响应才可以  强依赖

为什么要用消息中间件?

假设一个电商交易的场景,用户下单之后调用库存系统减库存,然后需要调用物流系统进行发货,如果交易、库存、物流是属于一个系统的,那么就是接口调用。但是随着系统的发展,各个模块越来越庞大、业务逻辑越来越复杂,必然是要做服务化和业务拆分的。这个时候就需要考虑这些系统之间如何交互,第一反应就是RPC(Remote Procedure Call)。系统继续发展,可能一笔交易后续需要调用几十个接口来执行业务,比如还有风控系统、短信服务等等。这个时候就需要消息中间件登场来解决问题了。

所以消息中间件主要解决分布式系统之间消息的传递,同时为分布式系统中其他子系统提供了伸缩性和扩展性。为系统带来了:

低耦合,不管是程序还是模块之间,使用消息中间件进行间接通信。

异步通信能力,使得子系统之间得以充分执行自己的逻辑而无需等待。

缓冲能力,消息中间件像是一个巨大的蓄水池,将高峰期大量的请求存储下来慢慢交给后台进行处理,对于秒杀业务来说尤为重要。

名称解释:

伸缩性,是指通过不断向集群中加入服务器的手段来缓解不断上升的用户并发访问压力和不断增长的数据存储需求。就像弹簧一样挂东西一样,用户多,伸一点,用户少,浅一点,啊,不对,缩一点。是伸缩,不是深浅。衡量架构是否高伸缩性的主要标准就是是否可用多台服务器构建集群,是否容易向集群中添加新的服务器。加入新的服务器后是否可以提供和原来服务器无差别的服务。集群中可容纳的总的服务器数量是否有限制。

扩展性,主要标准就是在网站增加新的业务产品时,是否可以实现对现有产品透明无影响,不需要任何改动或者很少改动既有业务功能就可以上线新产品。比如用户购买电影票的应用,现在我们要增加一个功能,用户买了铁血战士的票后,随机抽取用户送异形的限量周边。怎么做到不改动用户购票功能的基础上增加这个功能。熟悉设计模式的同学,应该很眼熟,这是设计模式中的开闭原则(对扩展开放,对修改关闭)在架构层面的一个原则。

和RPC有何区别?

RPC和消息中间件的场景的差异很大程度上在于就是“依赖性”和“同步性”。

比如短信通知服务并不是事交易环节必须的,并不影响下单流程,不是强依赖,所以交易系统不应该依赖短信服务。比如一些数据分析程序可能需要在拿到一天的总销售量,这个就只需要销售中心提供接口在需要时调用即可。

消息中间件出现以后对于交易场景可能是调用库存中心等强依赖系统执行业务,之后发布一条消息(这条消息存储于消息中间件中)。像是短信通知服务、数据统计服务等等都是依赖于消息中间件去消费这条消息来完成自己的业务逻辑。

RPC方式是典型的同步方式,让远程调用像本地调用。消息中间件方式属于异步方式。消息队列是系统级、模块级的通信。RPC是对象级、函数级通信。

相同点:都是分布式下面的通信方式。

消息中间件有些什么使用场景?

 

 

异步处理

场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种1.串行的方式;2.并行方式。

串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。

(2)并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。

假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。

小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?

引入消息队列,将不是必须的业务逻辑,异步处理。

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。

应用解耦

场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。

传统模式的缺点:

1)  假如库存系统无法访问,则订单减库存将失败,从而导致订单失败;

2)  订单系统与库存系统耦合;

如何解决以上问题呢?引入应用消息队列后的方案

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。

假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。

流量削峰

流量削峰也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。

应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列:可以控制活动的人数;可以缓解短时间内高流量压垮应用。

用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面;秒杀业务根据消息队列中的请求信息,再做后续处理。

日志处理

日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下:

日志采集客户端,负责日志数据采集,定时写入Kafka队列:Kafka消息队列,负责日志数据的接收,存储和转发;日志处理应用:订阅并消费kafka队列中的日志数据;

消息通讯

消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。

点对点通讯:客户端A和客户端B使用同一队列,进行消息通讯。

聊天室通讯:客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。

消息中间件的编年史

卡夫卡与法国作家马塞尔·普鲁斯特,爱尔兰作家詹姆斯·乔伊斯并称为西方现代主义文学的先驱和大师。《变形记》是卡夫卡的短篇代表作,是卡夫卡的艺术成就中的一座高峰,被认为是20世纪最伟大的小说作品之一。

常见的消息中间件比较

如果一般的业务系统要引入MQ,怎么选型:

用户访问量在ActiveMQ的可承受范围内,而且确实主要是基于解耦和异步来用的,可以考虑ActiveMQ,也比较贴近Java工程师的使用习惯。 

RabbitMQ,但是确实erlang语言阻止了我们去深入研究和掌控,对公司而言,几乎处于不可控的状态,但是确实是开源的,有比较稳定的支持,活跃度也高。

对自己公司技术实力有绝对自信的,可以用RocketMQ 。

所以中小型公司,技术实力较为一般,技术挑战不是特别高,用ActiveMQ、RabbitMQ是不错的选择;大型公司,基础架构研发实力较强,用RocketMQ是很好的选择

如果是大数据领域的实时计算、日志采集等场景,用Kafka是业内标准的,绝对没问题,社区活跃度很高,几乎是全世界这个领域的事实性规范。

JMS和ActiveMQ

JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,实际上是一套api,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,ActiveMQ而是这个规范的一个具体实现。

JMS规范

JMS对象模型

1)连接工厂。连接工厂负责创建一个JMS连接。

2)JMS连接。JMS连接(Connection)表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。

3)JMS会话。JMS会话(Session)表示JMS客户与JMS服务器之间的会话状态。JMS会话建立在JMS连接上,表示客户与服务器之间的一个会话线程。

4)JMS目的/ Broker。客户用来指定它生产的消息的目标和它消费的消息的来源的对象,一个消息中间件的实例。

5)JMS生产者和消费者。生产者(Message Producer)和消费者(Message Consumer)对象由Session对象创建,用于发送和接收消息。

消息的消费可以采用以下两种方法之一:

· 同步消费。通过调用 消费者的receive 方法从目的地中显式提取消息。receive 方法可以一直阻塞到消息到达。

· 异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。

JMS规范中的消息

JMS 消息由以下三部分组成:

· 消息头。每个消息头字段都有相应的getter 和setter 方法。

· 消息属性。如果需要除消息头字段以外的值,那么可以使用消息属性。

· 消息体。JMS 定义的消息类型有TextMessage、MapMessage、BytesMessage、

StreamMessage 和 ObjectMessage。ActiveMQ也有对应的实现。

JMS消息模型

Point-to-Point(P2P)  / 点对点

消息通过称为队列的一个虚拟通道来进行交换。队列是生产者发送消息的目的地和接受者消费消息的消息源。

每条消息通仅会传送给一个接受者。可能会有多个接受者在一个队列中侦听,但是每个队列中的消息只能被队列中的一个接受者消费。

消息存在先后顺序。一个队列会按照消息服务器将消息放入队列中的顺序,把它们传送给消费者当消息已被消费时,就会从队列头部将它们删除。

每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)

发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列

接收者在成功接收消息之后需向队列应答成功

如果希望发送的每个消息都应该被成功处理的话,使用这个P2P模式。

,生产者发送的消息一定会有消费者消费,且只有一个消费者消费 用点对点模式

Topic/ 主题(发布订阅(Pub/Sub) )

1、消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。

3、如果你希望发送的消息可以不被做任何处理、或者被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用topic模型

可以不被处理或者可以被多个消费者消费,用主题模式

ActiveMQ

ActiveMQ安装、部署和运行

下载 Windows版 ActiveMQ,解压,运行bin目录下的activemq.bat即可。Linux下操作类似(进入bin目录运行./activemq start启动,./activemq stop关闭)。

下载地址:http://activemq.apache.org/activemq-580-release.html

运行后在浏览器中访问http://127.0.0.1:8161/admin,即可看到ActiveMQ的管理控制台

ActiveMQ中,61616为服务端口,8161为管理控制台端口。

使用ActiveMQ

原生API

Maven配置

connection.createSession参数解释:

第一个参数是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认,没有回应则抛出异常,消息发送程序负责处理这个错误。

第二个参数消息的确认模式:

AUTO_ACKNOWLEDGE : 指定消息接收者在每次收到消息时自动发送确认。消息只向目标发送一次,但传输过程中可能因为错误而丢失消息。

CLIENT_ACKNOWLEDGE : 由消息接收者确认收到消息,通过调用消息的acknowledge()方法(会通知消息提供者收到了消息)

DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息(这种确认模式不在乎接收者收到重复的消息)。

消费者的接收方式:同步和异步

和Spring的结合

1、增加Maven配置

2、生产者在spring的配置中增加ActiveMQ相关配置,包括命名空间

3、生产者在代码中编写发送逻辑,可以topic模式,也可queue模式

4、消费者在spring的配置中增加ActiveMQ相关配置,包括命名空间

5、消费者在代码中编写接收逻辑,可以topic模式,也可queue模式

详情见代码,模块名,生产者:with-spring,消费者:with-spring-consumer

=======================

1 生产者有两种模式:点对点和订阅者(在xml中配置),后写java代码用于发送消息(代码中指定发送给哪个队列)

2 生产者消费有两种模式:topic和queue(在xml中配置,同时指定了队列的名称,和接受消息的类),后编写代码用于接受消息的类

先启动消费者,后调用controller层,调用发送消息的代码

controller :指定

生产者:

1 applicationContext.xml:

2 写代码发送:

消费者:

1 xml;

两张监听方式:topic 和 queue

2代码实现监听

 topic:

 queue:

-------------------

和Springboot的结合

和Spring基本没什么差别,参见reply包下的代码。运行测试程序,在测试目录test下。

1依赖

2配置:

3代码 生产者:

消费者:

 

注意:

queue和topic不同的地方:

ActiveMQ实战之一    用户注册的异步处理

参见模块asyncApp,注意观察:

  1. 串行实现
  2. 并行实现
  3. 使用MQ进行实现
  4. 给MQ实现,增加短信服务的校验码应答给生产者处理
  5. 系统扩展,需要给数据中心和客户服务也发送消息

ActiveMQ高级特性和用法

嵌入式ActiveMQ

在自己的应用程序中嵌入一个消息队列。见代码模块 no-spring包embed下。

消息存储的持久化

ActiveMQ的另一个问题就是只要是软件就有可能挂掉,挂掉不可怕,怕的是挂掉之后把信息给丢了,怎么办,可以进行消息的持久化,ActiveMQ提供了几种持久化方式:

  1. AMQ消息存储-基于文件的存储方式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小为32M,如果一条消息的大小超过了32M,那么这个值必须设置大一点。当一个存储文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本。
  2. KahaDB消息存储-提供了容量的提升和恢复能力,是现在的默认存储方式;KahaDB是基于文件的本地数据库储存形式,虽然没有AMQ的速度快,但是它具有强扩展性,恢复的时间比AMQ短,从5.4版本之后KahaDB做为默认的持久化方式。
  3. JDBC消息存储-消息基于JDBC存储的;
  4. Memory消息存储-基于内存的消息存储,由于内存不属于持久化范畴。所以内存存储不在讨论范围内。

KahaDB

由于KahaDB是默认的持久化存储方案。所以即使你不配置任何的KahaDB参数信息,ActiveMQ也会启动KahaDB。这种情况下,KahaDB文件所在位置是你的ActiveMQ安装路径下的/data/KahaDB子目录。

关系型数据库存储方案

从ActiveMQ 4+版本开始,ActiveMQ就支持使用关系型数据库进行持久化存储——通过JDBC实现的数据库连接。可以使用的关系型数据库囊括了目前市面的主流数据库。

使用JDBC的方式持久化

1、修改配置文件conf/activemq.xml:

将其中的这段配置:

<persistenceAdapter>

<kahaDB directory="${activemq.base}/data/kahadb"/>

</persistenceAdapter>

修改为为:

<persistenceAdapter>

       <jdbcPersistenceAdapter  dataSource="#mysql-ds "/>

</persistenceAdapter>

2、然后在</broker>标签后,增加数据源的配置:

    <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">

        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>

        <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC"/>

        <property name="username" value="root"/>

        <property name="password" value="123456"/>

        <property name="poolPreparedStatements" value="true"/>

    </bean>

其中?relaxAutoCommit=true必须有,其他的属性根据数据库的配置自行决定。

3、将mysql-connector-java-5.1.34-bin.jar(版本可以自行选择)放到ActiveMQ的/ lib目录下。

4、在Mysql数据库中增加在连接字符串中设置的数据库名activemq

5、运行后,会发现在库中增加了3个表

activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存,主要数据库字段如下:

container:消息的destination

sub_dest:如果是使用static集群,这个字段会有集群其他系统的信息

client_id:每个订阅者都必须有一个唯一的客户端id用以区分

sub_name:订阅者名称

selector:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性and和or操作

last_acked_id:记录消费过的消息的id

activemq_lock:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker。

activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中。主要的数据库字段如下:

id:自增的数据库主键

container:消息的destination

msgid_prod:消息发送者客户端的主键

msg_seq:是发送消息的顺序,msgid_prod+msg_seq可以组成jms的messageid

expiration:消息的过期时间,存储的是从1970-01-01到现在的毫秒数

msg:消息本体的java序列化对象的二进制数据

priority:优先级,从0-9,数值越大优先级越高

猜你喜欢

转载自blog.csdn.net/xiaoxiaoniaoQ/article/details/87001067
MQ1
MQ