面试系列“讲一下吧”之 OkHttp 源码解析

阅读指南:
本博客,前面源码部分,duck不必细看,大概瞄一眼就行,或只看我加注释的部分就行。我会在文末描述下大概流程把流程理清楚了再看代码比较好,看代码注意只看主要流程,其他分支末节,有时间自己慢慢研究就行。

使用

没什么好说的,记住这么用就好。

val okHttpClient = OkHttpClient()
val request = Request.Builder()
    .url("https://xxxx")
    .build()
val call = okHttpClient.newCall(request)
call.enqueue(object :okhttp3.Callback{
    override fun onFailure(call: okhttp3.Call, e: IOException) {
        
    }

    override fun onResponse(call: okhttp3.Call, response: okhttp3.Response){
        
    }
})

OkHttpClient

OkHttpClient 构造方法的各种参数,了解一下就好,也可以直接跳过。

//建造者模式
public OkHttpClient() {
  this(new Builder());
}

//就是一些初始化的参数大概看一看就好
public static final class Builder {
  //用于调度线程的
  Dispatcher dispatcher;
  //网络代理
  @Nullable Proxy proxy;
  //支持的协议的版本
  List<Protocol> protocols;
  //支持http、https的配置,https的各种加密协议
  List<ConnectionSpec> connectionSpecs;
  //拦截器
  this.interceptors.addAll(okHttpClient.interceptors);
  this.networkInterceptors.addAll(okHttpClient.networkInterceptors);
  //做统计相关的
  EventListener.Factory eventListenerFactor
  ProxySelector proxySelector;
  //饼干罐子,存储cookie用的
  CookieJar cookieJar;
  //httpCache
  @Nullable Cache cache;
  @Nullable InternalCache internalCache;
  //获取连接端口,socket是TCP的东西
  SocketFactory socketFactory;
  //SSL连接的Factory
  @Nullable SSLSocketFactory sslSocketFactory;
  //服务器证书
  @Nullable CertificateChainCleaner certificateChainCleaner;
  //主机名验证器,for https。
  HostnameVerifier hostnameVerifier;
  //自签名证书的验证,利用公钥来验证的。
  CertificatePinner certificatePinner;
  //授权用的(代理用的)
  Authenticator proxyAuthenticator;
  //正常用的授权
  Authenticator authenticator;
  //连接池,待缓存的集合
  ConnectionPool connectionPool;
  Dns dns;
  //重定向时候有一个https,可能是跳过去,可能是跳过来
  boolean followSslRedirects;
  //重定向跳转
  boolean followRedirects;
  //连接失败了是否要重新连接
  boolean retryOnConnectionFailure;
  int callTimeout;
  //TCP连接时间
  int connectTimeout;
  int readTimeout;
  int writeTimeout;
  //针对websocket的
  int pingInterval;
}

execute

call.execute() 

这是同步的执行方法。

@Override public Response execute() throws IOException {
//如果执行过,就报错
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  //transmitter 发射台的意思,先忽略下面两句
  transmitter.timeoutEnter();
  transmitter.callStart();
  try {
    //重点:dispatcher执行这个Call,对应下面的加入runningSyncCalls
    client.dispatcher().executed(this);
    //这行直接跳转下面标题
    return getResponseWithInterceptorChain();
  } finally {
    client.dispatcher().finished(this);
  }
}


/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
//加入正在执行的双端队列
  runningSyncCalls.add(call);
}

enqueue

//异步方法
@Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  transmitter.callStart();
  //看这行
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

//responseCallback封装一个AsyncCall,并加入 readyAsyncCalls
void enqueue(AsyncCall call) {
  synchronized (this) {
    readyAsyncCalls.add(call);
    // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
    // the same host.
    if (!call.get().forWebSocket) {
      AsyncCall existingCall = findExistingCallWithHost(call.host());
      if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
    }
  }
  promoteAndExecute();
}

接下来,我们从AsyncCall中找继续执行的方法,发现这个 AsyncCall 继承了一个 NamedRunnable implements Runnable,然后我们阅读 run() 方法。

/**
 * Runnable implementation which always sets its thread name.
 */
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

//找到 run 方法了
  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
    //发现里面调用了execute,是一个抽象类的抽象方法,去子类 AsyncCall 中找 execute 方法
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}


//找到啦!
@Override protected void execute() {
  boolean signalledCallback = false;
  transmitter.timeoutEnter();
  try {
    //发现,还是走到了一个关键的方法,这个在下面有解析
    Response response = getResponseWithInterceptorChain();
    signalledCallback = true;
    responseCallback.onResponse(RealCall.this, response);
  } catch (IOException e) {
    if (signalledCallback) {
      // Do not signal the callback twice!
      Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
    } else {
      responseCallback.onFailure(RealCall.this, e);
    }
  } catch (Throwable t) {
    cancel();
    if (!signalledCallback) {
      IOException canceledException = new IOException("canceled due to " + t);
      canceledException.addSuppressed(t);
      responseCallback.onFailure(RealCall.this, canceledException);
    }
    throw t;
  } finally {
    client.dispatcher().finished(this);
  }
}

dispatcher

这个里面有三个双向队列。一个是为同步使用的,另外两个是为异步使用的。
这个类里面初始化了线程池,用线程池来处理线程,提高了程序的性能,为异步操作提供的。

/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();


//线程池的初始化方法
public synchronized ExecutorService executorService() {
  if (executorService == null) {
  //可以看一下各个参数的意义
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
  }
  return executorService;
}

