1.前言
项目中用到阿里的rocketmq,本以为已经和springboot集成好了,找了一圈也没找到,只好自己写一个凑合先用用。
2.win10下rocketmq下载安装
地址:https://github.com/alibaba/RocketMQ/releases
可能已经又更新了,我用的3.5.8,解压后里边有个install.bat,双击运行(这部分可能需要配置maven,java环境,自行处理一下吧),运行完成后有一个target文件夹
3.启动rocketmq
进入目录,…\target\alibaba-rocketmq-broker\alibaba-rocketmq\bin
(1)启动nameServer,ctrl+r,输入cmd,启动命令窗口,cd进入上面路径下输入:
mqnamesrv.exe -n 127.0.0.1:9876
(2)启动brocker,同样新建cmd命令窗口,同路径下输入:
mqbroker.exe -n 127.0.0.1:9876 autoCreateTopicEnable=true
(3)启动admin,新建cmd命令窗口,同路径输入,可以看到topic,需要启动下面代码
mqadmin.exe topicList -n 127.0.0.1:9876
4.测试代码
新建springboot项目:根据springboot开发笔记(1)
http://blog.csdn.net/bjjoy2009/article/details/78079221
pom.xml添加内容
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.5.8</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.29</version>
</dependency>
java代码如下(网上copy的)
package com.example.demo.rocketmq;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
/**
* @author bjjoy
* @date 2018/2/3
**/
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 3; i++) {
try {
Message msg = new Message("TopicTest",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
SendResult sendResult = producer.send(msg);
LocalTransactionExecuter tranExecuter = new LocalTransactionExecuter() {
@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
// TODO Auto-generated method stub
return null;
}
};
//producer.sendMessageInTransaction(msg, tranExecuter, arg)
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
package com.example.demo.rocketmq;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
* @author bjjoy
* @date 2018/2/3
**/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
for(MessageExt me:msgs){
try {
System.out.println(new String(me.getBody(),"utf-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
5.后续
上面代码只是测试一下rocketmq,下面需要整合到springboot项目中,目前还没想好如何整合。通过注解方式呢,这个不太熟。通过配置方式,像前面博客springboot开发笔记(3)
http://blog.csdn.net/bjjoy2009/article/details/78725398
谁知道呢,先看看吧。