java并发编程-Executor框架
http://blog.csdn.net/shoubuliaolebu/article/details/7441811
Thread_跨节点会合查询
http://www.myexception.cn/program/1057745.html
mysql 索引
http://www.cnblogs.com/tianhuilove/archive/2011/09/05/2167795.html
c3p0
http://blog.sina.com.cn/s/blog_6c5f4d3c01012gtq.html
ForkJoinPool VS ExecutorService 实例分析
http://www.iteye.com/topic/1117483
差不多每1万条插入一次数据,性能最高,40秒左右。
http://hi.baidu.com/yiewgckawnlpuwe/item/80adbb9d5c263eb282d29571
MySQL五种优化插入数据表的查询方法 by cubeking
http://hi.baidu.com/cubeking/item/598abdc08b3b9225ef46651b
mysql批量提交的优化
http://hidba.org/?p=369
类 ThreadPoolExecutor
http://www.cjsdn.net/Doc/JDK50/java/util/concurrent/ThreadPoolExecutor.html
ExecutorService+FutureTask实现程序执行超时监控
http://blog.csdn.net/navy0418/article/details/6612105
JAVA多线程的控制JAVA 5.0
http://blog.sina.com.cn/s/blog_5efa347301011zk3.html
http://v.youku.com/v_show/id_XNTA5ODk4NzQ0.html
C3p0使用时出现的异常及解决方案
http://blog.csdn.net/t12x3456/article/details/7650404
c3p0存在严重bug
http://weifly.iteye.com/blog/1227182
C3P0 bug 解决办法: 关闭的连接 still in use
http://bbs.csdn.net/topics/390290277
基于多线程的数据库连接池
http://blog.csdn.net/lihao8023/article/details/4237334
快速批量插入setLocalInfileInputStream的用法
http://blog.csdn.net/nsrainbow/article/details/8206050
MySQL性能优化的最佳21条经验
http://www.csdn.net/article/2011-08-09/302869
mysql快速插入/更新大量记录
http://www.cnblogs.com/chutianyao/archive/2012/07/18/2597330.html
数据库插入百万数据
http://blog.csdn.net/godfrey90/article/details/6534980
TYPE_SCROLL_INSENSITIVE:结果集的游标可以上下移动,当数据库变化时,当前结果集不变。
ResultSet.CONCUR_READ_ONLY:指定不可以更新ResultSet
以上两个参数是在对数据库进行分页处理时用到的。
分页经验:
大表分页做起来有些麻烦,DBA 也提供了很多方法;这里找了一个比较符合我的业务要求的办法:按 userid 排序后分批取数。
第一次: select userid from table order by userid asc limit 10000;
下一页:假设上一页第 1 万条记录的 userid 为 504010001,则写法如下:
select userid from table where userid>504010001 order by userid asc limit 10000;
每次传入上次最大的一个userid,这样速度就很快了,数据多大都没问题
import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.annotation.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; /** * 多线长异步插入 */ @Component("cacheToMysqlTaskObj") public class CacheToMysqlTask{ Logger logger = LoggerFactory.getLogger(CacheToMysqlTask.class); @Resource private BarBatchDAO barBatchDAO; private static int initThreads = 10; private ExecutorService executorService; private CompletionService<Object> completionService; //每次执行的记录 private final static long exeStep = 10000; //任务未完成,休停时间 private final static long timeStep = 1*1000; //休停超时 private final static long stopStep = 1*60*60*1000; //递归超时 hours private final static int totalStep = 4; //1000W private final static int overRecords = 10000000; private final static int preFixLen = Constants.TAIR_FOOTBAR_VIEW.length(); public void exeToMysqlTask() { try{ logger.warn("import tair data to Mysql ..."); this.exeToMysqlTask(1, 10000, overRecords); logger.warn("over ..."); if(executorService != null && !executorService.isShutdown()){ executorService.shutdown(); logger.warn("executorService shutdown ..."); } }catch(Exception e){ logger.error("footbar mysql to tair failed:"+e); } } long totalTimes = 0L; public boolean exeToMysqlTask(long start,long end,long overRecords){ try{ long startTime = System.currentTimeMillis(); if(executorService == null || executorService.isShutdown()){ executorService = Executors.newFixedThreadPool(initThreads); completionService = new ExecutorCompletionService<Object>(executorService); } long kipStart = start; long kipEnd = end; long kipMax = overRecords; if(kipEnd<=0 || kipStart>kipEnd){ logger.error("执行的次数不对!"); return false; } if(kipEnd >= 
8000000){ if(kipEnd%100000 == 0){ logger.warn("数据告警太大建议先拆分再操作!"); } } if(kipEnd > kipMax){ logger.warn("需处理数据 "+kipEnd+" 数据太大1000W以后的数据任务将不执行!"); return false; } for(int i=0;i<15;i++){ List<FootInfo> saveOrUpdateList = new ArrayList<FootInfo>(); for(;;){ //构造业务数据 saveOrUpdateList.add(footInfo); } if(i==10){ logger.error(i+"业务数据构造循环结束"); } if(saveOrUpdateList.size()>0){ BatchTask task = new BatchTask(batchDAO,saveOrUpdateList); if (!executorService.isShutdown()) { completionService.submit(task); } } } //保证最后一笔记录执行,避免无限循环 if(kipEnd > kipMax){ kipStart = kipEnd + 1; kipEnd = kipStart + exeStep; logger.warn("the task finished,after 2 sec over!"); break; } kipStart = kipEnd + 1; kipEnd = kipStart + exeStep; if(kipEnd>=kipMax){ kipEnd = kipMax; } if (!executorService.isShutdown()){ executorService.shutdown(); } boolean sleepKip = executorService.isTerminated(); long times = 0L; while(!sleepKip){ Thread.sleep(timeStep); sleepKip = executorService.isTerminated(); times += timeStep; if(times >= stopStep){ logger.error("单次服务所有task累计执行时间超过1个小时,请排查!" + times); executorService.shutdownNow(); break; } } //times timeout exit ... 
if(times >= stopStep){ logger.warn("times timeout exit ..."); return false; } long endTime = System.currentTimeMillis(); totalTimes += (endTime-startTime); //递归超时 long hours = (totalTimes % (1000 * 60 * 60 * 24)) / (1000 * 60 * 60); if(hours >= totalStep){ logger.warn("totalTimes timeout exit: "+totalTimes+" milliseconds!"); //return false; } if(kipEnd < kipMax){ return exeToMysqlTask(kipStart,kipEnd,kipMax); } }catch(Exception e){ logger.error("foot bar tair to mysql failed:"+e); } return true; } } class BatchTask implements Callable<Object> { private List<FootBarInfo> list; private BatchDAO barBatchDAO; public BatchTask(){ } public BatchTask(BatchDAO barBatchDAO,List<FootInfo> list){ this.list = list; this.barBatchDAO = barBatchDAO; } public synchronized String call() throws Exception { if(list != null){ barBatchDAO.batchSaveOrUpdate(list); } return ""; } }
千万级插入更新,分页技巧
import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import javax.annotation.Resource; import org.springframework.stereotype.Component; @Component("batchDAO") public class BatchDAO { @Resource private DataSource dataSource; static StringBuffer saveOrUpdateStr = new StringBuffer(); static StringBuffer nextPageStr = new StringBuffer(); static int commitInt = 2000; static{ saveOrUpdateStr.append("insert into foot_info (userid,siteinfo,create_time) values "); saveOrUpdateStr.append("(?,?,now())"); //组装更新语句 for(int i=1;i<commitInt;i++){ saveOrUpdateStr.append(",(?,?,now())"); } saveOrUpdateStr.append(" on duplicate key update siteinfo=values(siteinfo),create_time=values(create_time);"); nextPageStr.append(" select userid from foot_info "); nextPageStr.append(" where userid >= ? "); nextPageStr.append(" order by userid asc limit ? "); } /** * 将记录更新到daogou_footbar_info表,不存在插入,存在更新siteinfo * 单笔提交5000条 * @param footBarList * @throws SQLException */ public void batchSaveOrUpdate(List<FootInfo> footList) throws SQLException{ Connection conn = null; PreparedStatement ps = null; try { conn = dataSource.getConnection(); conn.setAutoCommit(false); ps = conn.prepareStatement(saveOrUpdateStr.toString()); List<List<FootInfo>> splitList = splitList(footList,commitInt); for(int i=0;i<splitList.size();i++){ List<FootInfo> lists = splitList.get(i); //不够整次处理 if(lists.size()<commitInt){ StringBuffer updateTmp = new StringBuffer(); updateTmp.append(" insert into foot_info (userid,siteinfo,create_time) values "); updateTmp.append("(?,?,now())"); //组装更新语句 for(int n=1;n<lists.size();n++){ updateTmp.append(",(?,?,now())"); } updateTmp.append(" on duplicate key update siteinfo=values(siteinfo),create_time=values(create_time);"); ps = conn.prepareStatement(updateTmp.toString()); } for(int j=0;j<lists.size();j++){ FootInfo foot = (FootInfo)lists.get(j); 
ps.setLong(2*j+1, foot.getUserId()); ps.setString(2*j+2, foot.getSiteInfo()); } ps.execute(); conn.commit(); } } catch (SQLException e) { throw new SQLException(e); } finally{ if(ps != null){ try { ps.close(); } catch (SQLException e) { e.printStackTrace(); } } if(conn != null){ try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } } /** * 对1000W表设计的分页 * 对userid排序,从最小记录开始查询每次limit 1W条 * 每次查询userid>上次查询的最大userid,这样解决了数据过大的分页问题,这里只做下一页的分页 * @param start * @param step * @return * @throws SQLException */ public List<Long> getNextPage(Long start,Long step) throws SQLException{ Connection conn = null; PreparedStatement ps = null; ResultSet rs = null; try { conn = dataSource.getConnection(); ps = conn.prepareStatement(nextPageStr.toString()); ps.setLong(1, start); ps.setLong(2, step); rs = ps.executeQuery(); List<Long> userIds = new ArrayList<Long>(); while(rs.next()){ userIds.add(rs.getLong("userid")); } return userIds; } catch (SQLException e) { throw new SQLException(e); } finally{ if(rs != null){ try { rs.close(); } catch (SQLException e) { e.printStackTrace(); } } if(ps != null){ try { ps.close(); } catch (SQLException e) { e.printStackTrace(); } } if(conn != null){ try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } } public static <T> List<List<T>> splitList(List<T> list, int pageSize) { List<List<T>> listArray = new ArrayList<List<T>>(); if(list != null && list.size()>0){ int listSize = list.size(); int page = (listSize + (pageSize-1))/ pageSize; for(int i=0;i<page;i++) { List<T> subList = new ArrayList<T>(); for(int j=0;j<listSize;j++) { int pageIndex = ( (j + 1) + (pageSize-1) ) / pageSize; if(pageIndex == (i + 1)) { subList.add(list.get(j)); } if( (j + 1) == ((j + 1) * pageSize) ) { break; } } listArray.add(subList); } } return listArray; } }
MysqlSequence
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Resource; import org.springframework.stereotype.Service; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @Service("sequenceImpl") public class MysqlSequence implements Sequence { private int rangeStep = 1000; private volatile AtomicLong sequence = new AtomicLong(-1); private final Lock lock = new ReentrantLock(); private String sequenceName=""; private volatile long seqEndPoint ; @Resource private SequenceDao sequenceDao ; private int defaultTryTime = 100; @Override public long nextValue(String sequenceName) throws Exception { int tryTime = defaultTryTime; for (;;) { if (tryTime < 0) { throw new IllegalStateException("try too many times,maybe the database crashed!"); } tryTime--; if (sequence.get() == -1L) { lock.lock(); try { if(sequence.get() == -1L) { Long expectValue = sequenceDao.getSequnceNumber(sequenceName); boolean ret = sequenceDao.getAndIncrement(sequenceName, rangeStep,expectValue); if(!ret) continue; sequence.set(expectValue); seqEndPoint = sequence.get() + rangeStep; } } finally { lock.unlock(); } } lock.lock(); try { if(sequence.get() >= seqEndPoint) { sequence.set(-1L); continue; } return sequence.incrementAndGet(); } finally { lock.unlock(); } } } public void setRangeStep(int rangeStep) { Preconditions.checkArgument(rangeStep>=100,"rangeStep must large than 100!"); this.rangeStep = rangeStep; } public String getSequnceName() { return sequenceName; } public void setSequnceName(String sequenceName) { Preconditions.checkArgument(!Strings.isNullOrEmpty(sequenceName),"sequnceName is not allow null!"); this.sequenceName = sequenceName; } public void setDefaultTryTime(int defaultTryTime) { this.defaultTryTime = defaultTryTime; } }