1)、RabbitMQ学习
一、简历:
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
二、何谓队列?
队列是一个存储、组织数据的数据结构,其最大的特性就是FIFO(先进先出),rabbitmq中queue是Rabbitmq的内部对象,用于存储消息。
三、何为消息队列?(MQ)
- 服务之间最常见的通信方式是直接调用彼此来通信,消息从一端发出后立即就可以到达另一端,称为即时消息通讯(同步通信);
- 消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端,称为延迟消息通讯(异步通信);
当然,容器的一个具体实现就是MQ;
四、消息队列的四大好处:
**1、解耦;**每个成员不必受其他成员的影响,可以更独立自主的做自己的事情,只通过一个简单的容器来联系;
**2、提速;**在对于不使用消息队列的时候,我们来想象一个场景:比如下订单,下订单后的操作有:减库存、减金额、发送短信等等微服务操作,如果这些都靠server来发送,减库存100ms、减金额100ms、发送短信100ms,那么总体花费就是100+100+100=300ms,如果你使用多线程的方式也是接近200ms。如果使用消息队列,那么我们就不用管它多久发送达到,只需要插入到队列中,让他自己去执行就可以了,主程序只需要50ms即可。
**3、广播;**有利于一次性给多个微服务发送消息message。
**4、削峰;**比如在秒杀的时候,同时1w个请求来请求服务器,如果全部打在服务器和数据库上,服务器基本上是马上GG,这时候我们就可以引入消息队列,设置消息队列的大小,比如秒杀的件只有100件,那么设置消息队列的大小为100,其他没有入队列的全部返回未秒杀成功,这样就成功的削峰完毕。
五、何为AMQP?
1、介绍:
一个提供统一消息服务的应用层标准高级消息队列协议,是一个通用的应用层协议。消息发送与接收的双发遵守这个协议就可以实现异步通讯。
2、原理:
- Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker。
- Virtual host:出于多租户和安全设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ Server提供服务的时候,就可以划分出多个vhost,来对每一个用户创建自己的exchange/Queue。
- Connection:publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或者broker服务出现问题。并且在一个TCP连接中也可以开通多个信道,进行流量均衡输送。
- Channel: 如果每一次访问RabbitMQ都建立一个connection,那么在消息量较大的时候建立TCP connection的开销将会是巨大的,并且效率也是极低的。Channel是在Connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method 包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
- Queue:消息最终被送到这里等待consumer取走,一个message可以被同时拷贝到多个queue中。
- Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。类似于计算机网络中的路由器机制。
3、典型的“生成/消费”消息模型:
-
生产者发送消息到broker server(RabbitMQ)。在Broker内部,用户创建Exchange和Queue,然后通过binding机制将两者联系起来。Exchange分发消息,根据类型 / binding的不同分发策略选择正确的queue队列。
-
消费者Consumer从queue中拿取消息。
4、Exchange类型
Exchange类型有:Direct、Fanout、Topic三种类型。
-
Direct类型:点对点
-
Fanout类型:广播
-
Topic类型:通配符匹配规则
#:代表0个或多个单词、字符
*:代表1个或多个单词、字符
2)、SpringBoot集成RabbitMQ:
1、打开上一次的mybatis-springboot环境:
2、添加pow.xml依赖:
<!--ampq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
3、配置application.properties:
# 配置rabbitmq
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.addresses=47.97.192.241
4、在Test中进行测试RabbitMQ:
rabbitmq在springboot中也存在自动配置类,自动配置类可以读取配置文件中的配置进行自动配置。
而且我们可以看到 这个配置类 其实是一个懒加载机制:
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
该配置判断:在容器中是否存在RabbitTemplate模版,如果存在才进行自动配置,如果没有使用RabbitTemplate就不进行自动配置。
并且我们找到rabbitTemplate这个函数,可以看到配置过程,配置文件是作为参数传入的:properties;
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean(RabbitOperations.class)
public RabbitTemplate rabbitTemplate(RabbitProperties properties,
ObjectProvider<MessageConverter> messageConverter,
ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers,
ConnectionFactory connectionFactory) {
PropertyMapper map = PropertyMapper.get();
RabbitTemplate template = new RabbitTemplate(connectionFactory);
messageConverter.ifUnique(template::setMessageConverter);
template.setMandatory(determineMandatoryFlag(properties));
RabbitProperties.Template templateProperties = properties.getTemplate();
if (templateProperties.getRetry().isEnabled()) {
template.setRetryTemplate(
new RetryTemplateFactory(retryTemplateCustomizers.orderedStream().collect(Collectors.toList()))
.createRetryTemplate(templateProperties.getRetry(),
RabbitRetryTemplateCustomizer.Target.SENDER));
}
map.from(templateProperties::getReceiveTimeout).whenNonNull().as(Duration::toMillis)
.to(template::setReceiveTimeout);
map.from(templateProperties::getReplyTimeout).whenNonNull().as(Duration::toMillis)
.to(template::setReplyTimeout);
map.from(templateProperties::getExchange).to(template::setExchange);
map.from(templateProperties::getRoutingKey).to(template::setRoutingKey);
map.from(templateProperties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
return template;
}
我们这就来看一下properties这个配置文件类:
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
/**
* RabbitMQ host.
*/
private String host = "localhost";
/**
* RabbitMQ port.
*/
private int port = 5672;
/**
* Login user to authenticate to the broker.
*/
private String username = "guest";
/**
* Login to authenticate against the broker.
*/
private String password = "guest";
/**
* SSL configuration.
*/
private final Ssl ssl = new Ssl();
/**
* Virtual host to use when connecting to the broker.
*/
private String virtualHost;
/**
* Comma-separated list of addresses to which the client should connect.
*/
private String addresses;
我们从这个配置类可以得到:
如果我们没有配置host的时候默认为本机localhost;
如果我们没有配置username和password都是guest以及我们可以配置的一些其他属性:虚拟地址virtualHost,服务端地址addresses;
我们并且可以通过调试得到:通过自动配置后,已经生成好了exchange和routingKey为空串的template对象:
配置完成后,再进入我们的test方法,设置exchange和routingKey还有message进行消息转发。
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void test2() {
// 接收数据
Object o=rabbitTemplate.receiveAndConvert("ogj1");
assert o != null;
System.out.println(o.getClass());
System.out.println(o);
}
5、运行成功:
我们来看看在rabbitmq服务端中是否存在这个消息在queue中:
打开ip:15672,登陆:
我们发现,是序列化的形式,原因是rabbitmq自身使用的是jdk自带的序列化机制,所以结果是一段不明代码。
6、我们来用springboot 接收一下rabbitmq中的消息:
@Test
public void test2() {
// 接收数据
Object o=rabbitTemplate.receiveAndConvert("ogj1");
assert o != null;
System.out.println(o.getClass());
System.out.println(o);
}
运行结果:
成功接收到该queue中的消息。
问题:我们怎么来自定义序列化? 当然是自定义config类啦!
我们 来看一下message的转换类:
public interface MessageConverter {
Message toMessage(Object var1, MessageProperties var2) throws MessageConversionException;
default Message toMessage(Object object, MessageProperties messageProperties, @Nullable Type genericType) throws MessageConversionException {
return this.toMessage(object, messageProperties);
}
Object fromMessage(Message var1) throws MessageConversionException;
}
rabbitmq默认使用的是jdk序列模式,我们可以从MessageConvert中选择自定义为JSON。
我们找到这个Json实现类,那么我们下一步就是来自定义Config:
@Configuration
public class MyAmqConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
我们就可以看到这个是一个json序列化对象了。
3)、RabittMQ的监听机制:
使用注解:EnableRabbit+RabbitListener
@EnableRabbit //开启基于注解的rabbitmq
@SpringBootApplication
public class DemoMybatisApplication {
public static void main(String[] args) {
SpringApplication.run(DemoMybatisApplication.class, args);
}
}
/**
* 监听rabbitmq队列中的消息,只要这个队列中有message进入,那么就一定会收到这个消息
* @param user
*/
@RabbitListener(queues = "ogj1")
public void receiveMessage(User user){
System.out.println(user.toString());
}
发送消息到queue中:
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void test1() {
// 点对点的发送:direct
Map<String,Object> map=new HashMap<String, Object> ();
map.put("msg","这是一个rabbitmq message");
map.put("data", Arrays.asList("hello world",123,true));
User user = new User();
user.setUsername("欧光继");
user.setPasswd("123456");
user.setAddress("重庆市");
user.setId("1");
user.setPhone("123456789");
user.setYoubian("402460");
// 对象被默认的jdk序列化形式发送出去
rabbitTemplate.convertAndSend("exchange.direct","ogj1", user);
System.out.println("发送成功!");
}
测试:
首先运行主程序。然后运行test单元测试程序,进行发送消息到queue。
结果:
并且只要队列有消息,那么就会马上收到该消息并打印。
4)、AmqpAdmin对于RabbitMQ的管理:
我们也可以使用AmqpAdmin来在程序中对rabbitmq进行管理操作。
/**
* 使用amqpAdmin管理rabbitmq
*/
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createExchange() {
//创建一个Exchange
amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
}
@Test
public void createQueue() {
//创建一个队列
amqpAdmin.declareQueue(new Queue("amqpadmin.quque",true));
}
@Test
public void binding() {
//创建绑定规则
amqpAdmin.declareBinding(new Binding("amqpadmin.queue",
Binding.DestinationType.QUEUE,"amqpadmin.exchange",
"amqp.haha",null));
}