RocketMQ in Practice and Principles
Chapter 1: Installation, Deployment, and a Simple Application
1. Installation
1.1 Download RocketMQ
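RocketMQ download page: http://rocketmq.apache.org/release_notes/release-notes-4.3.0
Select rocketmq-all-4.3.0-bin-release.zip and download it.
Unzip the archive. Note that the name of the extraction directory must not contain spaces.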
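2. Deployment
The commands, parameters, and code take some effort to explain in detail, so that is left for later. This section lists only the deployment steps and the code to run, which is enough to set up and deploy a simple application.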
2.1 Deploy the name server
Go to the bin directory and run start mqnamesrv.cmd. When the message The Name Server boot success. appears, the name server has started successfully.
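2.2 Deploy the broker
In the bin directory, run start mqbroker.cmd -n 127.0.0.1:9876. When the broker's boot success message appears (The broker[<broker name>, <address>] boot success.), the broker has started successfully.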
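If the startup windows close or scroll past too quickly to read, a quick way to confirm both processes are listening is to probe their TCP ports. The following is a minimal sketch using only the JDK, not part of RocketMQ itself. It assumes the name server listens on 9876 (the port passed with -n above) and the broker on 10911 (its default listen port, which is also the one used by the updateTopic command in 2.3). The class name PortCheck and the method isReachable are hypothetical names chosen for illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports are assumptions taken from the commands in this section.
        System.out.println("name server (9876): " + isReachable("127.0.0.1", 9876, 1000));
        System.out.println("broker (10911):     " + isReachable("127.0.0.1", 10911, 1000));
    }
}
```

A successful connection only shows that something is listening on the port; the boot success messages remain the authoritative sign that the name server and broker started correctly.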
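2.3 Create a topic
In the bin directory, run mqadmin.cmd updateTopic -n 127.0.0.1:9876 -b 127.0.0.1:10911 -t TopicTest. The -n option takes the name server address including its port, the same 127.0.0.1:9876 used when starting the broker. When the output reports create topic to 127.0.0.1:10911 success., the topic has been created.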
3. A Simple Application
3.1 Run the Producer
Run the following program; it sends 100 messages.
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * @Author: Wen-Xueliang
 * @Date: Created in 2019/12/14 14:04
 * @Description: Reliable synchronous transmission is used in a wide range of scenarios,
 * such as important notification messages, SMS notifications, and SMS marketing systems.
 */
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        // Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // Send the message synchronously to one of the brokers and wait for the result.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}
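A message body in RocketMQ is a plain byte array, so the producer above converts each string with getBytes(RemotingHelper.DEFAULT_CHARSET), where that constant is the charset name "UTF-8". Because getBytes(String) declares a checked UnsupportedEncodingException, the producer's main method relies on throws Exception. The sketch below shows the same payload construction with java.nio.charset.StandardCharsets.UTF_8, which avoids the checked exception. It uses only the JDK and does not talk to a broker; PayloadSketch and buildBody are hypothetical names used for illustration.

```java
import java.nio.charset.StandardCharsets;

final class PayloadSketch {

    /** Builds the body bytes for the i-th message, mirroring the producer loop. */
    static byte[] buildBody(int i) {
        return ("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            byte[] body = buildBody(i);
            // Decode again to show the bytes round-trip to the original text.
            String text = new String(body, StandardCharsets.UTF_8);
            System.out.printf("%d bytes -> %s%n", body.length, text);
        }
    }
}
```

In the real producer, the array returned by a helper like buildBody would be passed as the third argument of the Message constructor.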
3.2 Run the Consumer
Run the following program; it receives the 100 messages.
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author: Wen-Xueliang
 * @Date: Created in 2019/12/14 14:05
 * @Description: Push consumer that prints the messages it receives from TopicTest.
 */
public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");
        // Subscribe to one or more topics to consume; "*" matches every tag.
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch the consumer instance.
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
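The listener above prints the whole msgs list, which relies on MessageExt.toString() and does not show each body as readable text. To see the original "Hello RocketMQ n" strings, the byte array returned by getBody() has to be decoded with the same charset the producer used (UTF-8). The following is a minimal JDK-only sketch of that decoding step. It works on plain byte arrays as stand-ins for message bodies, and BodyDecodeSketch and decodeBodies are hypothetical names used for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BodyDecodeSketch {

    /** Decodes each raw message body as UTF-8 text. */
    static List<String> decodeBodies(List<byte[]> bodies) {
        List<String> texts = new ArrayList<>();
        for (byte[] body : bodies) {
            texts.add(new String(body, StandardCharsets.UTF_8));
        }
        return texts;
    }

    public static void main(String[] args) {
        // Stand-ins for the byte arrays that MessageExt.getBody() would return.
        List<byte[]> bodies = Arrays.asList(
            "Hello RocketMQ 0".getBytes(StandardCharsets.UTF_8),
            "Hello RocketMQ 1".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodeBodies(bodies));
    }
}
```

Inside consumeMessage, the same idea would amount to looping over msgs and calling new String(msg.getBody(), StandardCharsets.UTF_8) for each message before printing.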
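At this point, we have installed and deployed RocketMQ on Windows and run two programs: the first, a producer that sends 100 messages, and the second, a consumer that consumes those 100 messages.
Later chapters will cover advanced uses of RocketMQ and an analysis of its source code.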