本篇将介绍okhttp异步网络请求源码解析
(一) 首先看mOkHttpClient.newCall(build)--》enqueue方法
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
执行流程:如果当前还能执行一个并发请求,那就立即执行,否则加入 readyAsyncCalls 队列,而正在执行的请求执行完毕之后,会调用 promoteCalls() 函数,来把 readyAsyncCalls 队列中的 AsyncCall “提升”为 runningAsyncCalls,并开始执行。
(二)Dispatcher类(最重要的)
public final class Dispatcher {
private int maxRequests = 64; //最大并发请求数
private int maxRequestsPerHost = 5; //每个主机的最大请求数
private @Nullable Runnable idleCallback;
//消费者的线程池à默认缓存型线程池
private @Nullable ExecutorService executorService;
//将要运行的异步请求队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//正在运行的异步请求队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
// 正在运行的同步请求队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
//最采用缓存型线程池
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory(“OkHttp Dispatcher”, false));
}
return executorService;
}
//设置最大请求数
public synchronized void setMaxRequests(int maxRequests) {
if (maxRequests < 1) {
throw new IllegalArgumentException(“max < 1: ” + maxRequests);
}
this.maxRequests = maxRequests;
promoteCalls();
}
//…………………………
总结:当请求队列的数量以及正在运行的请求主机数小于5时,就把请求加载到runningAsyncCalls中并在线程池中执行。
否则就加入到readyAsyncCalls中进行缓存等待。
Dispatcher类中的executed与finished的方法解析
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) { //添加任务的操作 添加在正在运行的异步请求队列
runningSyncCalls.add(call);
}
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) { //异步任务关闭
finished(runningAsyncCalls, call, true);
}
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) { //同步任务关闭
finished(runningSyncCalls, call, false);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) { //执行关闭
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError(“Call wasn‘t in-flight!”); //移除任务的操作
if (promoteCalls)
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
promoteCalls()方法:从readyAsyncCalls取出下一个请求,加入runningAsyncCalls中并交给线程池处理
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.aexecutedd(call);
executorService(). (call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
.........