生产者
TAG常量类
package com.xxxx.mq;
/**
 * Holder for message tag constants.
 *
 * <p>The class only carries constants, so it is final and cannot be instantiated.
 */
public final class MqTagConstants {
    /** Tag for registration info synced to the uc00014 microservice for statistical analysis. */
    public static final String TAG_USER_PUSH_UC00014 = "uc0012PushSourceAnalyse";

    private MqTagConstants() {
        // constants holder; never instantiated
    }
}
普通信息生产工具类
package com.xxxx.mq;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
/**
 * Singleton helper that owns one ONS {@link Producer} and sends ordinary messages through it.
 */
public class NormalSelfProducerUtils extends ProducerSelfUtils {
    private static NormalSelfProducerUtils singletonSon = null;
    protected static Producer producer;

    /**
     * Returns the shared instance, creating and starting the underlying producer on the
     * first successful call.
     *
     * <p>{@code outerProducer} is only read on the call that creates the instance; later
     * calls return the existing instance without applying the passed settings.
     *
     * @param outerProducer connection settings used to build the producer
     * @return the shared instance
     */
    public static NormalSelfProducerUtils getSignleton(ProducerSelfEntry outerProducer) {
        synchronized (NormalSelfProducerUtils.class) {
            if (singletonSon == null) {
                // Fills the inherited static 'properties' from the given settings.
                ProducerSelfUtils.getSignleton(outerProducer);
                Producer created = ONSFactory.createProducer(properties);
                // start() must be called once before any message can be sent.
                created.start();
                // Publish only after the producer started successfully. If anything above
                // throws, the singleton stays unset and the next call retries instead of
                // handing out an instance whose producer is null or not started.
                producer = created;
                singletonSon = new NormalSelfProducerUtils();
            }
        }
        return singletonSon;
    }

    private NormalSelfProducerUtils() {
        super();
    }

    /**
     * Sends a message through the shared producer.
     *
     * @param message the message to send
     * @return the result returned by the producer
     */
    public SendResult send(Message message) {
        return producer.send(message);
    }

    /**
     * Shuts the producer down if it is currently started.
     */
    public void shutdownProducer() {
        if (producer.isStarted()) {
            producer.shutdown();
        }
    }

