同步消息
package com.woodie.rocketmq.sendmsg;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
/**
 * Minimal synchronous RocketMQ producer example.
 *
 * <p>Starts a producer in group {@code woodie}, sends one tagged message to
 * {@code my-topic}, blocks until the broker acknowledges it, prints the send
 * result and shuts the producer down.
 */
public class SyncProducer {

    /**
     * Sends a single message synchronously and prints the outcome.
     *
     * @param args unused
     * @throws Exception if the producer cannot start or the send fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("woodie");
        // Address of the name server used for testing.
        producer.setNamesrvAddr("192.168.142.128:9876");
        // Start the producer.
        producer.start();
        try {
            String msg = "woodie:你好我是你的朋友";
            // The producer attaches a tag to each message; consumers can then pick
            // the data they want by subscribing to the matching tags.
            // In a subscription, "*" means all messages and "||" combines several
            // tags (e.g. "add || update").
            // NOTE(review): those expressions are presumably meant for the consumer
            // side only; a produced message carries a single tag - confirm.
            Message message = new Message("my-topic", "add", msg.getBytes("UTF-8"));
            SendResult sendResult = producer.send(message);
            System.out.println("消息id:" + sendResult.getMsgId());
            System.out.println("消息队列:" + sendResult.getMessageQueue());
            System.out.println("消息偏移量:" + sendResult.getQueueOffset());
            System.out.println(sendResult);
        } finally {
            // Always release the producer, even when the send throws.
            producer.shutdown();
        }
    }
}
异步消息
package com.woodie.rocketmq.sendmsg;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
/**
 * Minimal asynchronous RocketMQ producer example.
 *
 * <p>Starts a producer in group {@code woodie}, sends one message to
 * {@code my-topic} with a {@link SendCallback}, waits for the callback to
 * report success or failure, and then shuts the producer down.
 */
public class AsyncProducer {

    /** Upper bound, in seconds, on how long {@code main} waits for the send callback. */
    private static final long CALLBACK_WAIT_SECONDS = 5;

    /**
     * Sends a single message asynchronously and prints the outcome.
     *
     * @param args unused
     * @throws Exception if the producer cannot start, the send cannot be
     *     submitted, or the waiting thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("woodie");
        // Address of the name server used for testing.
        producer.setNamesrvAddr("192.168.142.128:9876");
        // Start the producer.
        producer.start();
        try {
            String msg = "我的第一个异步发送消息";
            Message message = new Message("my-topic", "mytag", msg.getBytes("UTF-8"));

            // Released by whichever callback fires, so main knows when it is
            // safe to shut the producer down.
            final CountDownLatch done = new CountDownLatch(1);

            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    try {
                        System.out.println("发送成功了" + sendResult);
                        System.out.println("消息id:" + sendResult.getMsgId());
                        System.out.println("消息队列:" + sendResult.getMessageQueue());
                        System.out.println("消息偏移量:" + sendResult.getQueueOffset());
                    } finally {
                        done.countDown();
                    }
                }

                @Override
                public void onException(Throwable e) {
                    try {
                        System.out.println("发送失败了" + e);
                    } finally {
                        done.countDown();
                    }
                }
            });

            // shutdown() must not be called right after the async send: the
            // producer would close before the callback runs. Wait for the
            // callback instead, but only for a bounded time so a lost
            // callback cannot keep the program waiting forever.
            if (!done.await(CALLBACK_WAIT_SECONDS, TimeUnit.SECONDS)) {
                System.err.println("No send callback within " + CALLBACK_WAIT_SECONDS + "s; shutting down");
            }
        } finally {
            // Always release the producer once the send has completed or timed out.
            producer.shutdown();
        }
    }
}
测试如下