版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/ldb987/article/details/83542726
利用定时器将消息发送到mq
利用定时器,定时扫描数据库是否有符合条件的数据,一分钟执行一次,有数据就发送到mq。
/**
* 查询未发送的消息 1分钟执行一次
*/
@Scheduled(cron = "0 1/1 * * * ? ")
public void queryNotSendMessage(){
MessageGroupSendExample example = new MessageGroupSendExample();
MessageGroupSendExample.Criteria criteria = example.createCriteria();
criteria.andMsgStatusEqualTo(new Byte("1"));
criteria.andDelFlagEqualTo(false);
criteria.andSendTimeLessThan(new Date());//小于当前时间
example.setOrderByClause("send_time asc");//排序
List<MessageGroupSend> messageGroupSendList = messageGroupSendMapper.selectByExample(example);
if (CollectionUtils.isEmpty(messageGroupSendList)){
logger.debug("job消息中心未查询到信息");
return;
}
String messageGroupSendListJson = JSON.toJSONString(messageGroupSendList);
logger.debug("job未发送的消息查询结果:{}", messageGroupSendListJson);
Message evmessage = new Message(mqTopics, messageGroupTag, messageGroupSendListJson.getBytes());
MessageProducer.sendMq(mqNameServer, producerGroupName, evmessage);
//修改状态为发送中
for (MessageGroupSend messageGroupSend : messageGroupSendList) {
MessageGroupSendExample messageGroupSendExample = new MessageGroupSendExample();
MessageGroupSendExample.Criteria groupSendExampleCriteria = messageGroupSendExample.createCriteria();
groupSendExampleCriteria.andMgsPkidEqualTo(messageGroupSend.getMgsPkid());
messageGroupSend.setMsgStatus(new Byte("3"));
messageGroupSendMapper.updateByExampleSelective(messageGroupSend,messageGroupSendExample);
}
}
知识点
1、@Scheduled
@Scheduled(cron = "0 1/1 * * * ? ")
- 使用@Scheduled注解实现定时任务,再springboot中使用只需要引入pom依赖,然后在主启动类上加注解@EnableScheduling即可实现。
- @Scheduled有两种定时的设置方法:
A、 一种是Rate/Delay表达式(毫秒值):
@Scheduled(fixedRate = 6000):上一次开始执行时间点后每隔6秒执行一次。
@Scheduled(fixedDelay = 6000):上一次执行完毕时间点之后6秒再执行。
B、一种是cornexpression:
@Scheduled(cron = “0 0 3 * * ?”) //每天凌晨3:00执行任务
cron表达式中各时间元素使用空格进行分割,参数按顺序依次表示如下含义:
a.秒(0~59)
b.分钟(0~59)
c.小时(0~23)
d.天(月)(0~31,但是你需要考虑你月的天数)
e.月(0~11)
f.天(星期)(1~7 1=SUN 或 SUN,MON,TUE,WED,THU,FRI,SAT)
f.年份(1970-2099)
2、rocketMQ参数
Message evmessage = new Message(mqTopics, messageGroupTag, messageGroupSendListJson.getBytes());
MessageProducer.sendMq(mqNameServer, producerGroupName, evmessage);
- topic:Topic是生产者在发送消息和消费者在拉取消息的类别,我们可以理解为第一级消息类型,类似于书的标题。例子中表示生产者发送mqTipics这一类别的消息,消费者也会订阅此类别。
- tag:Tag是子主题,我们可以理解二级标题,类似于书的目录,方便检索使用消息。例子中表示生产者发送mqTopics类别下messageGroupTag类消息。
- groupName:代表具有相同角色的生产者组合或消费者组合,称为生产者组或消费者组。**在消费者组中,可以实现消息消费的负载均衡和消息容错目标。**在集群中,一个生产者down了,可以使用另一个,不至于影响正常业务。
使用push方法消费消息
通过handle调取消费实现过程,然后调用消费者订阅了和生产者相同topic,相同tags下的消息,并调用start方法开启消费。
@Override
public void run(String... strings) {
logger.info(">>>>>>>>>>>>>>>消息监听启动:消息中心<<<<<<<<<<<<<");
MessageGroupMessageHandler messageGroupHandler = new MessageGroupMessageHandler();
MessageConsumer rkMQConsumer = new MessageConsumer();
rkMQConsumer.startMqConsumer(mqNameServer, consumerGroupName, mqTopics, tags, messageGroupHandler);
}
@Component
public class MessageGroupMessageHandler implements MessageHandlers{
@Override
public void handle(String message) {
MemberMessageGroupSendService memberMessageGroupSendService = SpringContextUtils.getBean(MemberMessageGroupSendService.class);
ArrayList<MessageGroupSend> list = JSON.parseObject(message, new TypeReference<ArrayList<MessageGroupSend>>() {});
memberMessageGroupSendService.sendMessageGroup(list);
}
}
消费消息实现
@Override
public void sendMessageGroup(ArrayList<MessageGroupSend> list) {
logger.info("sendMessageGroup:"+JSONObject.toJSONString(list));
List<Member> memberList=new ArrayList<Member>();
List<String> memberPhoneList = new ArrayList<>();
Map<String, String> map = new HashMap<>();
MovieCompany movieCompany = new MovieCompany();
for (MessageGroupSend groupSend : list) {
try {
map.put("content", groupSend.getSmsContent());
MovieCompanyExt movieCompanyExt = movieCompanyMapperExt.getCompanyByCompanyCode(groupSend.getCompanyCode());
BeanUtils.copyProperties(movieCompanyExt, movieCompany);
movieCompany.setjPushMasterSecret(movieCompanyExt.getJpushMastersecret());
String memberGroupIds = groupSend.getMemberGroupId();
String cinemas = groupSend.getCinemas();
if (StringUtils.equals(memberGroupIds, "-1")) {//全部会员
String[] cinemaCodes = cinemas.split(",");
for (String cinemaCode : cinemaCodes) {
MemberExample example = new MemberExample();
MemberExample.Criteria criteria = example.createCriteria();
criteria.andMemberOwnerEqualTo(cinemaCode);
memberList = memberMapper.selectByExample(example);
for (Member member : memberList) {
memberPhoneList.add(member.getMemberPhone());
}
}
} else {//查分组会员
List<String> groupListids = new ArrayList<>();
List<String> memberIdList = new ArrayList<>();
String[] str = memberGroupIds.split(",");
groupListids = Arrays.asList(str);
Map<String, List<String>> mapMemberId = eventService.getGroupMemberList(groupListids);
for (Map.Entry<String, List<String>> entry : mapMemberId.entrySet()) {
List<String> valueList = entry.getValue();
for (String memberId : valueList) {
memberIdList.add(memberId);
}
}
MemberExample example = new MemberExample();
MemberExample.Criteria criteria = example.createCriteria();
criteria.andMemberCodeIn(memberIdList);
memberList = memberMapper.selectByExample(example);
for (Member member : memberList) {
memberPhoneList.add(member.getMemberPhone());
}
}
if (StringUtils.isNotBlank(groupSend.getPushContent())) {
appNoticeService.sendAppMessByJpush(JSON.toJSONString(movieCompany), JSON.toJSONString(memberList), groupSend.getMgsPkid(), movieCompany.getApplicationName(), groupSend.getPushContent(), groupSend.getTitle(), SysDictConstants.MEMBER_MESSAGE_GROUP_SEND_NOTICE);
}
if (StringUtils.isNotBlank(groupSend.getSmsContent())) {
smsUtilsService.sendMessage(cinemas.split(",")[0], 6, Joiner.on(",").join(memberPhoneList), map, true, String.valueOf(memberPhoneList.size()));
}
//修改状态为发送完成
MessageGroupSendExample example = new MessageGroupSendExample();
MessageGroupSend messageGroupSend = new MessageGroupSend();
MessageGroupSendExample.Criteria criteria = example.createCriteria();
criteria.andMgsPkidEqualTo(groupSend.getMgsPkid());
messageGroupSend.setMsgStatus(new Byte("4"));
messageGroupSendMapper.updateByExampleSelective(messageGroupSend,example);
logger.info("修改状态成功 {}",JSONObject.toJSONString(groupSend));
} catch (Exception e) {
//e.printStackTrace();
logger.error("发送{}消息错误,错误信息:{} ",JSONObject.toJSONString(groupSend),e);
//修改状态 发送错误 修改error_info
MessageGroupSendExample example = new MessageGroupSendExample();
MessageGroupSend messageGroupSend = new MessageGroupSend();
MessageGroupSendExample.Criteria criteria = example.createCriteria();
criteria.andMgsPkidEqualTo(groupSend.getMgsPkid());
messageGroupSend.setMsgStatus(new Byte("5"));
messageGroupSend.setErrorInfo(e.toString());
messageGroupSendMapper.updateByExampleSelective(messageGroupSend,example);
}
}
}