---------Maven依赖---------
<!-- 消息队列MQ -->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.2.7.Final</version>
</dependency
---------生产者---------
Properties properties = new Properties();
properties.put(PropertyKeyConst.ProducerId, S_AliyunAccount.MQ_ProductId);
properties.put(PropertyKeyConst.AccessKey, S_AliyunAccount.AccessKeyId);
properties.put(PropertyKeyConst.SecretKey, S_AliyunAccount.AccessSecretKey);
//公有云生产环境:http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
//公有云公测环境:http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
//杭州金融云环境:http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
//杭州深圳云环境:http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
properties.put(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");//此处以公有云生产环境为例
Producer producer = ONSFactory.createProducer(properties);
producer.start();//在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
System.out.println("start " + System.currentTimeMillis());
for (int i = 0; i < 100; i++)
{
String msgBody = "Hello ONS---" + System.currentTimeMillis() + "-----";
Message msg = new Message(
S_AliyunAccount.MQ_Topic,
"TagA",//可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在ONS服务器过滤
msgBody.getBytes()//任何二进制形式的数据,ONS不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式
);
//设置代表消息的业务关键属性,请尽可能全局唯一。
//以方便您在无法正常收到消息情况下,可通过ONS Console查询消息并补发。
//注意:不设置也不会影响消息正常收发
msg.setKey("ORDERID_" + System.currentTimeMillis());
////定时消息投递,设置投递的具体时间戳,单位毫秒
//try
//{
//
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();
//
msg.setStartDeliverTime(timeStamp);//定时发送的时间点,精确到毫秒
//}
//catch (ParseException e)
//{
//
e.printStackTrace();
//}
////第一种发送方式:异步发送消息,发送结果通过callback返回给客户端,数据不丢失
//producer.sendAsync(msg, new SendCallback() {
//
@Override
//
public void onSuccess(final SendResult sendResult)
//
{
//
//消费发送成功
//
System.out.println("send message success. topic=" +
//
sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
//
}
//
@Override
//
public void onException(OnExceptionContext context)
//
{
//
//消息发送失败
//
System.out.println("send message failed. topic=" +
//
context.getTopic() + ", msgId=" + context.getMessageId());
//
}
//});
////第二种发送方式:单向(Oneway)发送,只要不抛异常就是成功,数据可能丢失
//producer.sendOneway(msg);
//第三种发送方式:同步发送消息,只要不抛异常就是成功,数据不丢失
producer.send(msg);
//SendResult sendResult = producer.send(msg);
//System.out.println(sendResult);
}
System.out.println("end " + System.currentTimeMillis());
//在应用退出前,销毁Producer对象
//注意:如果不销毁也没有问题
//producer.shutdown();
---------消费者---------
Properties properties = new Properties();
properties.put(PropertyKeyConst.ConsumerId, S_AliyunAccount.MQ_ConsumerId);
properties.put(PropertyKeyConst.AccessKey, S_AliyunAccount.AccessKeyId);
properties.put(PropertyKeyConst.SecretKey, S_AliyunAccount.AccessSecretKey);
String tags = "TagA";// *表示订阅所有Tag消息;TagA||TagB表示订阅TagA和TagB消息;TagA表示只订阅TagA消息
Consumer consumer = ONSFactory.createConsumer(properties);
// consumer.subscribe-->String topic, String subExpression, MessageListener listener
consumer.subscribe(S_AliyunAccount.MQ_Topic, tags, new MessageListener()
{
public Action consume(Message message, ConsumeContext context)
{
System.out.println(System.currentTimeMillis()+" Receive: " + new String(message.getBody()));
// return Action.ReconsumeLater;//当前接收到的消息还是要保留在队列中,后面继续推送
return Action.CommitMessage;// 将当前接收到的消息从队列中移除
}
});
consumer.start();