整体项目介绍 见 https://blog.csdn.net/wenjieyatou/article/details/80190886
分支一学习笔记:
分支1.1
1:解决优惠券编码重复问题,原先采用的是获取数据库所有的券,然后去比对是否重复,如果库数据量达百万的时候就会出现非常缓慢,而且会出现经常制券失败等,所以此版本舍弃原先采用随机数的模式,通过推特的雪花算法来保证唯一,但是依然保留优惠券前缀和后缀。2:由原来的异步采用线程修改为线程池,以为高并发时候存在大量的线程占内存空间。
3:由原来制券采用for循环模式修改为批量制券,而且采用分配插入优惠券,一批次目前定为5000.
4:加入消息队列(采用rabbitMQ)对于某一批次添加失败,把失败的放入对列中,通过队列进行补救,已到达高可用。避免大批量优惠券来回重新导入消息队列对于异常信息拒绝解决并重返消息队列中,配置2个消费者以避免其中一个服务异常,消息处理出现死循环。
1,第一个问题,由于数据请求在百万级别数据库,所以可能造成查询十分缓慢甚至出现超时请求问题。所以直接生成唯一的优惠券值即可。具体代码如下:
import java.lang.reflect.Executable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class GenerateOnlyIdUtils { private final long twepoch = 1288834974657L; private final long workerIdBits = 5L; private final long datacenterIdBits = 5L; private final long maxWorkerId = -1L ^ (-1L << workerIdBits); private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits); private final long sequenceBits = 12L; private final long workerIdShift = sequenceBits; private final long datacenterIdShift = sequenceBits + workerIdBits; private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits; private final long sequenceMask = -1L ^ (-1L << sequenceBits); private long workerId; private long datacenterId; private long sequence = 0L; private long lastTimestamp = -1L; public GenerateOnlyIdUtils(long workerId, long datacenterId) { if (workerId > maxWorkerId || workerId < 0) { throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); } if (datacenterId > maxDatacenterId || datacenterId < 0) { throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId)); } this.workerId = workerId; this.datacenterId = datacenterId; } public synchronized long nextId() { long timestamp = timeGen(); if (timestamp < lastTimestamp) { throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp)); } //如果是同一时间序列 则时间序列内区别 if (lastTimestamp == timestamp) { sequence = (sequence + 1) & sequenceMask; if (sequence == 0) { timestamp = tilNextMillis(lastTimestamp); } } else { sequence = 0L; } lastTimestamp = timestamp; return ((timestamp - twepoch) << timestampLeftShift) | (datacenterId << datacenterIdShift) | (workerId << workerIdShift) | sequence; } protected long tilNextMillis(long lastTimestamp) { long timestamp = timeGen(); while (timestamp <= lastTimestamp) { timestamp = timeGen(); } return timestamp; } protected long timeGen() { return System.currentTimeMillis(); } public static void main(String args[]) { final GenerateOnlyIdUtils idGe = new GenerateOnlyIdUtils(1, 1); //线程池并行执行100次全局ID生成 ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 100; i++) { executorService.execute(new Runnable() { @Override public void run() { long id = idGe.nextId(); System.out.println(id); } }); } executorService.shutdown(); } }
2:由原来的异步采用线程修改为线程池,以为高并发时候存在大量的线程占内存空间。
线程池的自定义可以参考以下代码为:
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package org.springframework.scheduling.concurrent; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.core.task.TaskRejectedException; import org.springframework.scheduling.SchedulingTaskExecutor; import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureTask; public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor { private final Object poolSizeMonitor = new Object(); //个人觉得这个地方的线程数量设置不是很对 大家有意见可以评论。感觉就只有一个活动线程(corePoolSize),就失去了线程池的意义。 private int corePoolSize = 1; private int maxPoolSize = 2147483647; private int keepAliveSeconds = 60; private int queueCapacity = 2147483647; private boolean allowCoreThreadTimeOut = false; private ThreadPoolExecutor threadPoolExecutor; public ThreadPoolTaskExecutor() { } public void setCorePoolSize(int corePoolSize) { Object var2 = this.poolSizeMonitor; synchronized(this.poolSizeMonitor) { this.corePoolSize = corePoolSize; if (this.threadPoolExecutor != null) { this.threadPoolExecutor.setCorePoolSize(corePoolSize); } } } public int getCorePoolSize() { Object var1 = this.poolSizeMonitor; synchronized(this.poolSizeMonitor) { return this.corePoolSize; } } public void setMaxPoolSize(int maxPoolSize) { Object var2 = this.poolSizeMonitor; synchronized(this.poolSizeMonitor) { this.maxPoolSize = maxPoolSize; if (this.threadPoolExecutor != null) { this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize); } } } public int getMaxPoolSize() { Object var1 = this.poolSizeMonitor; synchronized(this.poolSizeMonitor) { return this.maxPoolSize; } } public void setKeepAliveSeconds(int keepAliveSeconds) { Object var2 = this.poolSizeMonitor; synchronized(this.poolSizeMonitor) { this.keepAliveSeconds = keepAliveSeconds; if (this.threadPoolExecutor != null) { this.threadPoolExecutor.setKeepAliveTime((long)keepAliveSeconds, TimeUnit.SECONDS); } } } public int getKeepAliveSeconds() { Object var1 = this.poolSizeMonitor; synchronized(this.poolSizeMonitor) { return this.keepAliveSeconds; } } public void setQueueCapacity(int queueCapacity) { this.queueCapacity = queueCapacity; } public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) { this.allowCoreThreadTimeOut = allowCoreThreadTimeOut; } protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { BlockingQueue<Runnable> queue = this.createQueue(this.queueCapacity); ThreadPoolExecutor executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(true); } this.threadPoolExecutor = executor; return executor; } protected BlockingQueue<Runnable> createQueue(int queueCapacity) { return (BlockingQueue)(queueCapacity > 0 ? new LinkedBlockingQueue(queueCapacity) : new SynchronousQueue()); } public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException { Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized"); return this.threadPoolExecutor; } public int getPoolSize() { return this.threadPoolExecutor == null ? this.corePoolSize : this.threadPoolExecutor.getPoolSize(); } public int getActiveCount() { return this.threadPoolExecutor == null ? 0 : this.threadPoolExecutor.getActiveCount(); } public void execute(Runnable task) { ThreadPoolExecutor executor = this.getThreadPoolExecutor(); try { executor.execute(task); } catch (RejectedExecutionException var4) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4); } } public void execute(Runnable task, long startTimeout) { this.execute(task); } public Future<?> submit(Runnable task) { ThreadPoolExecutor executor = this.getThreadPoolExecutor(); try { return executor.submit(task); } catch (RejectedExecutionException var4) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4); } } public <T> Future<T> submit(Callable<T> task) { ThreadPoolExecutor executor = this.getThreadPoolExecutor(); try { return executor.submit(task); } catch (RejectedExecutionException var4) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4); } } public ListenableFuture<?> submitListenable(Runnable task) { ThreadPoolExecutor executor = this.getThreadPoolExecutor(); try { ListenableFutureTask<Object> future = new ListenableFutureTask(task, (Object)null); executor.execute(future); return future; } catch (RejectedExecutionException var4) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4); } } public <T> ListenableFuture<T> submitListenable(Callable<T> task) { ThreadPoolExecutor executor = this.getThreadPoolExecutor(); try { ListenableFutureTask<T> future = new ListenableFutureTask(task); executor.execute(future); return future; } catch (RejectedExecutionException var4) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4); } } public boolean prefersShortLivedTasks() { return true; } }
在制券过程中会涉及线程池的使用,具体代码参考:
@Override public int makingCoupon(Long vendorId, String taskCode) { try { final CpMakingTask tempMakingTask = makingTaskManager.getMakingTask(vendorId, taskCode); if (tempMakingTask == null) { log.error("没有保存制券任务"); return 2; } if (makingTaskManager.isRepeatByMaikingCoupon(vendorId, taskCode)) { log.error("制券正在进行,不要重复提交"); return 3; } tempMakingTask.setState(SysConstants.CPMAKINGTASTSTATE.UNDERWAY); tempMakingTask.setTaskBeginTime(new Date()); makingTaskDao.update(tempMakingTask); long start1 = System.currentTimeMillis(); final List<Coupon> tempCoupons = getTempCoupons(tempMakingTask); long start2 = System.currentTimeMillis(); log.info("生产优惠券消费时间:" + (start2 - start1) + "毫秒"); if (CollectionUtils.isNotEmpty(tempCoupons)) { makingTaskManager.insertCacheByMakingConpon(vendorId, taskCode); taskExecutor.execute(new Runnable() { @Override public void run() { boolean result = couponManager.insertCoupons(tempCoupons); if (result) { tempMakingTask.setState(SysConstants.CPMAKINGTASTSTATE.COMPLETED); tempMakingTask.setTaskEndTime(new Date()); makingTaskDao.update(tempMakingTask); } } }); } } catch (Exception e) { log.error("异步制券出现异常", e); return 0; } return 1; }
3:由原来制券采用for循环模式修改为批量制券,而且采用分配插入优惠券,一批次目前定为5000.具体代码参考:
这是起先采用for循环制券:
/** * 采用for循环插入(5000条数据大约耗时3000毫秒,无疑比for循环效率更佳) */ @Test public void insertCpActivityForBatchUnitTests(){ long start=System.currentTimeMillis(); CpActivity cpActivity = new CpActivity(); cpActivity.setVendorId(1432L); List<CpApplyLimitdt> applyLimitdts=new ArrayList<>(); List<CpUseLimitdt> useLimitdts=new ArrayList<>(); for (int i=0;i<5000;i++){ CpApplyLimitdt applyLimitdt=new CpApplyLimitdt(); applyLimitdt.setId(1l); applyLimitdt.setVendorId(1433l); applyLimitdt.setCreateDate(new Date()); applyLimitdt.setDetailName("ASCDFDRFCF"); applyLimitdt.setOwnRecordCode(""+i); applyLimitdt.setDetailCode(""+i); applyLimitdt.setModifyDate(new Date()); applyLimitdt.setMemo("备注"); applyLimitdt.setApplyScopeType(1); applyLimitdts.add(applyLimitdt); } for (int i=0;i<5000;i++){ CpUseLimitdt useLimitdt=new CpUseLimitdt(); useLimitdt.setStoreName("加油站"); useLimitdt.setCreator("lp"); useLimitdt.setModifyDate(new Date()); useLimitdt.setOwnRecordCode("12345678"); useLimitdt.setOrganName(""+i); useLimitdt.setMemo("备注"); useLimitdt.setOwnRecordType(1); useLimitdt.setVendorId(1433l); useLimitdt.setCreateDate(new Date()); useLimitdt.setStoreCode(""+i); useLimitdts.add(useLimitdt); } boolean result = activityManager.insertCpActivity(cpActivity, applyLimitdts, useLimitdts); long end=System.currentTimeMillis(); System.out.println("消耗的总时间:"+(end-start)+"毫秒"); }
//以5000记录为标准 插入数据 数据库操作是批量插入 @Override public boolean insertCoupons(final List<Coupon> coupons) { final long start1 = System.currentTimeMillis(); TransactionTemplate template = new TransactionTemplate(transactionManager); return template.execute(new TransactionCallback<Boolean>() { @Override public Boolean doInTransaction(TransactionStatus transactionStatus) { try { if (coupons.size() <= 5000) { couponDao.insertBatchCoupons(coupons); } if (coupons.size() > 5000) { List<List<Coupon>> tempCoupons = ListUtil.splitList(coupons, 5000); for (List<Coupon> item : tempCoupons) { try { couponDao.insertBatchCoupons(item); } catch (Exception e) { String data = JsonUtil.g.toJson(item); mqSenderHandler.sendMessage("spring.makeCoupons.queueKey", data); continue; } } } long end1 = System.currentTimeMillis(); log.info("添加" + coupons.size() + "张优惠券消耗时间:" + (end1 - start1) + "毫秒"); return true; } catch (Exception e) { log.error("添加优惠券异常:" + e); transactionStatus.setRollbackOnly(); return false; } } }); }
关于对批量数据插入的实现还可以参考一下解释,可以为以后的工作提供一定的思路。
大批量数据高效插入数据库表
题目:将一批10万数据通过系统(Java语言)导入数据库表,并要求实时看到导入的进度,请设计出系统方案,重点要考虑导入效率!
这个题场景很简单!
题中提到了两个重要需求:
1、实时看到导入的进度;
2、导入效率。
第一个需求可以简单理解就是能够看到导入过程。进度就是一个大概情况,并不是一个要求非常精准的数,但是如果做的很牛逼就是真实的反应成功导入的条数。这样话实现的方案有大有不同了!第二个需求就是一个无限想象的需求!只要能提高导入效率的都可以写入方案,看来是要看答题人的思考问题的广度和技术经验了。
下面是我的方案也并非完美!
1、 首先考虑10万数据,是一个不小的数据量,题目没有说数据存储的介质,我就不考虑如何读文件了。10万数据如果按照一条一条导入肯定不是一个最优的方案,数据之间没有相关联,那就将数据分隔为1万一批,将10万数据分给10线程处理,这样肯定要比一个线程导的快,这样算是一个分布计算啦!呵呵!
2、 现在我们要考虑一个线程怎么导入快。怎么提高效率,一般有数据库操作,就要重点考虑数据库操作,此题是一个插入数据库表。我们立刻想到:
for(Object o : list ){
insert into table value ( o );
}
这种显然是拼接SQL一个条条插入数据表嘛!相信一般人都会这么想,这样对吗?肯定没有问题,但是绝对不高效的,每次执行一次就要像数据库传SQL,数据库编译SQL,然后执行。我们是大批数据插入,应该使用批处理方式。
String sql = “insert into table(…) values (?, ?, ?)”;
Connection connection = new getConnection();
PreparedStatement ps = connection.prepareStatement(sql);
for (Object o: list) {
ps.setString(1,o.get…);
ps.setString(2, o.get…);
ps.setString(3, o.get…);
ps.addBatch();
}
ps.executeBatch();
ps.close();
connection.close();
这样实现速度就比第一种要快很多,SQL语句也只需编译一次。
3、 按照上面做还有问题吗?显然还不够嘛!我们还没有考虑事务!一万条提交一次事务显然不合理,对数据库压力很大,所以要控制事务的粗粒度,可以100条提交一次事务。
4、 接下来就要考虑数据库了,我经常问大家?表建索引和没有索引有啥区别?很多人都说建索引查询速度快,其实还有一点就是插入慢了啊!所以插入的表单表不要建索引,有索引将索引删掉,导入完后再建索引。
5、 高效插入应该差不多了,现在要考虑第二个需求—实时进度。很多人第一反应就是select count(1) from table,这样存在一个问题就是查询的频率,频率大数据就比较精确,如果频率小数据就不准确,5分钟查一次比1分钟获得数据要差很大,这个方案显然不算很合理了。那就在内存设置一个变量n,实时保存插入成功数量,但是根据1点说多线程同时插入,那就有可能两个线程同时加,造成数据也不准确了,那么我们就要考虑对n加锁啦!
到此我们的方案就算完了!是不是最优的?肯定还有改进的空间,但是这样够了。4:加入消息队列(采用rabbitMQ)对于某一批次添加失败,把失败的放入对列中,通过队列进行补救,已到达高可用。避免大批量优惠券来回重新导入消息队列对于异常信息拒绝解决并重返消息队列中,配置2个消费者以避免其中一个服务异常,消息处理出现死循环。
这个地方我没看懂 因为还不了解消息队列机制。不过可以参考下制券的代码:
消费者1:
package com.peiyu.mem.rabbitmq.consumers; import com.google.gson.reflect.TypeToken; import com.migr.common.util.JsonUtil; import com.migr.common.util.StringUtils; import com.peiyu.mem.dao.CouponDao; import com.peiyu.mem.domian.entity.Coupon; import com.peiyu.mem.rabbitmq.Gson2JsonMessageConverter; import com.rabbitmq.client.Channel; import org.apache.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.List; /** * Created by Administrator on 2016/12/8. */ @Component public class MakeCouponsHandler1 implements ChannelAwareMessageListener { private Logger log = Logger.getLogger(MakeCouponsHandler1.class); @Autowired private CouponDao couponDao; @Autowired private Gson2JsonMessageConverter jsonMessageConverter; @Override public void onMessage(Message message, Channel channel) throws Exception { try { channel.basicQos(1); if (message == null || message.getBody() == null) { return; } String data = jsonMessageConverter.fromMessage(message).toString(); if (StringUtils.isNotBlank(data)) { List<Coupon> coupons = JsonUtil.g.fromJson(data, new TypeToken<List<Coupon>>() { }.getType()); couponDao.insertBatchCoupons(coupons); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("1类:"+message.getMessageProperties().getDeliveryTag()); } } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true); log.error("消息队列处理制券异常:" + e); } } }
消费者2:
package com.peiyu.mem.rabbitmq.consumers; import com.google.gson.reflect.TypeToken; import com.migr.common.util.JsonUtil; import com.migr.common.util.StringUtils; import com.peiyu.mem.dao.CouponDao; import com.peiyu.mem.domian.entity.Coupon; import com.peiyu.mem.rabbitmq.Gson2JsonMessageConverter; import com.rabbitmq.client.Channel; import org.apache.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.List; /** * Created by Administrator on 2016/12/12. */ @Component public class MakeCouponsHandler2 implements ChannelAwareMessageListener { private Logger log = Logger.getLogger(MakeCouponsHandler2.class); @Autowired private CouponDao couponDao; @Autowired private Gson2JsonMessageConverter jsonMessageConverter; @Override public void onMessage(Message message, Channel channel) throws Exception { try { channel.basicQos(1); if (message == null || message.getBody() == null) { return; } String data = jsonMessageConverter.fromMessage(message).toString(); if (StringUtils.isNotBlank(data)) { List<Coupon> coupons = JsonUtil.g.fromJson(data, new TypeToken<List<Coupon>>() { }.getType()); couponDao.insertBatchCoupons(coupons); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); } } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true); log.error("消息队列处理制券异常:" + e); } } }这个分支大体就是这些内容。