一、编写定时任务
package com.sf.map.tool.station;
import com.sf.map.tool.station.mapper.DeviceMapper;
import com.sf.map.tool.station.service.DataSyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* @author 80004819
* @ClassName:
* @Description:
* @date 2020年07月22日 11:12:38
*/
@Component
@Slf4j
public class StationSyncTask {
@Autowired
private DataSyncService dataSyncService;
@Value("${webservice.enable-topQuery}")
private boolean enableSyncStation;
@Value("${webservice.enable-currLineStateQuery}")
private boolean enableSyncStationLink;
@Value("${webservice.enable-currBsCallQuery}")
private boolean enableSyncStationCall;
@Value("${webservice.retry-count-topQuery}")
private int syncStationRetryCount;
@Value("${webservice.retry-count-currLineStateQuery}")
private int syncStationLinkStateRetryCount;
@Value("${webservice.retry-count-currBsCallQuery}")
private int syncStationCallStateRetryCount;
/**
* 定时同步基站基础信息数据,每天凌晨1点执行一次
*/
@Scheduled(cron = "${webservice.frequency-topQuery}")
public void syncStationData() {
if(enableSyncStation) {
int count = 0;
boolean b;
do {
b = dataSyncService.syncStationData();
count++;
} while (!b && count <= syncStationRetryCount); //如果同步失败,尝试重新调用接口,但次数不超过3次
log.info("基站数据同步{},共调用了{}次webservce接口", b ? "成功" : "失败", count);
}
}
/**
* 同步基站链路状态数据
*/
@Scheduled(cron = "${webservice.frequency-currLineStateQuery}")
public void syncLinkState() {
if(enableSyncStationLink) {
int count = 0;
boolean b;
do {
b = dataSyncService.syncLinkState();
count++;
} while (!b && count <= syncStationLinkStateRetryCount); //如果同步失败,尝试重新调用接口,但次数不超过3次
log.info("基站链路状态数据同步{},共调用了{}次webservce接口", b ? "成功" : "失败", count);
}
}
/**
* 同步基站呼叫状态数据
*/
@Scheduled(cron = "${webservice.frequency-currBsCallQuery}")
public void syncCallState() {
if(enableSyncStationCall) {
int count = 0;
boolean b;
do {
b = dataSyncService.syncCallState();
count++;
} while (!b && count <= syncStationCallStateRetryCount); //如果同步失败,尝试重新调用接口,但次数不超过3次
log.info("基站链路状态数据同步{},共调用了{}次webservce接口", b ? "成功" : "失败", count);
}
}
}
二、各个定时任务的实现
package com.sf.map.tool.station.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.sf.map.tool.station.DeviceTypeEnum;
import com.sf.map.tool.station.ErrorCodeEnum;
import com.sf.map.tool.station.entity.Station;
import com.sf.map.tool.station.entity.StationCallState;
import com.sf.map.tool.station.entity.StationLinkState;
import com.sf.map.tool.station.service.DataSyncService;
import com.sf.map.tool.station.service.DeviceService;
import com.sf.map.tool.station.util.AxisUtil;
import com.sf.map.tool.station.vo.DeviceDetailVo;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* @author 80004819
* @ClassName:
* @Description:
* @date 2020年07月22日 11:49:30
*/
@Service
@Slf4j
public class DataSyncServiceImpl implements DataSyncService {
@Autowired
private DeviceService deviceService;
@Value("${webservice.targetNamespace}")
private String targetNamespace;
/**
* 基础数据-endpoint
*/
@Value("${webservice.endpoint-cityTopService}")
private String endpoint_cityTopService;
/**
* #基础数据-method-设备拓扑信息查询操作
*/
@Value("${webservice.method-topQuery}")
private String method_topQuery;
/**
* 系统实时监控数据-endpoint
*/
@Value("${webservice.endpoint-customMonitorService}")
private String endpoint_customMonitorService;
/**
* 系统实时监控数据-method-当前基站链路状态查询
*/
@Value("${webservice.method-currLineStateQuery}")
private String method_currLineStateQuery;
/**
* 系统实时监控数据-method-当前基站呼叫状态查询
*/
@Value("${webservice.method-currBsCallQuery}")
private String method_currBsCallQuery;
/**
* 同步基站基础数据,线程池相关常量及变量
*/
//批处理任务阀值: 每两百条数据开启一个线程
private static final int COUNT = 200;
//最大允许同一时刻同时执行批次任务的线程数量
private static final int MAX_THREAD_COUNT = 30;
//最多允许同时存在正在进行批处理操作的线程数量
private static final Semaphore SEMAPHORE = new Semaphore(MAX_THREAD_COUNT);
//执行全部批次任务需要开启的线程数量
private static int THREAD_COUNT = 0;
//线程任务计数器
private static CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(0);
//缓存线程池
private static ExecutorService EXECUTOR_SERVICE = null;
/**
* 同步基站链路状态数据,线程池相关常量及变量
*/
//批处理任务阀值: 每两百条数据开启一个线程
private static final int LINK_STATE_COUNT = 200;
//最大允许同一时刻同时执行批次任务的线程数量
private static final int LINK_STATE_MAX_THREAD_COUNT = 30;
//最多允许同时存在正在进行批处理操作的线程数量
private static final Semaphore LINK_STATE_SEMAPHORE = new Semaphore(LINK_STATE_MAX_THREAD_COUNT);
//执行全部批次任务需要开启的线程数量
private static int LINK_STATE_THREAD_COUNT = 0;
//线程任务计数器
private static CountDownLatch LINK_STATE_COUNT_DOWN_LATCH = new CountDownLatch(0);
//缓存线程池
private static ExecutorService LINK_STATE_EXECUTOR_SERVICE = null;
/**
* 同步基站呼叫状态数据,线程池相关常量及变量
*/
//批处理任务阀值: 每两百条数据开启一个线程
private static final int CALL_STATE_COUNT = 200;
//最大允许同一时刻同时执行批次任务的线程数量
private static final int CALL_STATE_MAX_THREAD_COUNT = 30;
//最多允许同时存在正在进行批处理操作的线程数量
private static final Semaphore CALL_STATE_SEMAPHORE = new Semaphore(CALL_STATE_MAX_THREAD_COUNT);
//执行全部批次任务需要开启的线程数量
private static int CALL_STATE_THREAD_COUNT = 0;
//线程任务计数器
private static CountDownLatch CALL_STATE_COUNT_DOWN_LATCH = new CountDownLatch(0);
//缓存线程池
private static ExecutorService CALL_STATE_EXECUTOR_SERVICE = null;
/**
* 同步基站基础信息数据
*
* @return
*/
@Override
@Transactional(rollbackFor = Exception.class)
public boolean syncStationData() {
try {
if (EXECUTOR_SERVICE == null || EXECUTOR_SERVICE.isTerminated()) {
EXECUTOR_SERVICE = Executors.newFixedThreadPool(MAX_THREAD_COUNT);
}
List<DeviceDetailVo> deviceDetailVoList = new ArrayList<DeviceDetailVo>();
//调用webservice接口
// Object result = AxisUtil.callWebService(endpoint_cityTopService, targetNamespace, method_topQuery, null, new String[]{"input"}, new String[]{""});
Object result = AxisUtil.callWebServiceBySoap(endpoint_cityTopService,
targetNamespace, method_topQuery, null,
"input", "", "topList");
if (result != null && result instanceof String) {
String res = result.toString();
// String res = "{\"topList\":[{\"provinceId\":\"hb\",\"cityId\":\"715\",\"altitude\":0,\"systemModel\":1,\"bandwidth\":2,\"devType\":2,\"netId\":\"15.1.30\",\"longitude\":0,\"systemId\":\"m715.hb.pdt.cn\",\"district\":\"\",\"devState\":0,\"diversity\":1,\"opDate\":\"2020-07-21\",\"antAltitude\":0,\"tsAddress\":\"\",\"linkType\":1,\"roomPower\":0,\"roomStatus\":0,\"devName\":\"咸宁市公安局350兆PDT集群系统\",\"antType\":\"004\",\"coverArea\":0,\"bsrList\":[],\"roomType\":3,\"manufacturer\":\"006\",\"antNum\":0,\"latitude\":0,\"devId\":\"m715.hb.pdt.cn\"},{\"provinceId\":\"hb\",\"cityId\":\"715\",\"altitude\":157,\"systemModel\":1,\"bandwidth\":2,\"devType\":1,\"longitude\":114.329444,\"systemId\":\"m715.hb.pdt.cn\",\"district\":\"咸安区\",\"devState\":0,\"diversity\":1,\"opDate\":\"2017-10-20\",\"antAltitude\":45,\"tsAddress\":\"咸宁温泉市电信公司大楼\",\"linkType\":2,\"roomPower\":0,\"roomStatus\":0,\"devName\":\"咸宁温泉市电信公司5\",\"antType\":\"001\",\"coverArea\":6.5,\"bsrList\":[],\"roomType\":1,\"manufacturer\":\"006\",\"antNum\":2,\"latitude\":29.841944,\"devId\":\"r005.m715.hb.pdt.cn\"},{\"provinceId\":\"hb\",\"cityId\":\"715\",\"altitude\":66,\"systemModel\":1,\"bandwidth\":2,\"devType\":1,\"longitude\":114.305555,\"systemId\":\"m715.hb.pdt.cn\",\"district\":\"咸安区\",\"devState\":0,\"diversity\":1,\"opDate\":\"2017-10-19\",\"antAltitude\":46,\"tsAddress\":\"咸宁市咸安区公安分局大楼\",\"linkType\":2,\"roomPower\":0,\"roomStatus\":0,\"devName\":\"咸宁咸安区分局基站4\",\"antType\":\"001\",\"coverArea\":6,\"bsrList\":[],\"roomType\":1,\"manufacturer\":\"006\",\"antNum\":2,\"latitude\":29.874444,\"devId\":\"r004.m715.hb.pdt.cn\"},{\"provinceId\":\"hb\",\"cityId\":\"715\",\"altitude\":72,\"systemModel\":1,\"bandwidth\":2,\"devType\":1,\"longitude\":114.280833,\"systemId\":\"m715.hb.pdt.cn\",\"district\":\"咸安区\",\"devState\":0,\"diversity\":1,\"opDate\":\"2017-10-19\",\"antAltitude\":45,\"tsAddress\":\"咸宁市咸宁区福林天下\",\"linkType\":2,\"roomPower\":0,\"roomStatus\":0,\"devName\":\"咸宁咸安福林天下基站3\",\"antType\":\"001\",\"coverArea\":6,\"bsrList\":[],\"roomType\":1,\"manufacturer\":\"006\",\"antNum\":2,\"latitude\":29.874444,\"devId\":\"r003.m715.hb.pdt.cn\"},{\"provinceId\":\"hb\",\"cityId\":\"715\",\"altitude\":65,\"systemModel\":1,\"bandwidth\":2,\"devType\":1,\"longitude\":114.285277,\"systemId\":\"m715.hb.pdt.cn\",\"district\":\"咸安区\",\"devState\":0,\"diversity\":1,\"opDate\":\"2017-10-18\",\"antAltitude\":40,\"tsAddress\":\"咸宁温泉区十号桥派出所\",\"linkType\":2,\"roomPower\":0,\"roomStatus\":0,\"devName\":\"咸宁温泉十号桥基站2\",\"antType\":\"001\",\"coverArea\":6,\"bsrList\":[],\"roomType\":1,\"manufacturer\":\"006\",\"antNum\":2,\"latitude\":29.863888,\"devId\":\"r002.m715.hb.pdt.cn\"},{\"provinceId\":\"hb\",\"cityId\":\"715\",\"altitude\":259,\"systemModel\":1,\"bandwidth\":2,\"devType\":1,\"longitude\":114.318055,\"systemId\":\"m715.hb.pdt.cn\",\"district\":\"咸安区\",\"devState\":0,\"diversity\":1,\"opDate\":\"2017-10-18\",\"antAltitude\":85,\"tsAddress\":\"咸宁潜山广电\",\"linkType\":2,\"roomPower\":0,\"roomStatus\":0,\"devName\":\"咸宁温泉潜山基站1\",\"antType\":\"001\",\"coverArea\":8,\"bsrList\":[],\"roomType\":1,\"manufacturer\":\"006\",\"antNum\":2,\"latitude\":29.811944,\"devId\":\"r001.m715.hb.pdt.cn\"},\n" +
// "{\"provinceId\":\"hb\",\"cityId\":\"715\",\"altitude\":0,\"systemModel\":1,\"bandwidth\":2,\"devType\":2,\"netId\":\"15.1.30\",\"longitude\":0,\"systemId\":\"m715.hb.pdt.cn\",\"district\":\"\",\"devState\":0,\"diversity\":1,\"opDate\":\"2020-07-21\",\"antAltitude\":0,\"tsAddress\":\"\",\"linkType\":1,\"roomPower\":0,\"roomStatus\":0,\"devName\":\"咸宁市公安局350兆PDT集群系统\",\"antType\":\"004\",\"coverArea\":0,\"bsrList\":[],\"roomType\":3,\"manufacturer\":\"006\",\"antNum\":0,\"latitude\":0,\"devId\":\"m715.hb.pdt.cn\"},{\"provinceId\":\"hb\",\"cityId\":\"715\",\"altitude\":157,\"systemModel\":1,\"bandwidth\":2,\"devType\":1,\"longitude\":114.329444,\"systemId\":\"m715.hb.pdt.cn\",\"district\":\"咸安区\",\"devState\":0,\"diversity\":1,\"opDate\":\"2017-10-20\",\"antAltitude\":45,\"tsAddress\":\"咸宁温泉市电信公司大楼\",\"linkType\":2,\"roomPower\":0,\"roomStatus\":0,\"devName\":\"咸宁温泉市电信公司5\",\"antType\":\"001\",\"coverArea\":6.5,\"bsrList\":[],\"roomType\":1,\"manufacturer\":\"006\",\"antNum\":2,\"latitude\":29.841944,\"devId\":\"r005.m715.hb.pdt.cn\"},{\"provinceId\":\"hb\",\"cityId\":\"715\",\"altitude\":66,\"systemModel\":1,\"bandwidth\":2,\"devType\":1,\"longitude\":114.305555,\"systemId\":\"m715.hb.pdt.cn\",\"district\":\"咸安区\",\"devState\":0,\"diversity\":1,\"opDate\":\"2017-10-19\",\"antAltitude\":46,\"tsAddress\":\"咸宁市咸安区公安分局大楼\",\"linkType\":2,\"roomPower\":0,\"roomStatus\":0,\"devName\":\"咸宁咸安区分局基站4\",\"antType\":\"001\",\"coverArea\":6,\"bsrList\":[],\"roomType\":1,\"manufacturer\":\"006\",\"antNum\":2,\"latitude\":29.874444,\"devId\":\"r004.m715.hb.pdt.cn\"},{\"provinceId\":\"hb\",\"cityId\":\"715\",\"altitude\":72,\"systemModel\":1,\"bandwidth\":2,\"devType\":1,\"longitude\":114.280833,\"systemId\":\"m715.hb.pdt.cn\",\"district\":\"咸安区\",\"devState\":0,\"diversity\":1,\"opDate\":\"2017-10-19\",\"antAltitude\":45,\"tsAddress\":\"咸宁市咸宁区福林天下\",\"linkType\":2,\"roomPower\":0,\"roomStatus\":0,\"devName\":\"咸宁咸安福林天下基站3\",\"antType\":\"001\",\"coverArea\":6,\"bsrList\":[],\"roomType\":1,\"manufacturer\":\"006\",\"antNum\":2,\"latitude\":29.874444,\"devId\":\"r003.m715.hb.pdt.cn\"},{\"provinceId\":\"hb\",\"cityId\":\"715\",\"altitude\":65,\"systemModel\":1,\"bandwidth\":2,\"devType\":1,\"longitude\":114.285277,\"systemId\":\"m715.hb.pdt.cn\",\"district\":\"咸安区\",\"devState\":0,\"diversity\":1,\"opDate\":\"2017-10-18\",\"antAltitude\":40,\"tsAddress\":\"咸宁温泉区十号桥派出所\",\"linkType\":2,\"roomPower\":0,\"roomStatus\":0,\"devName\":\"咸宁温泉十号桥基站2\",\"antType\":\"001\",\"coverArea\":6,\"bsrList\":[],\"roomType\":1,\"manufacturer\":\"006\",\"antNum\":2,\"latitude\":29.863888,\"devId\":\"r002.m715.hb.pdt.cn\"},{\"provinceId\":\"hb\",\"cityId\":\"715\",\"altitude\":259,\"systemModel\":1,\"bandwidth\":2,\"devType\":1,\"longitude\":114.318055,\"systemId\":\"m715.hb.pdt.cn\",\"district\":\"咸安区\",\"devState\":0,\"diversity\":1,\"opDate\":\"2017-10-18\",\"antAltitude\":85,\"tsAddress\":\"咸宁潜山广电\",\"linkType\":2,\"roomPower\":0,\"roomStatus\":0,\"devName\":\"咸宁温泉潜山基站1\",\"antType\":\"001\",\"coverArea\":8,\"bsrList\":[],\"roomType\":1,\"manufacturer\":\"006\",\"antNum\":2,\"latitude\":29.811944,\"devId\":\"r001.m715.hb.pdt.cn\"},\n" +
// "{\"provinceId\":\"hb\",\"cityId\":\"715\",\"altitude\":0,\"systemModel\":1,\"bandwidth\":2,\"devType\":2,\"netId\":\"15.1.30\",\"longitude\":0,\"systemId\":\"m715.hb.pdt.cn\",\"district\":\"\",\"devState\":0,\"diversity\":1,\"opDate\":\"2020-07-21\",\"antAltitude\":0,\"tsAddress\":\"\",\"linkType\":1,\"roomPower\":0,\"roomStatus\":0,\"devName\":\"咸宁市公安局350兆PDT集群系统\",\"antType\":\"004\",\"coverArea\":0,\"bsrList\":[],\"roomType\":3,\"manufacturer\":\"006\",\"antNum\":0,\"latitude\":0,\"devId\":\"m715.hb.pdt.cn\"},{\"provinceId\":\"hb\",\"cityId\":\"715\",\"altitude\":157,\"systemModel\":1,\"bandwidth\":2,\"devType\":1,\"longitude\":114.329444,\"systemId\":\"m715.hb.pdt.cn\",\"district\":\"咸安区\",\"devState\":0,\"diversity\":1,\"opDate\":\"2017-10-20\",\"antAltitude\":45,\"tsAddress\":\"咸宁温泉市电信公司大楼\",\"linkType\":2,\"roomPower\":0,\"roomStatus\":0,\"devName\":\"咸宁温泉市电信公司5\",\"antType\":\"001\",\"coverArea\":6.5,\"bsrList\":[],\"roomType\":1,\"manufacturer\":\"006\",\"antNum\":2,\"latitude\":29.841944,\"devId\":\"r005.m715.hb.pdt.cn\"},{\"provinceId\":\"hb\",\"cityId\":\"715\",\"altitude\":66,\"systemModel\":1,\"bandwidth\":2,\"devType\":1,\"longitude\":114.305555,\"systemId\":\"m715.hb.pdt.cn\",\"district\":\"咸安区\",\"devState\":0,\"diversity\":1,\"opDate\":\"2017-10-19\",\"antAltitude\":46,\"tsAddress\":\"咸宁市咸安区公安分局大楼\",\"linkType\":2,\"roomPower\":0,\"roomStatus\":0,\"devName\":\"咸宁咸安区分局基站4\",\"antType\":\"001\",\"coverArea\":6,\"bsrList\":[],\"roomType\":1,\"manufacturer\":\"006\",\"antNum\":2,\"latitude\":29.874444,\"devId\":\"r004.m715.hb.pdt.cn\"},{\"provinceId\":\"hb\",\"cityId\":\"715\",\"altitude\":72,\"systemModel\":1,\"bandwidth\":2,\"devType\":1,\"longitude\":114.280833,\"systemId\":\"m715.hb.pdt.cn\",\"district\":\"咸安区\",\"devState\":0,\"diversity\":1,\"opDate\":\"2017-10-19\",\"antAltitude\":45,\"tsAddress\":\"咸宁市咸宁区福林天下\",\"linkType\":2,\"roomPower\":0,\"roomStatus\":0,\"devName\":\"咸宁咸安福林天下基站3\",\"antType\":\"001\",\"coverArea\":6,\"bsrList\":[],\"roomType\":1,\"manufacturer\":\"006\",\"antNum\":2,\"latitude\":29.874444,\"devId\":\"r003.m715.hb.pdt.cn\"},{\"provinceId\":\"hb\",\"cityId\":\"715\",\"altitude\":65,\"systemModel\":1,\"bandwidth\":2,\"devType\":1,\"longitude\":114.285277,\"systemId\":\"m715.hb.pdt.cn\",\"district\":\"咸安区\",\"devState\":0,\"diversity\":1,\"opDate\":\"2017-10-18\",\"antAltitude\":40,\"tsAddress\":\"咸宁温泉区十号桥派出所\",\"linkType\":2,\"roomPower\":0,\"roomStatus\":0,\"devName\":\"咸宁温泉十号桥基站2\",\"antType\":\"001\",\"coverArea\":6,\"bsrList\":[],\"roomType\":1,\"manufacturer\":\"006\",\"antNum\":2,\"latitude\":29.863888,\"devId\":\"r002.m715.hb.pdt.cn\"},{\"provinceId\":\"hb\",\"cityId\":\"715\",\"altitude\":259,\"systemModel\":1,\"bandwidth\":2,\"devType\":1,\"longitude\":114.318055,\"systemId\":\"m715.hb.pdt.cn\",\"district\":\"咸安区\",\"devState\":0,\"diversity\":1,\"opDate\":\"2017-10-18\",\"antAltitude\":85,\"tsAddress\":\"咸宁潜山广电\",\"linkType\":2,\"roomPower\":0,\"roomStatus\":0,\"devName\":\"咸宁温泉潜山基站1\",\"antType\":\"001\",\"coverArea\":8,\"bsrList\":[],\"roomType\":1,\"manufacturer\":\"006\",\"antNum\":2,\"latitude\":29.811944,\"devId\":\"r001.m715.hb.pdt.cn\"}]}";
JSONObject jsonObject = JSON.parseObject(res);
if (jsonObject.containsKey("topList")) { //接口调用成功
//TODO
JSONArray topList = jsonObject.getJSONArray("topList");
if (!topList.isEmpty()) { //请求成功并且有数据
List<Station> stations = topList.toJavaList(Station.class);
deviceDetailVoList.addAll(stations.parallelStream().filter(x -> x.getDevType().compareTo(1) == 0).map(this::covertDeviceDetailVo).collect(Collectors.toList()));
if (!CollectionUtils.isEmpty(deviceDetailVoList)) {
//删除所有基站数据
int deleteResult = deviceService.deleteByType(DeviceTypeEnum.STATION.getType());
if (deleteResult >= 0) { //删除基站数据成功
int size = deviceDetailVoList.size();
log.info("开始同步基站数据,本地共需同步{}条数据", deviceDetailVoList.size());
//计算需要开启的线程数
THREAD_COUNT = size % COUNT == 0 ? size / COUNT : size / COUNT + 1;
COUNT_DOWN_LATCH = new CountDownLatch(THREAD_COUNT);
List<DeviceDetailVo> taskList = new ArrayList<DeviceDetailVo>();
int threadId = 1;
for (DeviceDetailVo vo : deviceDetailVoList) {
taskList.add(vo);
if (taskList.size() >= COUNT) { //满200条开一个线程
List<DeviceDetailVo> temp = new ArrayList<>(taskList);
EXECUTOR_SERVICE.execute(new taskRunner(threadId, temp));
taskList.clear();
threadId++;
}
}
if (taskList.size() > 0) {
List<DeviceDetailVo> temp = new ArrayList<>(taskList);
EXECUTOR_SERVICE.execute(new taskRunner(threadId, temp));
taskList.clear();
}
if (COUNT_DOWN_LATCH != null) {
//最多等待一个小时
COUNT_DOWN_LATCH.await(1, TimeUnit.HOURS);
}
log.info("countdownlatch的值是:" + COUNT_DOWN_LATCH.getCount());
if (COUNT_DOWN_LATCH.getCount() == 0 && EXECUTOR_SERVICE != null) {
EXECUTOR_SERVICE.shutdown();
}
log.info("基站数据同步成功,本次共同步了{}条数据", deviceDetailVoList.size());
return true;
}
}
}
} else if (jsonObject.containsKey("faultInfo")) {
log.info("基站数据同步失败,失败原因:{}", ErrorCodeEnum.errorCodeMAP.get(Integer.parseInt(jsonObject.get("faultInfo").toString())));
return false;
} else {
log.info("基站数据同步失败,未知错误");
return false;
}
}
} catch (Exception e) {
log.info("基站数据同步异常", e);
return false;
}
log.info("基站数据同步失败,未知错误");
return false;
}
public DeviceDetailVo covertDeviceDetailVo(Station station) {
DeviceDetailVo deviceDetailVo = new DeviceDetailVo();
deviceDetailVo.setType(DeviceTypeEnum.STATION.getType());//基站
deviceDetailVo.setName(station.getDevName());
deviceDetailVo.setCompany(station.getManufacturer());
deviceDetailVo.setDeviceId(station.getDevId());
deviceDetailVo.setIsEnable(station.getDevState());
deviceDetailVo.setX(station.getLongitude().floatValue());
deviceDetailVo.setY(station.getLatitude().floatValue());
deviceDetailVo.setAltitude(station.getAltitude().intValue());
deviceDetailVo.setCoverArea(BigDecimal.valueOf(station.getCoverArea()));
LocalDate localDate = LocalDate.parse(station.getOpDate());
deviceDetailVo.setOpDate(LocalDateTime.of(localDate, LocalTime.MIN));
deviceDetailVo.setNetId(station.getNetId());
deviceDetailVo.setAddress(station.getTsAddress());
return deviceDetailVo;
}
@Data
class taskRunner implements Runnable {
private int id;
private List<DeviceDetailVo> deviceDetailVos;
public taskRunner() {
}
public taskRunner(int id, List<DeviceDetailVo> deviceDetailVos) {
this.id = id;
this.deviceDetailVos = deviceDetailVos;
}
@Override
public void run() {
try {
SEMAPHORE.acquire();
int batchInsertResult = deviceService.batchInsert(this.deviceDetailVos);
if (batchInsertResult > 0) {
log.info("线程{}批次执行完成,共插入{}条基站数据", this.id, this.deviceDetailVos.size());
}
} catch (Exception e) {
log.error("线程{}批次任务执行异常", this.id, e);
} finally {
SEMAPHORE.release();
COUNT_DOWN_LATCH.countDown();
}
}
}
/**
* 同步基站链路状态信息
*
* @return
*/
@Override
@Transactional(rollbackFor = Exception.class)
public boolean syncLinkState() {
try {
if (LINK_STATE_EXECUTOR_SERVICE == null || LINK_STATE_EXECUTOR_SERVICE.isTerminated()) {
LINK_STATE_EXECUTOR_SERVICE = Executors.newFixedThreadPool(LINK_STATE_MAX_THREAD_COUNT);
}
//调webservice接口获取到基站链路状态数据
//Object result = AxisUtil.callWebService(endpoint_customMonitorService, targetNamespace, method_currLineStateQuery, null, new String[]{"input"}, new String[]{""});
Object result = AxisUtil.callWebServiceBySoap(endpoint_customMonitorService,
targetNamespace,
method_currLineStateQuery, null,
"input", "", "ns2:currLineStateList");
if (result != null && result instanceof String) {
String res = result.toString();
// String res = "{\"StationLinkState\":[{\n" +
// " \"netID\":\"15.1.30\",\n" +
// " \"state\":\"1\"\n" +
// "},{\n" +
// " \"netID\":\"15.1.31\",\n" +
// " \"state\":\"1\"\n" +
// "},{\n" +
// " \"netID\":\"15.1.32\",\n" +
// " \"state\":\"1\"\n" +
// "}]}";
JSONObject jsonObject = JSON.parseObject(res);
if (jsonObject.containsKey("currLineStateList")) { //接口调用成功
//TODO
JSONArray topList = jsonObject.getJSONArray("currLineStateList");
List<StationLinkState> stations = topList.toJavaList(StationLinkState.class);
int size = stations.size();
log.info("本次需要更新{}类网络id的基站链路状态", size);
LINK_STATE_THREAD_COUNT = size % LINK_STATE_COUNT == 0 ? size / LINK_STATE_COUNT : size / LINK_STATE_COUNT + 1;
LINK_STATE_COUNT_DOWN_LATCH = new CountDownLatch(LINK_STATE_THREAD_COUNT);
List<DeviceDetailVo> taskList = stations.parallelStream().map(x -> {
DeviceDetailVo deviceDetailVo = new DeviceDetailVo();
deviceDetailVo.setNetId(x.getNetID());
deviceDetailVo.setLinkState(Integer.parseInt(x.getState()));
return deviceDetailVo;
}).collect(Collectors.toList());
int threadId = 1;
List<DeviceDetailVo> updateTaskList = new ArrayList<DeviceDetailVo>();
for (DeviceDetailVo deviceVo : taskList) {
updateTaskList.add(deviceVo);
if (updateTaskList.size() >= LINK_STATE_COUNT) { //满200条开一个线程
List<DeviceDetailVo> temp = new ArrayList<>(updateTaskList);
LINK_STATE_EXECUTOR_SERVICE.execute(new taskLinkStateRunner(threadId, temp));
updateTaskList.clear();
threadId++;
}
}
if (updateTaskList.size() > 0) {
List<DeviceDetailVo> temp = new ArrayList<>(updateTaskList);
LINK_STATE_EXECUTOR_SERVICE.execute(new taskLinkStateRunner(threadId, temp));
updateTaskList.clear();
}
if (LINK_STATE_COUNT_DOWN_LATCH != null) {
//最多等待一个小时
LINK_STATE_COUNT_DOWN_LATCH.await(1, TimeUnit.HOURS);
}
log.info("countdownlatch的值是:" + LINK_STATE_COUNT_DOWN_LATCH.getCount());
if (LINK_STATE_COUNT_DOWN_LATCH.getCount() == 0 && LINK_STATE_EXECUTOR_SERVICE != null) {
LINK_STATE_EXECUTOR_SERVICE.shutdown();
}
log.info("基站链路数据同步成功,本次共同步了{}类网络id的基站链路状态数据", size);
return true;
} else if (jsonObject.containsKey("faultInfo")) {
log.info("基站链路数据同步失败,失败原因:{}", ErrorCodeEnum.errorCodeMAP.get(Integer.parseInt(jsonObject.get("faultInfo").toString())));
return false;
} else {
log.info("基站链路数据同步失败,未知错误");
return false;
}
}
} catch (Exception e) {
log.info("基站链路数据同步异常", e);
return false;
}
log.info("基站链路数据同步失败,未知错误");
return false;
}
@Data
class taskLinkStateRunner implements Runnable {
private int id;
private List<DeviceDetailVo> deviceDetailVos;
public taskLinkStateRunner() {
}
public taskLinkStateRunner(int id, List<DeviceDetailVo> deviceDetailVos) {
this.id = id;
this.deviceDetailVos = deviceDetailVos;
}
@Override
public void run() {
try {
LINK_STATE_SEMAPHORE.acquire();
int batchUpdateLinkStateResult = deviceService.batchUpdateLinkState(this.deviceDetailVos);
if (batchUpdateLinkStateResult > 0) {
log.info("线程{}批次执行完成,共更新{}类基站链路状态数据", this.id, this.deviceDetailVos.size());
}
} catch (Exception e) {
log.error("线程{}批次任务执行异常", this.id, e);
} finally {
LINK_STATE_SEMAPHORE.release();
LINK_STATE_COUNT_DOWN_LATCH.countDown();
}
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean syncCallState() {
try {
if (CALL_STATE_EXECUTOR_SERVICE == null || CALL_STATE_EXECUTOR_SERVICE.isTerminated()) {
CALL_STATE_EXECUTOR_SERVICE = Executors.newFixedThreadPool(CALL_STATE_MAX_THREAD_COUNT);
}
//获取所有基站数据
List<DeviceDetailVo> taskList = deviceService.findDeviceByType(DeviceTypeEnum.STATION.getType());
if (!CollectionUtils.isEmpty(taskList)) {
int size = taskList.size();
CALL_STATE_THREAD_COUNT = size % CALL_STATE_COUNT == 0 ? size / CALL_STATE_COUNT : size / CALL_STATE_COUNT + 1;
CALL_STATE_COUNT_DOWN_LATCH = new CountDownLatch(CALL_STATE_THREAD_COUNT);
int threadId = 1;
List<DeviceDetailVo> updateTaskList = new ArrayList<DeviceDetailVo>();
for (DeviceDetailVo deviceVo : taskList) {
updateTaskList.add(deviceVo);
if (updateTaskList.size() >= CALL_STATE_COUNT) { //满200条开一个线程
List<DeviceDetailVo> temp = new ArrayList<>(updateTaskList);
CALL_STATE_EXECUTOR_SERVICE.execute(new taskCallStateRunner(threadId, temp));
updateTaskList.clear();
threadId++;
}
}
if (updateTaskList.size() > 0) {
List<DeviceDetailVo> temp = new ArrayList<>(updateTaskList);
CALL_STATE_EXECUTOR_SERVICE.execute(new taskCallStateRunner(threadId, temp));
updateTaskList.clear();
}
if (CALL_STATE_COUNT_DOWN_LATCH != null) {
//最多等待一个小时
CALL_STATE_COUNT_DOWN_LATCH.await(1, TimeUnit.HOURS);
}
log.info("countdownlatch的值是:" + CALL_STATE_COUNT_DOWN_LATCH.getCount());
if (CALL_STATE_COUNT_DOWN_LATCH.getCount() == 0 && CALL_STATE_EXECUTOR_SERVICE != null) {
CALL_STATE_EXECUTOR_SERVICE.shutdown();
}
log.info("基站呼叫状态数据同步成功,本次共同步了{}条基站呼叫状态数据数据", size);
return true;
} else {
log.info("基站呼叫状态数据同步成功,本次共同步了0条基站呼叫状态数据数据");
return true;
}
} catch (Exception e) {
log.info("基站呼叫状态数据同步异常", e);
return false;
}
}
private static final Gson GSON = new Gson();
@Data
class taskCallStateRunner implements Runnable {
private int id;
private List<DeviceDetailVo> deviceDetailVos;
public taskCallStateRunner() {
}
public taskCallStateRunner(int id, List<DeviceDetailVo> deviceDetailVos) {
this.id = id;
this.deviceDetailVos = deviceDetailVos;
}
@Override
public void run() {
try {
CALL_STATE_SEMAPHORE.acquire();
List<Map<String, String>> netIdList = deviceDetailVos.parallelStream().map(x -> {
Map<String, String> paramMap = new HashMap<String, String>();
paramMap.put("netid", x.getNetId());
return paramMap;
}).collect(Collectors.toList());
JSONObject paramJson = new JSONObject();
paramJson.put("idArray", netIdList);
String value = paramJson.toJSONString();
// Object result = AxisUtil.callWebService
// (endpoint_customMonitorService,
// targetNamespace,
// method_currBsCallQuery, null, new String[]{"idArray"}, new String[]{idArray});
Object result = AxisUtil.callWebServiceBySoap(endpoint_customMonitorService,targetNamespace,method_currBsCallQuery,null,"netId",value,"ns2:currBsCallList");
if (result != null && result instanceof String) {
List<StationCallState> stationCallStateList = new ArrayList<StationCallState>();
// String res = "{\"currBsCallList\":[{\n" +
// " \"netID\":\"15.1.30\",\n" +
// " \"slotUsedCnt\":\"0\",\n" +
// " \"devId\":\"r001.m715.hb.pdt.cn\"\n" +
// "},{\n" +
// " \"netID\":\"15.1.31\",\n" +
// " \"slotUsedCnt\":\"1\",\n" +
// " \"devId\":\"r002.m715.hb.pdt.cn\"\n" +
// "},{\n" +
// " \"netID\":\"15.1.32\",\n" +
// " \"slotUsedCnt\":\"2\",\n" +
// " \"devId\":\"r003.m715.hb.pdt.cn\"\n" +
// "}]}";
String res = result.toString();
JSONObject jsonObject = JSON.parseObject(res);
if (jsonObject.containsKey("currBsCallList")) { //接口调用成功
//TODO
JSONArray currBsCallList = jsonObject.getJSONArray("currBsCallList");
stationCallStateList = currBsCallList.toJavaList(StationCallState.class);
} else if (jsonObject.containsKey("faultInfo")) {
log.info("线程{}基站呼叫数据同步失败,失败原因:{}", this.id, ErrorCodeEnum.errorCodeMAP.get(Integer.parseInt(jsonObject.get("faultInfo").toString())));
} else {
log.info("线程{}基站呼叫数据同步失败,未知错误", this.id);
}
if (!CollectionUtils.isEmpty(stationCallStateList)) {
int batchUpdateLinkStateResult =
deviceService.batchUpdateCallState(stationCallStateList);
if (batchUpdateLinkStateResult > 0) {
log.info("线程{}批次执行完成,共更新{}条基站呼叫状态数据", this.id, stationCallStateList.size());
}
}
}
} catch (Exception e) {
log.error("线程{}批次任务执行异常", this.id, e);
} finally {
CALL_STATE_SEMAPHORE.release();
CALL_STATE_COUNT_DOWN_LATCH.countDown();
}
}
}
}
三、配置文件及启用定时任务
application.yml
webservice:
address: 10.76.65.12:9000
targetNamespace: http://nm.pdt.org.cn
#基础数据
service-cityTopService: CityTopService
#基础数据-设备拓扑信息查询操作
method-topQuery: topQuery
frequency-topQuery: 0 0 1 * * ?
# frequency-topQuery: 0/5 * * * * ?
retry-count-topQuery: 2
enable-topQuery: true
endpoint-cityTopService: http://${webservice.address}/victel/${webservice.service-cityTopService}?WSDL
#系统实时监控数据
service-customMonitorService: CustomMonitorService
#系统实时监控数据-当前基站链路状态查询
method-currLineStateQuery: currLineStateQuery
frequency-currLineStateQuery: 0 0/5 * * * ?
retry-count-currLineStateQuery: 2
enable-currLineStateQuery: true
#系统实时监控数据-当前基站呼叫状态查询
method-currBsCallQuery: currBsCallQuery
frequency-currBsCallQuery: 0 0/5 * * * ?
# frequency-currBsCallQuery: 0/5 * * * * ?
retry-count-currBsCallQuery: 2
enable-currBsCallQuery: true
endpoint-customMonitorService: http://${webservice.address}/victel/${webservice.service-customMonitorService}?WSDL
启动类添加注解
@EnableScheduling
@Configuration
@EnableCaching
@EnableTransactionManagement
@SpringBootApplication(scanBasePackages = {"com.sf.map"})
@EnableSwagger2
@MapperScan(value={"com.sf.map.**.mapper*"})
public class MainWebApplication extends SpringBootServletInitializer {
public static void main(String[] args) {
SpringApplication.run(MainWebApplication.class, args);
}
}
四、mapper、service、xml相关
DeviceMapper
package com.sf.map.tool.station.mapper;
import com.sf.map.tool.station.entity.StationCallState;
import com.sf.map.tool.station.vo.DeviceDetailVo;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;
import java.util.List;
@Mapper
@Repository
public interface DeviceMapper {
List<DeviceDetailVo> findAllDevice();
int deleteByType(@Param("type") String type);
int batchInsert(@Param("list") List<DeviceDetailVo> list);
List<DeviceDetailVo> findDeviceByType(@Param("type") String type);
int batchUpdateLinkState(@Param("list") List<DeviceDetailVo> list);
int batchUpdateCallState(@Param("list") List<StationCallState> list);
}
DeviceService
package com.sf.map.tool.station.service;
import com.sf.map.tool.station.entity.StationCallState;
import com.sf.map.tool.station.vo.DeviceDetailVo;
import java.util.List;
public interface DeviceService {
List<DeviceDetailVo> findAllDevice();
int deleteByType(String type);
int batchInsert(List<DeviceDetailVo> list);
List<DeviceDetailVo> findDeviceByType(String type);
int batchUpdateLinkState(List<DeviceDetailVo> deviceDetailVos);
int batchUpdateCallState(List<StationCallState> stationCallStateList);
}
DeviceServiceImpl
package com.sf.map.tool.station.service.impl;
import com.sf.map.tool.station.entity.StationCallState;
import com.sf.map.tool.station.mapper.DeviceMapper;
import com.sf.map.tool.station.service.DeviceService;
import com.sf.map.tool.station.util.CoordTransform2;
import com.sf.map.tool.station.vo.DeviceDetailVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author DuanZhaoXu
* @ClassName:
* @Description:
* @date 2020年05月09日 10:50:39
*/
@Service
public class DeviceServiceImpl implements DeviceService {
@Autowired
private DeviceMapper deviceMapper;
@Override
public List<DeviceDetailVo> findAllDevice() {
List<DeviceDetailVo> deviceDetailVoList = deviceMapper.findAllDevice();
deviceDetailVoList = deviceDetailVoList.parallelStream().map(this::convertXYSchema).collect(Collectors.toList());
return deviceDetailVoList;
}
/**
* 讲设备的坐标按照目标schema转换坐标。
*
* @param detailVo
* @return
*/
private DeviceDetailVo convertXYSchema(DeviceDetailVo detailVo) {
Double[] xyArray = CoordTransform2.convertSchema
(CoordTransform2.Schema.gps2mars, detailVo.getX().doubleValue(), detailVo.getY().doubleValue());
detailVo.setX(xyArray[0].floatValue());
detailVo.setY(xyArray[1].floatValue());
return detailVo;
}
@Override
public int deleteByType(String type) {
return deviceMapper.deleteByType(type);
}
@Override
public int batchInsert(List<DeviceDetailVo> list) {
return deviceMapper.batchInsert(list);
}
@Override
public List<DeviceDetailVo> findDeviceByType(String type) {
return deviceMapper.findDeviceByType(type);
}
@Override
public int batchUpdateLinkState(List<DeviceDetailVo> deviceDetailVos) {
return deviceMapper.batchUpdateLinkState(deviceDetailVos);
}
@Override
public int batchUpdateCallState(List<StationCallState> stationCallStateList) {
return deviceMapper.batchUpdateCallState(stationCallStateList);
}
}