Background
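Applying for a digital certificate requires a private key and CSR data. Generating the public/private key pair is expensive, though: for a reasonably strong key such as 2048-bit RSA, a single pair takes close to 300 ms on average, and the generation time fluctuates widely because it depends on the random values drawn.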
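```java
import java.security.*;

public class KeyGenBenchmark {
    public static void main(String[] args) throws NoSuchAlgorithmException {
        KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
        generator.initialize(2048, new SecureRandom());
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 100; i++) {
            generator.generateKeyPair();
        }
        // Total time for 100 key pairs; divide by 100 for the per-pair average
        System.out.println("cost: " + (System.currentTimeMillis() - startTime) + " ms");
    }
}
```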
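To keep the system responsive, key pairs must be generated in advance and stored in a key pool; when a certificate is requested, a key pair is simply taken from the pool.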
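In the early days the business was small and its volume essentially stable, so to save on deployment costs, key-pair generation and certificate application ran in the same service, implemented with a simple producer-consumer model:
- A thread pool generates key pairs continuously, with core pool size = maximum pool size = `Runtime.getRuntime().availableProcessors() - 1`;
- A `LinkedBlockingQueue` with a capacity of 2000 holds the generated key pairs.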
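For reference, the early in-process model can be sketched roughly as below. This is a minimal illustration under stated assumptions, not the original code: the class name `LocalKeyPool`, the configurable key size, and the synchronous fallback in `acquire()` (the behaviour the incident below describes) are hypothetical. A fresh `KeyPairGenerator` is created per call so that no generator instance is shared between threads, and the thread count is clamped to at least one for single-core hosts.

```java
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-process key pool: (cores - 1) producer threads feed a bounded queue. */
final class LocalKeyPool {
    private final BlockingQueue<KeyPair> queue;
    private final ExecutorService producers;
    private final int keySize;

    LocalKeyPool(int capacity, int keySize) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.keySize = keySize;
        // core = max = availableProcessors() - 1, but never fewer than one thread
        int threads = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
        this.producers = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), r -> {
                    Thread t = new Thread(r, "keypair-producer");
                    t.setDaemon(true); // producers must not keep the JVM alive
                    return t;
                });
        for (int i = 0; i < threads; i++) {
            producers.execute(this::produceForever);
        }
    }

    private KeyPair generate() {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(keySize);
            return generator.generateKeyPair();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    private void produceForever() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // put() blocks while the queue already holds `capacity` key pairs
                queue.put(generate());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Takes a pre-generated pair, or falls back to slow synchronous generation on a miss. */
    KeyPair acquire() {
        KeyPair pooled = queue.poll();
        return pooled != null ? pooled : generate();
    }

    int size() {
        return queue.size();
    }

    void shutdown() {
        producers.shutdownNow();
    }
}
```

With a 2000-slot queue, `put()` blocks producers once the pool is full, while `poll()` lets the business thread fall back immediately when the pool is empty; that fallback is what turns a drained pool into CPU contention.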
Problem
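One day, an internet-finance customer was preparing an online campaign. Knowing that creating users and applying for their digital certificates was slow, the customer decided to create the users in bulk ahead of the campaign. The result was predictable:
- The local key pool of the certificate service was drained, and every request missed the cache (cache breakdown);
- Requests fell back to generating key pairs synchronously, which intensified CPU contention, slowed refilling of the pool even further, and caused large numbers of user-creation requests to time out;
- The services had no unified network egress and the CA enforces an IP whitelist, so the cluster could not be scaled out quickly;
- Even after scaling out, the new instances started with empty key pools, so requests still generated keys synchronously and the improvement was limited.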
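A back-of-the-envelope estimate shows why the pool could not recover on its own. Assume, hypothetically, an 8-core instance (so 7 producer threads under the sizing rule above) and about 300 ms per 2048-bit key pair with the CPU to themselves. The sketch below computes the resulting production rate and the time to refill a drained 2000-entry queue; the core count and the helper names are illustrative, not figures from the incident.

```java
/** Hypothetical helper: rough refill estimates for a key pool. */
final class RefillEstimate {
    private RefillEstimate() {}

    /** Key pairs produced per second by `threads` producers at `secondsPerKey` each. */
    static double keysPerSecond(int threads, double secondsPerKey) {
        return threads / secondsPerKey;
    }

    /** Seconds needed to generate `keys` key pairs at that rate, with no consumers. */
    static double refillSeconds(int keys, int threads, double secondsPerKey) {
        return keys / keysPerSecond(threads, secondsPerKey);
    }
}
```

For example, `RefillEstimate.refillSeconds(2000, 7, 0.3)` is about 85.7 seconds and `keysPerSecond(7, 0.3)` is about 23.3 key pairs per second, and that is before synchronous fallback generation starts competing for the same cores.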
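In the end we had to play our trump card: throttle the customer's traffic and explain to them that the system had applied rate limiting...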
Optimization
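The original model had the following typical problems:
- Key-pair generation is a CPU-intensive background task unrelated to user requests, yet it competes with them for CPU, which can make certificate-application performance unstable;
- Key-generation capacity is coupled to business capacity, so neither can be scaled out independently and quickly;
- The per-instance queues cannot be aggregated quickly, so there is no simple way to monitor how many key pairs are cached in the pool;
- Generated key pairs are lost when a service restarts, so services cannot be restarted freely.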
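To address these problems, we optimized along the following lines:
- Introduce Redis as a centralized cache: the shared pool decouples user-facing business from the background generator, and the key pool survives service restarts;
- Store key pairs in a Redis List, capped at 100,000 entries;
- Monitor the number of elements in the Redis List and raise an alarm when it falls below 35% of the cap; the same signal could also drive automatic scaling of the cluster (not implemented yet);
- Use a `ReentrantLock` with multiple `Condition` wait queues (one per worker) to control which producers run;
- The key-generation service produces key pairs with a thread pool; when the Redis pool is full, a worker thread enters the await state until it is woken;
- The key-generation service uses Spring Schedule to poll the pool size at a fixed 5 s interval: below 75% it wakes half of the pool's threads, and below 50% it wakes all of them.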
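The thresholds above fit in a small pure function, which keeps the scheduling rule easy to unit-test independently of Redis and threads. This is a sketch: the class and method names are hypothetical, and the 50%/75% boundaries use "at or below" to mirror the monitor code later in this post, whereas the alarm uses a strict "below 35%".

```java
/** Hypothetical pure functions for the pool thresholds described above. */
final class KeyPoolPolicy {
    private KeyPoolPolicy() {}

    /** Number of parked workers to wake for the current pool size. */
    static int workersToWake(int currentSize, int maxSize, int workerCount) {
        if (currentSize * 2L <= maxSize) {        // at or below 50%: wake every worker
            return workerCount;
        }
        if (currentSize * 4L <= maxSize * 3L) {   // at or below 75%: wake half of them
            return workerCount / 2;
        }
        return 0;                                  // above 75%: leave workers parked
    }

    /** True when the pool has fallen below 35% of its cap and an alarm should fire. */
    static boolean shouldAlarm(int currentSize, int maxSize) {
        return currentSize * 100L < maxSize * 35L;
    }
}
```

Multiplying instead of dividing avoids rounding surprises, and the `long` arithmetic keeps a 100,000 cap far away from `int` overflow.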
Simplified implementation
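Because the number of threads is far smaller than the number of cached objects, the 100,000 cap is only approximate; a few entries more or less make no meaningful difference to Redis in this scenario.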
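The overshoot can be bounded precisely. Each worker pushes first and checks the returned length afterwards, so once the list reaches the cap, every other thread can complete at most one more push before it parks. With `t` threads and no consumers, the final length therefore lies between `cap` and `cap + t - 1`. The simulation below is a sketch that uses an `AtomicInteger` as a stand-in for the Redis List (LPUSH returning the new length); the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical simulation of "push, then check the returned length" with several producers. */
final class OvershootDemo {
    private OvershootDemo() {}

    /** Runs `threads` producers against an empty list capped at `cap`; returns the final length. */
    static int finalLength(int threads, int cap) {
        AtomicInteger listLength = new AtomicInteger(); // stand-in for LPUSH's return value
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                // Push first; stop (where the real worker would park) once the cap is reached
                while (listLength.incrementAndGet() < cap) {
                    Thread.yield();
                }
            });
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return listLength.get();
    }
}
```

Each thread has at most one push whose returned length is at or above the cap, which is where the `t - 1` bound comes from.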
Key-pair generation worker
```java
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Thread pool that runs the key-pair generation workers
 */
public final class KeyPairExecutors {
    // Redis client stub
    private final RedisCacheDAO cacheDAO;
    // Lock shared with the monitor
    private final ReentrantLock lock;
    // One Condition per worker thread
    private final List<Condition> workerConditions;

    public KeyPairExecutors(ReentrantLock lock, List<Condition> workerConditions) {
        this.cacheDAO = new RedisCacheDAO();
        this.workerConditions = workerConditions;
        this.lock = lock;
    }

    /**
     * Starts one worker thread per Condition
     */
    public void start() {
        int coreNum = workerConditions.size();
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(coreNum, coreNum, 120, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new KeyPairGeneratorThreadFactory("keypair_gen", coreNum),
                // Exactly coreNum never-ending tasks are submitted, so a rejection would be a bug
                // (DiscardOldestPolicy would retry forever against an empty SynchronousQueue)
                new ThreadPoolExecutor.AbortPolicy());
        for (Condition condition : workerConditions) {
            poolExecutor.execute(new KeyPairRunable(cacheDAO, lock, condition));
        }
    }

    class KeyPairRunable implements Runnable {
        private final RedisCacheDAO cacheDAO;
        private final ReentrantLock lock;
        private final Condition condition;

        public KeyPairRunable(RedisCacheDAO cacheDAO, ReentrantLock lock, Condition condition) {
            this.cacheDAO = cacheDAO;
            this.lock = lock;
            this.condition = condition;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                String keyBytes = genKeyPair();
                // The push returns the List length after the write
                int currentSize = cacheDAO.listLpush(keyBytes);
                if (currentSize < RedisCacheDAO.MAX_CACHE_SIZE) {
                    continue;
                }
                System.out.println("cache is full. " + Thread.currentThread().getName() + " ready to park.");
                lock.lock();
                try {
                    // await() releases the lock while parked and re-acquires it when signalled
                    condition.await();
                    System.out.println("cache is consuming. " + Thread.currentThread().getName() + " unparked.");
                } catch (InterruptedException e) {
                    System.out.println(Thread.currentThread().getName() + " is interrupted.");
                    // Restore the interrupt flag so the loop exits
                    Thread.currentThread().interrupt();
                } finally {
                    // Only the thread that called lock() may unlock; isLocked() would also be true
                    // when another thread holds the lock
                    lock.unlock();
                }
            }
        }

        private String genKeyPair() {
            // TODO key-pair generation stub
            return "";
        }
    }

    static class KeyPairGeneratorThreadFactory implements ThreadFactory {
        private final String threadGroupName;
        private final AtomicInteger idSeq;

        public KeyPairGeneratorThreadFactory(String threadGroupName, int maxSeq) {
            this.threadGroupName = threadGroupName;
            this.idSeq = new AtomicInteger(maxSeq);
        }

        @Override
        public Thread newThread(Runnable r) {
            // Hands out ids maxSeq..1, so at most maxSeq threads are ever created
            int threadId = idSeq.getAndDecrement();
            if (threadId <= 0) {
                throw new UnsupportedOperationException("thread number cannot be out of range");
            }
            return new Thread(r, threadGroupName + "_" + threadId);
        }
    }
}
```
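The point of giving every worker its own `Condition`, instead of sharing one, is that the monitor can wake exactly `k` parked workers by signalling the first `k` Conditions. The self-contained sketch below demonstrates that property in isolation. It is illustrative only: the class name is hypothetical, and it adds a per-worker permit flag so that a spurious wakeup cannot release a worker that was never signalled (the worker above has no such guard, which is harmless there because a spuriously woken worker just pushes one more key pair and parks again).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical demo: wake a chosen subset of workers, each parked on its own Condition. */
final class SelectiveWakeDemo {
    private SelectiveWakeDemo() {}

    /** Parks `workers` threads, signals the first `toWake`, and returns how many resumed. */
    static int wakeFirst(int workers, int toWake) {
        ReentrantLock lock = new ReentrantLock();
        List<Condition> conditions = new ArrayList<>();
        boolean[] permits = new boolean[workers];          // guarded by lock
        CountDownLatch parked = new CountDownLatch(workers);
        AtomicInteger resumed = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            Condition condition = lock.newCondition();
            conditions.add(condition);
            int id = i;
            Thread t = new Thread(() -> {
                lock.lock();
                try {
                    parked.countDown();                   // still holding the lock here
                    while (!permits[id]) {
                        condition.await();                // releases the lock while parked
                    }
                    resumed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
            });
            t.setDaemon(true);
            threads.add(t);
            t.start();
        }
        try {
            parked.await();
            lock.lock();                                  // succeeds only once every worker is awaiting
            try {
                for (int i = 0; i < toWake; i++) {
                    permits[i] = true;
                    conditions.get(i).signal();
                }
            } finally {
                lock.unlock();
            }
            for (int i = 0; i < toWake; i++) {
                threads.get(i).join(5_000);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            threads.forEach(Thread::interrupt);           // release workers that were never signalled
        }
        return resumed.get();
    }
}
```

With a single shared `Condition`, `signal()` would wake an arbitrary waiter and `signalAll()` would wake everyone, so the "wake half" rule could not target specific workers.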
Key-pair generation monitor
```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Scheduled monitor for key-pair generation
 */
public enum KeyPairsMonitor {
    INSTANCE;

    private final ReentrantLock reentrantLock;
    private final List<Condition> conditionList;
    private final RedisCacheDAO redisCacheDAO;
    private final int coreSize;

    KeyPairsMonitor() {
        this.redisCacheDAO = new RedisCacheDAO();
        this.reentrantLock = new ReentrantLock();
        this.coreSize = Runtime.getRuntime().availableProcessors();
        this.conditionList = new ArrayList<>(coreSize);
        for (int i = 0; i < coreSize; i++) {
            // One Condition per worker, so each worker can be woken individually
            conditionList.add(reentrantLock.newCondition());
        }
    }

    /**
     * Starts the key-pair generation workers and the monitoring schedule
     */
    public void monitor() {
        KeyPairExecutors executors = new KeyPairExecutors(reentrantLock, conditionList);
        executors.start();
        buildMonitorSchedule();
    }

    /**
     * Builds the scheduled task that polls the pool size every 5 seconds
     */
    private void buildMonitorSchedule() {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        service.scheduleAtFixedRate(() -> {
            try {
                wakeWorkers();
            } catch (RuntimeException e) {
                // An exception escaping the task would suppress every later run
                System.out.println("monitor run failed: " + e.getMessage());
            }
        }, 0, 5, TimeUnit.SECONDS);
    }

    private void wakeWorkers() {
        int currentSize = redisCacheDAO.listLlen();
        System.out.println("current cache size is: " + currentSize);
        int executeNum = 0;
        if (currentSize <= RedisCacheDAO.HALF_MAX_CACHE_SIZE) {
            System.out.println("current cache level is at or below 50%: " + currentSize);
            executeNum = coreSize;
        } else if (currentSize <= RedisCacheDAO.PERCENT_75_MAX_CACHE_SIZE) {
            System.out.println("current cache level is at or below 75%: " + currentSize);
            executeNum = coreSize >> 1;
        }
        reentrantLock.lock();
        try {
            for (int i = 0; i < executeNum; i++) {
                // signal() requires holding the lock; with no waiting worker it is simply a no-op
                conditionList.get(i).signal();
            }
        } finally {
            reentrantLock.unlock();
        }
    }
}
```
Stubbed Redis List operations:
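```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Stub of the Redis List operations: only the List length is tracked
 */
public class RedisCacheDAO {
    public static final String DEFAULT_KEYPAIE_CACHE_KEY = "keypaie_redis_list_rsa_byte";
    // Small cap for the demo; production uses 100,000
    public static final int MAX_CACHE_SIZE = 1 << 4;
    public static final int HALF_MAX_CACHE_SIZE = MAX_CACHE_SIZE >> 1;
    public static final int PERCENT_75_MAX_CACHE_SIZE = MAX_CACHE_SIZE - (MAX_CACHE_SIZE >> 2);

    private final String key;
    // Static so every DAO instance sees the same simulated List, like one shared Redis key.
    // An empty List has length 0.
    private static final AtomicInteger count = new AtomicInteger(0);

    public RedisCacheDAO() {
        this(DEFAULT_KEYPAIE_CACHE_KEY);
    }

    public RedisCacheDAO(String key) {
        this.key = key;
    }

    /** Simulates LPUSH, which returns the List length after the push */
    public int listLpush(String value) {
        System.out.println(Thread.currentThread().getName() + " push value");
        return count.incrementAndGet();
    }

    /** Simulates LLEN */
    public int listLlen() {
        return count.get();
    }

    /** Simulates consumers popping elements until newValue remain */
    public void listPop(int newValue) {
        count.set(newValue);
    }
}
```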
Main method:
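```java
import java.util.Random;

public class KeyPairDemo {
    public static void main(String[] args) throws InterruptedException {
        KeyPairsMonitor monitor = KeyPairsMonitor.INSTANCE;
        monitor.monitor();
        RedisCacheDAO dao = new RedisCacheDAO();
        Random random = new Random();
        while (true) {
            Thread.sleep(10);
            // Simulate consumers draining the List down to a random size
            dao.listPop(random.nextInt(RedisCacheDAO.MAX_CACHE_SIZE));
        }
    }
}
```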
Reflections
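With these changes, we can at least respond to a traffic peak by quickly buying ECS instances with more cores and scaling out. However, because each server runs its own scheduler independently, if the scheduling thread on a server exits abnormally, the worker threads on that server will never be woken again. With `ScheduledExecutorService`, a single run that throws is enough to trigger this: all of its subsequent runs are suppressed.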
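Options currently under consideration:
Idea 1: Use distributed task scheduling, with a centrally maintained scheduling center dispatching the workers; however, the system would become far more complex than the current implementation.
Idea 2: Make the workers self-governing through the Condition timeout mechanism: give each wait a timeout, after which the worker generates and writes a key pair on its own initiative. Combined with Redis's allkeys-random eviction policy, this would mitigate the risk described above.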
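Idea 2 can be sketched as follows. Instead of `condition.await()`, the worker calls `condition.await(timeout, unit)`, which returns `false` once the timeout has elapsed; in that case it stops waiting for the possibly dead scheduler and produces another key pair on its own. This is a hypothetical sketch, not production code: an `AtomicInteger` stands in for the Redis List, and the class, method, and parameter names are illustrative. One design note: Redis eviction policies such as `allkeys-random` evict whole keys rather than individual List elements, so if the pool lives under a single List key, eviction would drop the entire pool; it may be worth bounding growth in the worker itself as well.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical self-healing worker: parks with a timeout instead of indefinitely. */
final class TimedParkWorker implements Runnable {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private final AtomicInteger listLength;   // stand-in for the Redis List length
    private final int cap;
    private final long parkMillis;
    final AtomicInteger timeouts = new AtomicInteger();

    TimedParkWorker(AtomicInteger listLength, int cap, long parkMillis) {
        this.listLength = listLength;
        this.cap = cap;
        this.parkMillis = parkMillis;
    }

    /** Called by the scheduler when the pool runs low. */
    void wake() {
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            // Generating a key pair and LPUSHing it is simulated by an increment
            if (listLength.incrementAndGet() < cap) {
                continue;
            }
            lock.lock();
            try {
                // true: signalled (or a spurious wakeup); false: timed out, so produce anyway
                if (!condition.await(parkMillis, TimeUnit.MILLISECONDS)) {
                    timeouts.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }
    }

    /** Runs one worker with no scheduler for `runMillis`; returns how often it timed out. */
    static int timeoutsWithoutScheduler(int cap, long parkMillis, long runMillis) {
        TimedParkWorker worker = new TimedParkWorker(new AtomicInteger(), cap, parkMillis);
        Thread t = new Thread(worker);
        t.setDaemon(true);
        t.start();
        try {
            Thread.sleep(runMillis);
            t.interrupt();
            t.join(1_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return worker.timeouts.get();
    }
}
```

Without any scheduler, the worker still adds one key pair per timeout period, so a crashed monitor degrades production to a slow trickle instead of stopping it entirely.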