rocketMQ简介
为什么使用rocketmq
系统的耦合性越高,容错性就越低。以电商为例,用户创建完订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个系统出现故障或者因为升级等原因暂时不可用,都回造成下单的异常,影响用户的体验。
流量削峰
应用系统如果遇到系统请求流量瞬间猛增,有可能会将系统压垮。如果有消息队列,遇到此情况,可以将大量请求存储起来,将一瞬间的峰值请求分散到一段时间进行处理,这样可以大大提高系统的稳定性
异步
用户调用一个接口的时候,可能该接口调用了别的方法。例如:用户注册的时候,后台可能需要调用:查询数据库,插入数据库,发送邮件等等…
但是用户可能并不需要后台将所有的任务执行完毕,那么此时在初入数据口后面加入MQ,用户就能很快得到注册成功的响应而去做一些别的事情。mq的机制又能保证最终的一致性,所以使用起来很安全很稳定。
在这里插入代码片
概述
RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
Apache Alibaba RocketMQ 是一个消息中间件。消息中间件中有两个角色:消息生产者和消息消费者。RocketMQ 里同样有这两个概念,消息生产者负责创建消息并发送到 RocketMQ 服务器,RocketMQ 服务器会将消息持久化到磁盘,消息消费者从 RocketMQ 服务器拉取消息并提交给应用消费。
应用场景
滴滴打车 司机 乘客
乘客下单(消息的提供者),司机接单(消息的消费者) 点对点
公众号:
公众号发布了一条内容,只要订阅了该公众号的人都是可以收到消息的
发布-订阅模式
王者荣耀更新通知:
不像公众号一样,公众号是必须要关注,但是广播模式是不分场景的
广播模式
分裂…
消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:理解消息队列使用
- 削峰填谷: 主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题
- 异步处理(在注册的同时,异步去处理短信的发送与邮件的发送)
- 系统解耦: 解决不同重要程度、不同能力级别系统之间依赖导致一死全死
- 提升性能: 当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统
- 蓄流压测: 线上有些链路不好压测,可以通过堆积一定量消息再放开来压测
RocketMQ特点
RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:
- 支持严格的消息顺序
- 支持 Topic 与 Queue 两种模式
- 亿级消息堆积能力
- 比较友好的分布式特性
- 同时支持 Push 与 Pull 方式消费消息
- 历经多次天猫双十一海量消息考验
RocketMQ 优势
目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,其主要优势有:
- 支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)
- 支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)
- 支持 18 个级别的延迟消息(RabbitMQ 和 Kafka 不支持)
- 支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)
- 支持 Consumer 端 Tag 过滤,减少不必要的网络传输(RabbitMQ 和 Kafka 不支持)
- 支持重复消费(RabbitMQ 不支持,Kafka 支持)
性能表现在哪
1.吞吐量
2.时效性,多少时间内能够处理的请求数
3.消息的可靠性
使用RocketMQ
打开官网版本说明找到相对应的版本下载安装包
下载linux安装包,我下载的是4.7.1这个感觉稳定性比较好点
使用虚拟机连接xshell和xftp
1.1打开虚拟机
1.2连接xshell和xftp
1.3把下载的linux包传入xftp放在home文件目录下
安装rocketmq
jdk 1.8
1解压压缩包
进入到xshell中
cd /home
ls 查看rocket压缩包
使用 unzip rocketmq-all-4.7.1-bin-release.zip 解压
解压文件的时候说没有zip的命令
安装命令: yum install zip #提示输入时,请输入y;
安装命令:yum install unzip #提示输入时,请输入y;
cd rocketmq-all-4.7.1-bin-release
cd bin/
ls
2.启动 NameServer
nohup ./bin/mqnamesrv &
查看进程
netstat -ntlp 看到有9876端口就说明启动成功了
3.检测是否启动
netstat -an | grep 9876
4启动broker
启动之前需要编辑配置文件,修改 JVM 内存设置,默认给的内存很大,超过我们的 JVM 了。
cd bin/
vi runserver.sh
xmn 新生代的内存大小
xmx 堆区最大的内存大小
xms 堆区的初始值大小
cd bin/
vi runbroker.sh 跟上面做相应的处理
启动 Broker
nohup ./mqbroker -n localhost:9876 &
查看进程
netstat -ntlp
两个端口启动就成功了
查看日志
tail -f ~/logs/rocketmqlogs/broker.log
5.测试 RocketMQ
消息发送
cd bin/
export NAMESRV_ADDR=localhost:9876
./tools.sh org.apache.rocketmq.example.quickstart.Producer
消息接收
cd bin/
export NAMESRV_ADDR=localhost:9876
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
6.关闭RocketMQ
./bin/mqshutdown broker
./bin/mqshutdown namesrv
安装RocketMQ控制台可视化界面
1.1下载地址
1.2修改配置 打开rocketmq-console 进入src 找到resources找到applicaion.properties
mvn -v 查看有没有配置maven环境变量
没有上图一样显示的话,去配置maven环境变量
找到maven文件夹 -打开bin复制文件路径
打开此电脑-右击属性-高级系统设置-环境变量
2.打包,需要下载一下jar包
mvn clean package -Dmaven.test.skip=true
3.打包完成后,进入 target 启动 jar
启动jar包
如果没有在applicaion中配置端口号和namesrvAddr
可以加上这段话--server.port=9877 --rocketmq.config.namesrvAddr=192.168.106.130:9876
配置的话不用加上这一句 --server.port=9877 --rocketmq.config.namesrvAddr=192.168.106.130:9876 直接java -jar
java -jar rocketmq-console-ng-1.0.0.jar --server.port=9877 --rocketmq.config.namesrvAddr=192.168.106.130:9876
4.打开浏览器访问9877
如果报错,一直在转,没有加载成功
会出现一个跨域问题这是因为我们的 RocketMQ 安装在 Linux 中,控制台在 windows,Linux 需要开放端口才能访问,开放 10909 和 9876 端口
复制下面这几句话去xshell中执行
开放端口
firewall-cmd --zone=public --add-port=10909/tcp --permanent
firewall-cmd --zone=public --add-port=9876/tcp --permanent
firewall-cmd --zone=public --add-port=10911/tcp --permanent
systemctl restart firewalld.service
firewall-cmd --reload
重新启动项目访问9877,应该能进来了
Java 实现消息发送
我在我原有的微服务项目上进行操作
创建一个模块maven项目 java-mq-provider消息生产者
1.导入pom依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
2.生产消息
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class MsgProvider {
public static void main(String[] args) throws Exception {
//创建消息生产者
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//设置NameServer
producer.setNamesrvAddr("192.168.106.130:9876");
//启动生产者
producer.start();
//构建消息对象 主题 标签 内容
Message message = new Message("myTopic","myTag",("Test MQ").getBytes());
//发送消息
SendResult result = producer.send(message, 6000);
System.out.println(result);
//关闭生产者
producer.shutdown();
}
}
启动
访问9877查看生产者提供的消息
//根据构建消息对象 的主题my Topic 搜索看到一条生产者信息
点击内容
Java 实现消息消费
创建一个模块maven项目 java-mq-consumer消息消费者
1.pom.xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
@Slf4j
public class MsgConsumer {
public static void main(String[] args) throws MQClientException {
//创建消息消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
//设置NameServer
consumer.setNamesrvAddr("192.168.106.130:9876");
//指定订阅的主题和标签
consumer.subscribe("myTopic","*");
//回调函数 创建了一个消息监听器,一直监听消息队列中的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
log.info("Message=>{}",new String(list.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
}
}
启动运行,这里看到消息内容我们都已经拿到了
再去访问9877进入消息页面,消费者已经消费完成了
springboot整合rocketMQ
provider
1.创建springboot项目模块boot-mq-provider生产者
一直下一步不需要选择依赖
2.导入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
3.配置yml
rocketmq:
name-server: 192.168.248.129:9876
producer:
group: myprovider
server.port=8080
4.创建订单类
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
private Integer id;
private String buyerName;
private String buyerTel;
private String address;
private Date createDate;
}
5.创建处理器
@RestController
public class ProviderController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/sendMsg")
public Order sendMsg(){
Order order = new Order(
1,
"阿松大",
"123123",
"软件园",
new Date()
);
this.rocketMQTemplate.convertAndSend("orderTopic",order);
return order;
}
}
运行项目访问sendMsg
consumer
1.创建springboot项目模块boot-mq-consumer消费者
2.导入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
3.配置yml
rocketmq:
name-server: 192.168.248.129:9876
server.port=8081
4.创建service,把provider订单实体类放一份到consumer
@Slf4j
@Service //消息监听 consumerGroup :消费组名称,topic=主题
@RocketMQMessageListener(consumerGroup = "myConsumer",topic = "orderTopic")
public class ConsumerService implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("新订单{},发短信",order);
//如果能拿到新的订单,那就根据订单id生成一个减库存的操作,无非就是操作数据库
System.out.println("需要执行减库存的操作.....");
}
}
}
启动项目,看到已经得到订单信息
消息也已经消费成功了
加群领取资料:Java全栈交流群:1135453115各种技术学习