//promoteAndExecute 方法是线程池的 start 方法
/**
 * Promotes eligible calls from {@link #readyAsyncCalls} to {@link #runningAsyncCalls} and runs
 * them on the executor service. Must not be called with synchronization because executing calls
 * can call into user code.
 *
 * @return true if the dispatcher is currently running calls.
 */
private boolean promoteAndExecute() {
  assert (!Thread.holdsLock(this));
  List<AsyncCall> executableCalls = new ArrayList<>();
  boolean isRunning;
  synchronized (this) {
  //对 readyAsyncCalls 进行遍历
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall asyncCall = i.next();
      //两个判断,一个事判断最大请求数>=64,一个是每个主机的请求数 >= 5
      if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
      if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
      i.remove();
      asyncCall.callsPerHost().incrementAndGet();
      executableCalls.add(asyncCall);
      //添加到 runningAsyncCalls
      runningAsyncCalls.add(asyncCall);
    }
    isRunning = runningCallsCount() > 0;
  }
  for (int i = 0, size = executableCalls.size(); i < size; i++) {
    AsyncCall asyncCall = executableCalls.get(i);
    //执行这个 AsyncCall
    asyncCall.executeOn(executorService());
  }
  return isRunning;
}

getResponseWithInterceptorChain(重点啊)

责任链模式

Response getResponseWithInterceptorChain() throws IOException {
  // Build a full stack of interceptors.
  List<Interceptor> interceptors = new ArrayList<>();
  //添加一大堆的拦截器
  interceptors.addAll(client.interceptors());
  interceptors.add(new RetryAndFollowUpInterceptor(client));
  interceptors.add(new BridgeInterceptor(client.cookieJar()));
  interceptors.add(new CacheInterceptor(client.internalCache()));
  interceptors.add(new ConnectInterceptor(client));
  if (!forWebSocket) {
    interceptors.addAll(client.networkInterceptors());
  }
  interceptors.add(new CallServerInterceptor(forWebSocket));

//这个chain把所有的拦截器都装了进去
  Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
      originalRequest, this, client.connectTimeoutMillis(),
      client.readTimeoutMillis(), client.writeTimeoutMillis());

  boolean calledNoMoreExchanges = false;
  try {
  //然后开始调用 chain 的 Proceed 方法
    Response response = chain.proceed(originalRequest);
    if (transmitter.isCanceled()) {
      closeQuietly(response);
      throw new IOException("Canceled");
    }
    return response;
  } catch (IOException e) {
    calledNoMoreExchanges = true;
    throw transmitter.noMoreExchanges(e);
  } finally {
    if (!calledNoMoreExchanges) {
      transmitter.noMoreExchanges(null);
    }
  }
}

//chain只有一个实现类就是RealInterceptorChain,所以我们来到这个类的 proceed 方法

@Override public Response proceed(Request request) throws IOException {
  return proceed(request, transmitter, exchange);
}

public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
    throws IOException {
//判断错误,不用看
  if (index >= interceptors.size()) throw new AssertionError();

  calls++;

  // If we already have a stream, confirm that the incoming request will use it.
  if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must retain the same host and port");
  }

  // If we already have a stream, confirm that this is the only call to chain.proceed().
  if (this.exchange != null && calls > 1) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must call proceed() exactly once");
  }

  // Call the next interceptor in the chain.
  // index 从0开始,在上面调用的代码中有描述
  RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
      index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
  Interceptor interceptor = interceptors.get(index);
  //开始调用各个拦截器的 intercept 方法,所有的拦截器都 implements 了 Interceptor,跳转到分析 RetryAndFollowUpInterceptor(默认这是第一个) 
  Response response = interceptor.intercept(next);

  // Confirm that the next interceptor made its required call to chain.proceed().
  if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
    throw new IllegalStateException("network interceptor " + interceptor
        + " must call proceed() exactly once");
  }

  // Confirm that the intercepted response isn't null.
  if (response == null) {
    throw new NullPointerException("interceptor " + interceptor + " returned null");
  }

  if (response.body() == null) {
    throw new IllegalStateException(
        "interceptor " + interceptor + " returned a response with no body");
  }

  return response;
}

RetryAndFollowUpInterceptor implements Interceptor

Interceptor 这个接口类中只有一个方法

Response intercept(Chain chain) throws IOException;

其次还有一个 Chain 类,其中有一个关键方法,叫 proceed

Response proceed(Request request) throws IOException;

我们从 RetryAndFollowUpInterceptor 开始看,我们先分析这个责任链模式。
我们可以把 RetryAndFollowUpInterceptor 这个类的 intercept 简化为下面代码

@Override public Response intercept(Chain chain) throws IOException {
    //做一些其他的准备工作
    response = realChain.proceed(request, transmitter, null);
    //处理response
    return response;
}

当调用的 proceed 方法的时候,我们就会调用到 realChain.proceed 方法,这样 index + 1,就会执行到下一个 Interceptor 的 intercept 方法,也就是 BridgeInterceptor 的 intercept 方法。我画一个图示意。

image

BridgeInterceptor implements Interceptor

CacheInterceptor implements Interceptor

ConnectInterceptor implements Interceptor

建立TCP连接。

OkHttp的主要用途是封装了网络请求,让我们更简单的操作网络请求。

  1. 使用线程池提高了性能。
  2. 然后就是拦截器部分,使用了责任链模式。熟悉下每个拦截器的作用。
发布了20 篇原创文章 · 获赞 25 · 访问量 5万+

猜你喜欢

转载自blog.csdn.net/u014772414/article/details/104921541