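Overly aggressive concurrent operations can hurt shared resources such as a database or network services. Here are some brief notes on a few strategies for throttling concurrency.
1. Use a semaphore: java.util.concurrent.Semaphore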
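// Shared by all calls: a semaphore created inside migrate() would give every
// call its own 5 permits and would limit nothing.
private static final Semaphore SEMAPHORE = new Semaphore(5);

public void migrate(final List<Long> list) throws InterruptedException {
    if (CollectionUtils.isNotEmpty(list)) {
        SEMAPHORE.acquire(); // blocks while 5 tasks are already in flight
        WORKER.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // batchUpdate(list);
                } finally {
                    SEMAPHORE.release(); // always hand the permit back
                }
            }
        });
    }
}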
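To check that a semaphore really caps the number of tasks in flight, here is a minimal, JDK-only sketch. The class name `SemaphoreThrottleDemo`, the method `runAndMeasurePeak`, the pool size, and the 5 ms sleep standing in for the real update are all assumptions made for illustration; they are not from the original code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreThrottleDemo {

    /**
     * Submits taskCount short tasks, admitting at most maxInFlight at a time,
     * and returns the highest number of tasks observed running together.
     */
    public static int runAndMeasurePeak(int taskCount, int maxInFlight)
            throws InterruptedException {
        final Semaphore permits = new Semaphore(maxInFlight);
        final AtomicInteger running = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(20);
        try {
            for (int i = 0; i < taskCount; i++) {
                // Blocks the submitting thread once maxInFlight permits are taken.
                permits.acquire();
                pool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            int now = running.incrementAndGet();
                            peak.accumulateAndGet(now, Math::max);
                            Thread.sleep(5); // hypothetical stand-in for the DB work
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            running.decrementAndGet();
                            permits.release(); // release only after the task is done
                        }
                    }
                });
            }
        } finally {
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak concurrency: " + runAndMeasurePeak(50, 5));
    }
}
```

Even though the pool has 20 threads, the reported peak should never exceed the permit count, because a task can only be submitted while holding a permit and the permit is released only after the task finishes.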
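2. Similar to Semaphore: use com.google.common.util.concurrent.RateLimiter to cap how many operations run per second, much like a QPS limit.
For example, when MySQL cannot run a batch update and you have to update row by row in a loop, it is best to add some rate control, for example: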
if (CollectionUtils.isNotEmpty(needToInsertList)) {
    final RateLimiter limiter = RateLimiter.create(Constants.RateLimiter_Size);
    for (ScheduledSellingInfo info : needToInsertList) {
        limiter.acquire();
        scheduledSellingInfoRepository.replace(info);
    }
}
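To make the pacing effect of a blocking acquire() concrete, here is a simplified JDK-only pacer. It is not Guava's implementation (the real RateLimiter is more sophisticated); it only spaces calls a fixed interval apart. The names `SimplePacer` and `timeAcquires` are hypothetical.

```java
import java.util.concurrent.TimeUnit;

public class SimplePacer {
    private final long intervalNanos; // minimum gap between two granted calls
    private long nextFreeAt;          // earliest time the next call may proceed

    public SimplePacer(double permitsPerSecond) {
        if (permitsPerSecond <= 0) {
            throw new IllegalArgumentException("permitsPerSecond must be positive");
        }
        this.intervalNanos = (long) (TimeUnit.SECONDS.toNanos(1) / permitsPerSecond);
        this.nextFreeAt = System.nanoTime();
    }

    /** Blocks until the current slot is free, then reserves the following slot. */
    public synchronized void acquire() throws InterruptedException {
        long now = System.nanoTime();
        long wait = nextFreeAt - now;
        if (wait > 0) {
            TimeUnit.NANOSECONDS.sleep(wait);
        }
        // After an idle period, restart from "now" so unused slots do not pile up.
        nextFreeAt = Math.max(nextFreeAt, now) + intervalNanos;
    }

    /** Returns the elapsed milliseconds for the given number of paced calls. */
    public static long timeAcquires(int calls, double permitsPerSecond)
            throws InterruptedException {
        SimplePacer pacer = new SimplePacer(permitsPerSecond);
        long start = System.nanoTime();
        for (int i = 0; i < calls; i++) {
            pacer.acquire();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("elapsed ms: " + timeAcquires(11, 100.0));
    }
}
```

At 100 permits per second, 11 calls span 10 intervals of 10 ms each, so the loop takes roughly a tenth of a second however fast the loop body itself is. That is the property the replace() loop above relies on.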
Another example, this time in a multi-threaded environment:
// Shared worker pool; NamedThreadFactory is a project-specific helper.
public final static ExecutorService WORKER = Executors.newFixedThreadPool(20, new NamedThreadFactory("migrate"));

public void incrementMigrateWithSalesNo1(Collection<Long> salesNoList) {
    if (CollectionUtils.isNotEmpty(salesNoList)) {
        final RateLimiter limiter = RateLimiter.create(2000);
        for (final Long salesNo : salesNoList) {
            limiter.acquire(); // throttles how fast tasks are submitted
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    try {
                        List<SellBrandRelation> list = vipGoodsDao.getSellBrandRelationBySalesNo(salesNo);
                        if (CollectionUtils.isNotEmpty(list)) {
                            batchUpdate(list);
                        }
                    } finally {
                        // to do something
                    }
                }
            };
            WORKER.execute(task);
        }
    }
}
3. Combine RateLimiter with a latch, java.util.concurrent.CountDownLatch, so the caller can wait until every task has finished
For example:
// latch.await() can throw InterruptedException, so the method must declare it.
public void incrementMigrateWithSalesNo(Collection<Long> salesNoList) throws InterruptedException {
    if (CollectionUtils.isNotEmpty(salesNoList)) {
        final RateLimiter limiter = RateLimiter.create(2000);
        final CountDownLatch latch = new CountDownLatch(salesNoList.size());
        for (final Long salesNo : salesNoList) {
            limiter.acquire();
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    try {
                        List<SellBrandRelation> list = vipGoodsDao.getSellBrandRelationBySalesNo(salesNo);
                        if (CollectionUtils.isNotEmpty(list)) {
                            batchUpdate(list);
                        }
                    } finally {
                        latch.countDown(); // count down even if the task fails
                    }
                }
            };
            WORKER.execute(task);
        }
        latch.await(); // block until every task has counted down
    }
}
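The waiting behavior of the latch can be seen in isolation with a small JDK-only sketch. The class `LatchWaitDemo`, the method `runAll`, the pool size, and the 2 ms sleep standing in for real work are assumptions for illustration only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchWaitDemo {

    /** Runs taskCount tasks and returns how many had completed when await() returned. */
    public static int runAll(int taskCount) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(taskCount);
        final AtomicInteger completed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Thread.sleep(2); // hypothetical stand-in for the real work
                            completed.incrementAndGet();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            latch.countDown(); // in finally, so a failure cannot hang await()
                        }
                    }
                });
            }
            latch.await(); // returns only after taskCount countDown() calls
            return completed.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed before await returned: " + runAll(25));
    }
}
```

Because each task counts down exactly once in its finally block, await() returns only after all tasks have run, which is why the caller in the example above can treat the migration as finished once the method returns.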