上一篇博客讲解了如何安装RocketMQ,并且也简单的介绍了一下相关RocketMq的概念,那么这篇博客,来剖析一下MQ中的producer的角色,看看它是来干什么的?
上图就是MQ中Producer的有关结构图,下面来着重分析一下每个类的用途
1.MQAdmin:作为MQ应用层最底层的类,为我们提供了所有公共的方法,常用的有如下
根据key、主题名和队列来创建Topic
void createTopic(final String key, final String newTopic, final int queueNum) throws MQClientException;
查询消息队列中的偏移量
long maxOffset(final MessageQueue mq) throws MQClientException;
根据各种条件来查询Message信息
QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
final long end) throws MQClientException, InterruptedException;
2.MQProducer:用来发送生产者中的消息,包含了start和shutdown以及各种send方法,其中send方法返回值为sendResult,里面包含着SendStatus也就是发送的状态。send 消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在 sendResult 里定义。
SEND_OK
消息发送成功
FLUSH_DISK_TIMEOUT
消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
FLUSH_SLAVE_TIMEOUT
消息发送成功,但是服务器同步到 Slave 时超时,消息已经进入服务器队列,只有此时服务器宕机,消
息才会丢失
SLAVE_NOT_AVAILABLE
消息发送成功,但是此时 slave 不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢
3.ClientConfig:Client端公共的配置信息,例如心跳数、持久化的时间间隔等
4.DefaultMQProducer:基础的MQProducer,有一些基本的默认设置,供我们使用。例如默认的队列数目、默认的超时时间等
下面通过一个实例来了解一下Producer中常用的操作
- <span style="font-family:Comic Sans MS;font-size:18px;">/**
- * @FileName: Producer.java
- * @Package:com.test
- * @Description: TODO
- * @author: LUCKY
- * @date:2015年12月28日 下午2:32:22
- * @version V1.0
- */
- package com.test;
- import java.util.List;
- import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
- import com.alibaba.rocketmq.client.producer.SendCallback;
- import com.alibaba.rocketmq.client.producer.SendResult;
- import com.alibaba.rocketmq.common.message.Message;
- import com.alibaba.rocketmq.common.message.MessageQueue;
- /**
- * @ClassName: Producer
- * @Description: 模拟生产者
- * @author: LUCKY
- * @date:2015年12月28日 下午2:32:22
- */
- public class ProducerTest {
- public static void main(String[] args) throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("Producer");
- // 必须要设置nameserver地址
- producer.setNamesrvAddr("100.66.154.81:9876");
- try {
- // producer.setClientIP("**");
- //设置实例名称
- producer.setInstanceName("dd");
- //设置重试的次数
- producer.setRetryTimesWhenSendFailed(3);
- //开启生产者
- producer.start();
- //创建一条消息
- Message msg = new Message("PushTopic", "push", "1",
- "内容一".getBytes());
- //发送消息
- SendResult result = producer.send(msg);
- //发送,并触发回调函数
- producer.send(msg, new SendCallback() {
- @Override
- //成功的回调函数
- public void onSuccess(SendResult sendResult) {
- System.out.println(sendResult.getSendStatus());
- System.out.println("成功了");
- }
- @Override
- //出现异常的回调函数
- public void onException(Throwable e) {
- System.out.println("失败了"+e.getMessage());
- }
- });
- //获取某个主题的消息队列
- List<MessageQueue> messageQueues = producer
- .fetchPublishMessageQueues("PushTopic");
- System.out.println(messageQueues.size());
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- producer.shutdown();
- }
- }
- }
- </span>
再分享一下我老师大神的人工智能教程吧。零基础!通俗易懂!风趣幽默!还带黄段子!希望你也加入到我们人工智能的队伍中来!https://blog.csdn.net/jiangjunshow