    /**
     * Starts the producer unless it is already started.
     */
    public void startProducer() {
        if (producer.isStarted()) {
            return;
        }
        producer.start();
    }
}
生产者连接配置实体类
package com.xxx.mq;
public class ProducerSelfEntry {
private String procucerId;
private String accessKey;
private String secretKey;
private String sendMsgTimeoutMillis;
private String onsAddr;
private String topic;
public void setProcucerId(String procucerId) {
this.procucerId = procucerId;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public void setSendMsgTimeoutMillis(String sendMsgTimeoutMillis) {
this.sendMsgTimeoutMillis = sendMsgTimeoutMillis;
}
public void setOnsAddr(String onsAddr) {
this.onsAddr = onsAddr;
}
public String getProcucerId() {
return procucerId;
}
public String getAccessKey() {
return accessKey;
}
public String getSecretKey() {
return secretKey;
}
public String getSendMsgTimeoutMillis() {
return sendMsgTimeoutMillis;
}
public String getOnsAddr() {
return onsAddr;
}
public void printInfo() {
System.out.println(toString());
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
@Override
public String toString() {
return "Producer [procucerId=" + procucerId + ", accessKey=" + accessKey + ", secretKey=" + secretKey
+ ", sendMsgTimeoutMillis=" + sendMsgTimeoutMillis + ", onsAddr=" + onsAddr + ", topic=" + topic
+ "]";
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((accessKey == null) ? 0 : accessKey.hashCode());
result = prime * result + ((onsAddr == null) ? 0 : onsAddr.hashCode());
result = prime * result + ((procucerId == null) ? 0 : procucerId.hashCode());
result = prime * result + ((secretKey == null) ? 0 : secretKey.hashCode());
result = prime * result + ((sendMsgTimeoutMillis == null) ? 0 : sendMsgTimeoutMillis.hashCode());
result = prime * result + ((topic == null) ? 0 : topic.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ProducerSelfEntry other = (ProducerSelfEntry) obj;
if (accessKey == null) {
if (other.accessKey != null)
return false;
} else if (!accessKey.equals(other.accessKey))
return false;
if (onsAddr == null) {
if (other.onsAddr != null)
return false;
} else if (!onsAddr.equals(other.onsAddr))
return false;
if (procucerId == null) {
if (other.procucerId != null)
return false;
} else if (!procucerId.equals(other.procucerId))
return false;
if (secretKey == null) {
if (other.secretKey != null)
return false;
} else if (!secretKey.equals(other.secretKey))
return false;
if (sendMsgTimeoutMillis == null) {
if (other.sendMsgTimeoutMillis != null)
return false;
} else if (!sendMsgTimeoutMillis.equals(other.sendMsgTimeoutMillis))
return false;
if (topic == null) {
if (other.topic != null)
return false;
} else if (!topic.equals(other.topic))
return false;
return true;
}
}
获取阿里云MQ连接配置类
package com.sgcc.mq;
import java.util.Properties;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
/**
 * Base helper that copies connection settings from a {@link ProducerSelfEntry} into a
 * shared static {@link Properties} object for subclasses to build a producer from.
 */
public class ProducerSelfUtils {
    protected static ProducerSelfEntry innerProducer;
    protected static ProducerSelfUtils singleton;
    protected final static Properties properties = new Properties();

    /**
     * Stores the given settings in the shared {@code properties} and returns the shared
     * instance of this class, creating it on first use.
     *
     * <p>Every value read from {@code outerProducer} must be non-null:
     * {@link Properties} rejects null values with a {@link NullPointerException}.
     *
     * @param outerProducer connection settings to apply; must not be null
     * @return the shared, non-null instance
     * @throws NullPointerException if {@code outerProducer} or one of the values read from it is null
     */
    public static synchronized ProducerSelfUtils getSignleton(ProducerSelfEntry outerProducer) {
        // Fail before touching shared state rather than after overwriting innerProducer.
        if (outerProducer == null) {
            throw new NullPointerException("outerProducer must not be null");
        }
        innerProducer = outerProducer;
        System.out.println("地址信息是:" + innerProducer.getOnsAddr());
        // Producer ID created in the console.
        properties.put(PropertyKeyConst.ProducerId, innerProducer.getProcucerId());
        // AccessKey: Aliyun credential created in the management console.
        properties.put(PropertyKeyConst.AccessKey, innerProducer.getAccessKey());
        // SecretKey: Aliyun credential created in the management console.
        properties.put(PropertyKeyConst.SecretKey, innerProducer.getSecretKey());
        // Send timeout, in milliseconds.
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, innerProducer.getSendMsgTimeoutMillis());
        // TCP endpoint (name-server address).
        properties.put(PropertyKeyConst.ONSAddr, innerProducer.getOnsAddr());
        // The field was never assigned before, so callers always received null.
        if (singleton == null) {
            singleton = new ProducerSelfUtils();
        }
        return singleton;
    }

    protected ProducerSelfUtils() {
    }
}
MQ发送测试类
package com.xxx.mqtest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.aliyun.openservices.ons.api.Message;
/**
 * Spring component that publishes a user record to the message queue.
 */
@Component
public class SendMqMessageUtil {
    public static final Logger log = LoggerFactory.getLogger(SendMqMessageUtil.class);
    private String yhzxTopic = "!!!!";
    private String yhzxProcucer = "@@@@";
    private String accessKey = "####";// replace with your own value
    private String secretKey = "$$$$";// replace with your own value
    private String sendMsgTimeoutMillis = "3000";// replace with your own value
    private String onsAddr = "http://mq.server.sgcloud.com:8080/rocketmq/nsaddr4broker-internal";// replace with your own value

    /**
     * Re-serializes the given user JSON (keeping null-valued fields) and sends it as one
     * message tagged {@code MqTagConstants.TAG_USER_PUSH_UC00014}.
     *
     * <p>JSON parsing happens before the try block, so a parse failure propagates to the
     * caller; any exception raised while building or sending the message is logged with
     * the serial number and not rethrown.
     *
     * @param user the user record as a JSON string
     */
    public void pushSourceAnalyse(String user) {
        final String serialNo = String.valueOf(Math.random());
        log.info("流水号:" + serialNo);
        final String mqData =
                JSONObject.toJSONString(JSONObject.parseObject(user), SerializerFeature.WriteMapNullValue);
        try {
            NormalSelfProducerUtils sender = NormalSelfProducerUtils.getSignleton(connectionSettings());
            sender.send(buildMessage(mqData));
        } catch (Exception e) {
            log.error("流水号:" + serialNo + "异常。", e);
        }
    }

    /** Collects this component's connection fields into a settings object. */
    private ProducerSelfEntry connectionSettings() {
        ProducerSelfEntry settings = new ProducerSelfEntry();
        settings.setTopic(yhzxTopic);
        settings.setProcucerId(yhzxProcucer);
        settings.setAccessKey(accessKey);
        settings.setOnsAddr(onsAddr);
        settings.setSecretKey(secretKey);
        settings.setSendMsgTimeoutMillis(sendMsgTimeoutMillis);
        return settings;
    }

    /** Wraps the payload in a message for the configured topic, encoded as UTF-8. */
    private Message buildMessage(String payload) throws java.io.UnsupportedEncodingException {
        Message outgoing = new Message();
        outgoing.setTag(MqTagConstants.TAG_USER_PUSH_UC00014);
        outgoing.setTopic(yhzxTopic);
        outgoing.setBody(payload.getBytes("utf-8"));
        return outgoing;
    }
}
pom引入jar
<!-- Aliyun ONS artifacts; both are pinned to version 1.7.8.Final. -->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-api</artifactId>
<version>1.7.8.Final</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.7.8.Final</version>
</dependency>
订阅者
自启动实例化消费者对象
package com.jusfoun.gov_platform.mqtest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
/**
 * Startup hook that registers the message consumer once the Spring Boot application
 * has started.
 */
@Component
public class AliMQConsumer implements CommandLineRunner {
    public static final Logger LOG = LoggerFactory.getLogger(AliMQConsumer.class);
    private String yhzxTopic = "yhzx-normal-test";
    private String yhzxConsumerId = "CID-yhzx-normal08";
    @Autowired
    ALiMqCommonEntity aLiMqCommonEntity;

    /**
     * Installs the consumer for the configured consumer id and topic, subscribed to the
     * {@code MqTagConstants.TAG_USER_PUSH_UC00014} tag, logging before and after.
     *
     * @param args command-line arguments; not used
     */
    @Override
    public void run(String... args) throws Exception {
        LOG.info("正在启动用户中心户号信息监听*****");
        final String subscribedTag = MqTagConstants.TAG_USER_PUSH_UC00014;
        aLiMqCommonEntity.install(yhzxConsumerId, yhzxTopic, subscribedTag);
        LOG.info("正在监听用户中心户号信息*****");
    }
}
实例化消费者对象的配置参数类
package com.jusfoun.gov_platform.mqtest;
import java.util.Properties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
/**
 * Spring component that builds, subscribes and starts an ONS consumer.
 */
@Component
public class ALiMqCommonEntity {
    private String accessKey = "!!!!!";// set to your own provisioned value
    private String secretKey = "@@@@@";// set to your own provisioned value
    private String sendMsgTimeoutMillis = "####";// set to your own provisioned value
    private String onsAddr = "%%%%%";// set to your own provisioned value
    @Autowired
    private MQConsumerListener mQConsumerListener;

    /**
     * Creates a consumer for the given consumer id, subscribes it to {@code topic}
     * filtered by {@code tag} with the injected listener, and starts it.
     *
     * @param consumerId consumer id created in the console
     * @param topic topic to subscribe to
     * @param tag tag expression used to filter messages
     */
    public void install(String consumerId, String topic, String tag) {
        Properties properties = new Properties();
        // Consumer ID created in the console.
        properties.put(PropertyKeyConst.ConsumerId, consumerId);
        // AccessKey: Aliyun credential created in the management console.
        properties.put(PropertyKeyConst.AccessKey, accessKey);
        // SecretKey: Aliyun credential created in the management console.
        properties.put(PropertyKeyConst.SecretKey, secretKey);
        // Send timeout, in milliseconds.
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, sendMsgTimeoutMillis);
        // TCP endpoint (name-server address).
        properties.put(PropertyKeyConst.ONSAddr, onsAddr);
        // Values must be Strings: Properties.getProperty returns null for any non-String
        // value, so an Integer stored with put() is invisible to code that reads the
        // setting through getProperty.
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "16");
        properties.setProperty(PropertyKeyConst.MaxReconsumeTimes, "10");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe(topic, tag, mQConsumerListener);
        consumer.start();
    }
}
监听消费者消息并进行业务逻辑处理类
package com.xxx.mqtest;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
/**
 * Message listener that dispatches on the message tag and prints the message body.
 */
