----发消息-同步\异步\单向\顺序 完整流程

发消息前的准备

关闭运行的防火墙

systemctl stop firewalld.service

这一项不执行,会出现网络连接不上 

MQClientException: No route info of this topic, xxxxx

关闭namesrv服务:


sh bin/mqshutdown namesrv

关闭broker服务 :


sh bin/mqshutdown broker

启动每个服务器的nameserver

nohup sh bin/mqnamesrv &    
tail -f nohup.out

启动broker

# nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true  &       单broker启动

nohup sh bin/mqbroker -c /opt/rocketmq-all-4.3.0-bin-release/conf/2m-2s-sync/broker-a.properties &   集群broker启动
tail -f ~/logs/rocketmqlogs/broker.log

发消息方式类别

同步:顺序执行 用于发送重要的消息

异步:在回调中获取返回的信息 用户对速度要求高的场景

单向:不需要获取返回的信息  对消息发送是否成功不太关注,比如计入日志

顺序:发送的时候,同一个条件的消息发送到一个队列中,接收的时候,用同一个线程去接收一个队列的信息.

同步

producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY"); 

这句不加,会出现:

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException:sendDefaultImpl call timeout
broker连接不上

延迟发送

延迟接收

异步

单向

顺序

生产者

消费者

延时

并发

        List<Message> msgs = new ArrayList<Message>();
        Message msg1 = new Message("BaseTopic","Tag1",("Hello World"+1).getBytes());
                Message msg2 = new Message("BaseTopic","Tag1",("Hello World"+2).getBytes());
                Message msg3 = new Message("BaseTopic","Tag1",("Hello World"+3).getBytes());
                msgs.add(msg1);
                msgs.add(msg2);
                msgs.add(msg3);
          SendResult result = producer.send(msgs);

消费_根据sql

事务发送

消费

 
发布了73 篇原创文章 · 获赞 1 · 访问量 1万+

猜你喜欢

转载自blog.csdn.net/wenxi2367/article/details/104448525