JOB:
/** * 同步警情json 到t_alarm。如果服务器down掉几天,会遗漏期间的数据 * @author wj * @date 2016-12-30 * */ public class AlarmJob implements Job { public static AtomicReference<Date> maxBjsj ; static{ if(CacheUtils.get(Contants.jq_maxDateCache, Contants.jq_maxDate_key) == null){ maxBjsj = new AtomicReference<Date>(); }else{ maxBjsj = (AtomicReference<Date>)CacheUtils.get(Contants.jq_maxDateCache, Contants.jq_maxDate_key); } } public static int count = 0; //可以缓存到内存 // @Cacheable(value="sysCache" ) @Override public void execute(JobExecutionContext context) throws JobExecutionException { String url = Contants.alarm_url+DateUtils.getDate("yyyyMMdd"); try { long t1 = System.currentTimeMillis(); // 排序前取得当前时间 if(new HttpClient().get(url,maxBjsj,Contants.TYPE_JQ)){ long t2 = System.currentTimeMillis(); // 排序后取得当前时间 Calendar c = Calendar.getInstance(); c.setTimeInMillis(t2 - t1); System.out.println("----第"+ ++count +"次 同步结束----"+"耗时: " + c.get(Calendar.MINUTE) + "分 " + c.get(Calendar.SECOND) + "秒 " + c.get(Calendar.MILLISECOND) + " 毫秒"); } // } catch (Exception e) { e.printStackTrace(); } } }
httpclient:
public boolean get(String url ,AtomicReference<Date> maxBjsj,String type) throws Exception{ CloseableHttpClient httpclient = HttpClients.createDefault(); try { // 创建httpget. HttpGet httpget = new HttpGet(url); // httpget.setParams(params); System.out.println("\n-----------------------"); logger.debug("executing request " + httpget.getURI()); // 执行get请求. CloseableHttpResponse response = httpclient.execute(httpget); try { // 获取响应实体 HttpEntity entity = response.getEntity(); // 打印响应状态 logger.info(response.getStatusLine().toString()); if(200==(response.getStatusLine().getStatusCode())){ if (entity != null) { String s = EntityUtils.toString(entity, "UTF-8"); // 打印响应内容长度 logger.debug("Response content length: " + entity.getContentLength()); // 打印响应内容 logger.debug("Response content: " + s); Map<String,List<Map<String,Object> >> map = FastJsonUtil.stringToCollect(s); List<Map<String,Object> > data = map.get("data"); data = getCurrentDateData(data, type); if( maxBjsj.get()==null ){ //只会执行第一次,如果缓存里没有值 // maxBjsj = new AtomicReference<Date>(getBJSJDate(data.get(0))) ; maxBjsj.set(getBJSJDate(data.get(0),type)); }else{ data = filterDateGt(maxBjsj.get(), data,type); } List<Map<String,Object> > listWithoutDup = data; String jobName = ""; String cacheName = ""; String cacheKeyName = ""; if(Contants.TYPE_JQ.equals(type)){ //去重 ,获取下一次最大日期 listWithoutDup = withoutDuplicate(data, "JQBH",maxBjsj,type); //警情去重 jobName="警情"; cacheName=Contants.jq_maxDateCache; cacheKeyName=Contants.jq_maxDate_key; }else if(Contants.TYPE_DPZD.equals(type)){ //没有主键,无法去重 setMaxDate(data, maxBjsj,type); jobName="调派中队"; cacheName=Contants.dpzd_maxDateCache; cacheKeyName=Contants.dpzd_maxDate_key; }else if(Contants.TYPE_DPCL.equals(type)){ setMaxDate(data, maxBjsj,type); jobName="调派车辆"; cacheName=Contants.dpcl_maxDateCache; cacheKeyName=Contants.dpcl_maxDate_key; }else{ throw new RuntimeException("不应该到这"); } boolean ret = true; if(!CollectionUtils.isEmpty(listWithoutDup)){ for(Map<String,Object> cur: listWithoutDup){ try{ BeanMapper.setAndPersist(cur,type); //如果这里报错,因为已经在上面有了最大日期,下一次JOB、数据就为空除非有新数据 }catch (org.springframework.dao.DuplicateKeyException e){ // 过滤掉之前日期,不应该到这 // maxBjsj.set(null); //清空最大日期,让下一次JOB继续插入(下一次数据也许会变) logger.error(e.getMessage()+"--duplicate: "+(String)cur.get("JQBH")); ret = false; } } if(ret) logger.info("-------------【"+jobName+"】成功导入"+listWithoutDup.size() +"条数据-----------------------"); }else{ logger.info("-------------【"+jobName+"】没有最新数据-----------------------"); ret= false; } // 可以比较一下缓存里的日期和这里的最大日期。不如直接put;比较还得get再比较,然后再put //将最大日期缓存磁盘(用于防止server关闭)。如果没有关闭,最大日期值 依然在外面静态变量里 CacheUtils.put(cacheName, cacheKeyName,maxBjsj); //看看有没有持久化成功 org.springframework.util.Assert.isTrue(comparDate(((AtomicReference<Date>)CacheUtils.get(cacheName, cacheKeyName)).get(), maxBjsj.get())==0 ,"缓存日期和刚才日应该相等"); CacheUtils.flush(cacheName); return ret; } }else { logger.warn("------请求["+url+"]错误:"+response.getStatusLine().toString());// HTTP/1.1 404 Not Found return false; } } finally { response.close(); } } catch (ClientProtocolException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }catch (SQLIntegrityConstraintViolationException e) { logger.error("--------------主键冲突异常:"+e.getMessage()+"-----------------------"); } finally { // 关闭连接,释放资源 try { httpclient.close(); } catch (IOException e) { e.printStackTrace(); } } return false; }
其他私有辅助方法:
/** * 去重,如果重复,只保留第一个。并获得最大日期 * @author wj * @param list * @param key 要去重的map的key * @return */ private List<Map<String,Object>> withoutDuplicate(List<Map<String,Object>> list,String key,AtomicReference<Date> maxBjsj,String type){ if(!CollectionUtils.isEmpty(list)){ List<Map<String,Object>> tmpList=new ArrayList<Map<String,Object>>(); Set<Object> keysSet = new HashSet<Object>(); for(Map<String,Object> map : list){ Object keys = map.get(key); int beforeSize = keysSet.size(); keysSet.add(keys); int afterSize = keysSet.size(); if(afterSize == beforeSize + 1){ tmpList.add(map); Date date = getBJSJDate(map,type); if(comparDate(date, maxBjsj.get())==1){ // if(date>maxBjsj){ // maxBjsj = new AtomicReference<Date>(date) ; maxBjsj.set(date); } } } return tmpList; } return null; } private void setMaxDate(List<Map<String,Object>> list,AtomicReference<Date> maxBjsj,String type){ if(!CollectionUtils.isEmpty(list)){ for(Map<String,Object> map : list){ Date date = getBJSJDate(map,type); if(comparDate(date, maxBjsj.get())==1){ maxBjsj.set(date); } } } } /** * 查询日期大于{@code date} 的数据 * @param date * @param list * @return */ private List<Map<String,Object>> filterDateGt(Date date ,List<Map<String,Object>> list,String type){ if(!CollectionUtils.isEmpty(list)){ List<Map<String,Object>> tmpList=new ArrayList<Map<String,Object>>(); for(Map<String,Object> map : list){ Date date1 = getBJSJDate(map,type); if(comparDate(date1, date) == 1){ tmpList.add(map); } } return tmpList; } return null; } private Date getBJSJDate(Map<String,Object> map,String type){ String BJSJ =""; if(Contants.TYPE_JQ.equals(type)){ BJSJ = (String)map.get("BJSJ");//报警时间 --是否对应到 ALARM_TIME }else if(Contants.TYPE_DPZD.equals(type)){ BJSJ = (String)map.get("DPSJ"); }else if(Contants.TYPE_DPCL.equals(type)){// BJSJ = (String)map.get("DPSJ"); }else{ throw new RuntimeException("不应该到这"); } try { Date BJSJDate = DateUtils.parseDate(BJSJ, "yyyy/MM/dd HH:mm:ss.SSS");//2016/12/29 13:56:22.000 return BJSJDate; } catch (java.text.ParseException e) { e.printStackTrace(); } return null; } public static int comparDate(Date DATE1, Date DATE2) { try { if(DATE1 ==null && DATE2==null){ return 0; }else if(DATE1 ==null || DATE2==null){ throw new Exception(); } if (DATE1.getTime() > DATE2.getTime()) { // System.out.println("dt1 在dt2前"); return 1; } else if (DATE1.getTime() < DATE2.getTime()) { // System.out.println("dt1在dt2后"); return -1; } else { return 0; } } catch (Exception exception) { exception.printStackTrace(); } return 0; } public static List<Map<String,Object> > getCurrentDateData(List<Map<String,Object> > list,String type) throws java.text.ParseException{ if(!CollectionUtils.isEmpty(list)){ List<Map<String,Object>> tmpList=new ArrayList<Map<String,Object>>(); for(Map<String,Object> map : list){ if(Contants.TYPE_JQ.equals(type)){ String BJSJ = (String)map.get("BJSJ");//报警时间 --是否对应到 ALARM_TIME Date BJSJDate = DateUtils.parseDate(BJSJ, "yyyy/MM/dd HH:mm:ss.SSS"); //2016/12/29 13:56:22.000 BJSJ = DateUtils.formatDate(BJSJDate); if(BJSJ.equals(DateUtils.getDate("yyyy-MM-dd"))){ tmpList.add(map); } }else if(Contants.TYPE_DPZD.equals(type)){ String DPSJ = (String)map.get("DPSJ"); Date DPSJDate = DateUtils.parseDate(DPSJ, "yyyy/MM/dd HH:mm:ss.SSS"); //2016/12/29 13:56:22.000 DPSJ = DateUtils.formatDate(DPSJDate); if(DPSJ.equals(DateUtils.getDate("yyyy-MM-dd"))){ tmpList.add(map); } }else if(Contants.TYPE_DPCL.equals(type)){ String DPSJ = (String)map.get("DPSJ"); Date DPSJDate = DateUtils.parseDate(DPSJ, "yyyy/MM/dd HH:mm:ss.SSS"); //2017/01/01 08:44:31.000 DPSJ = DateUtils.formatDate(DPSJDate); if(DPSJ.equals(DateUtils.getDate("yyyy-MM-dd"))){ tmpList.add(map); } }else{ throw new RuntimeException("不应该到这"); } } return tmpList; } return null; }
用到ehcache缓存最大时间到磁盘,缓存可以看我另一篇博客