@Service("messageListener")
public class MQConsumerListener implements MessageListener {
    public static final Logger log = LoggerFactory.getLogger(MQConsumerListener.class);
    public static final String tag1 = MqTagConstants.TAG_USER_PUSH_UC00014;
    public static final String tag2 = "";

    /**
     * Handles one message.
     *
     * <p>Messages tagged {@link #tag1} or {@link #tag2} are printed and committed. Any
     * other tag is reported and scheduled for redelivery. If anything throws (including
     * a null tag or body), the error is logged and the message is committed.
     *
     * @param msg the received message
     * @param context the consume context; not used
     * @return the action reported back for this message
     */
    @Override
    public Action consume(Message msg, ConsumeContext context) {
        try {
            final String tag = msg.getTag();
            log.info("tag:" + tag);
            final String body = new String(msg.getBody(), "utf-8");
            // A null tag makes tag.equals(...) throw; that is handled by the catch below.
            if (tag.equals(tag1)) {
                log.info("获取消费mq内容:" + body);
                printOrReportEmpty(body);
                return Action.CommitMessage; // consumed successfully
            }
            if (tag.equals(tag2)) {
                printOrReportEmpty(body);
                return Action.CommitMessage;
            }
            log.info("监听到的TAG超出处理范围");
            return Action.ReconsumeLater;
        } catch (Exception e) {
            log.error("监听消息发生异常****", e);
            return Action.CommitMessage;
        }
    }

    /** Prints a non-blank body to standard output; logs a notice for a blank one. */
    private void printOrReportEmpty(String body) {
        if (StringUtils.isBlank(body)) {
            log.info("监听到的信息为空");
            return;
        }
        System.out.println("消费内容为:" + body);
    }
}