0. 线程池中, 怎么保证的一个会话数据的有序性
既然用到了线程池, 并不要求始终是同一个线程执行同一个会话的数据, 只要保证多个线程执行的同一个会话
的数据在时间上的有序性即可.
原理其实很简单, 每个会话的数据依次放到一个队列中. 特定时刻, 只有一个线程从此队列中读取数据执行.
借用netty中保持同一会话有序性的线程池的实现类的OrderedMemoryAwareThreadPoolExecutor说明, 线程之间操作
的流程示意如下:
Thread X: --- Channel A (Event A1) --. .-- Channel B (Event B2) --- Channel B (Event B3) \ / X / \ Thread Y: --- Channel B (Event B1) --' '-- Channel A (Event A2) --- Channel A (Event A3)
1. 代码层面
// 每个channel对应的Executor, 具体实现是ChildExecutor
private final ConcurrentMap<Object, Executor> childExecutors = newChildExecutorMap();
// ChildExecutor的添加任务的实现, 实际是添加到一个线性列表中
private final LinkedList<Runnable> tasks = new LinkedList<Runnable>(); public void execute(Runnable command) { boolean needsExecution; synchronized (tasks) { needsExecution = tasks.isEmpty(); tasks.add(command); } if (needsExecution) { doUnorderedExecute(this); } }
// 具体的执行(实际是其他线程调用此Executor的run方法
public void run() { Thread thread = Thread.currentThread(); for (;;) { final Runnable task; synchronized (tasks) { task = tasks.getFirst(); } boolean ran = false; beforeExecute(thread, task); try { task.run(); ran = true; onAfterExecute(task, null); } catch (RuntimeException e) { if (!ran) { onAfterExecute(task, e); } throw e; } finally { synchronized (tasks) { tasks.removeFirst(); if (tasks.isEmpty()) { break; } } } } }