OkHttp3
官网的介绍——An HTTP & HTTP/2 client for Android and Java applications。
它的优点:
- 支持http2,对一台机器的所有请求共享同一个socket
- 支持连接池,支持连接复用,减少延迟
- 支持透明gzip压缩响应体
- 通过缓存避免重复的请求
- 请求失败时自动重试主机的其他ip,自动重定向
- API调用方便
OkHttp的使用
implementation 'com.squareup.okhttp3:okhttp:3.14.2'
复制代码
OkHttpClient okHttpClient = new OkHttpClient();
okHttpClient.newCall(
new Request.Builder()
.url("https://api.github.com/users/apkcore")
.build())
//enqueue是异步执行
.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.d(TAG, "onFailure: failure");
}
@Override
public void onResponse(Call call, Response response) throws IOException {
Log.d(TAG, "onResponse: success");
}
});
复制代码
它的使用是非常简单的,我们在Android中一般使用enqueue执行异步请求。
现在我们看它的源码实现
OkHttp源码分析
初构请求
先看newCall的源码
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
复制代码
可以看到,直接调用了realCall的方法,说明RealCall是Call的实现,继续看RealCall.newRealCall的源码。
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
//利用传递过来的参数,new一个RealCall
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.transmitter = new Transmitter(client, call);
return call;
}
复制代码
可以看到,先调用RealCall的构造方法,其中第三个参数是否为webSocket(可以服务端主动向客户端推送)。
然后new Transmitter这个对象,放入call中,它的构造源码如下
/**
* Bridge between OkHttp's application and network layers. This class exposes high-level application layer primitives: connections, requests, responses, and streams.
* ...
*/
public final class Transmitter {
public Transmitter(OkHttpClient client, Call call) {
this.client = client;
this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
this.call = call;
//主要是这句,连接的状态的时间点记录
this.eventListener = client.eventListenerFactory().create(call);
this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
}
}
复制代码
可以看它的类注释可以知道,这是OkHttp的应用层和网络层之间的桥梁。中文又叫发射器,这个类执行连接,请求,响应和流的真正操作。
然后,我们继续看.enqueue方法的源码,直接点的话进入的是Call接口,刚刚我们已经发现了,RealCall才是Call的实现类,所以,在RealCall中找到.enqueue方法
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
//每个请求只能执行一次
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
//
transmitter.callStart();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
复制代码
可以看到,每个Call只执行一次,否则会抛异常,这里先transmitter.callStart方法,绑定call,开始通知监听器组件网络请求开始了, 主要是通知EventListener
。
然后到了关键代码client.dispatcher().enqueue(new AsyncCall(responseCallback));
,我们分开来阅读它的源码。
先看外层client.dispatcher()
final Dispatcher dispatcher;
public Dispatcher dispatcher() {
return dispatcher;
}
复制代码
这个Dispatcher类的源码如下:
/**
* Policy on when async requests are executed.
*
* <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your
* own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number
* of calls concurrently.
*/
public final class Dispatcher {
//控制最大请求并发数
private int maxRequests = 64;
//单个主机最大并发数
private int maxRequestsPerHost = 5;
private @Nullable Runnable idleCallback;
/** Executes calls. Created lazily. */
//线程池
private @Nullable ExecutorService executorService;
/** Ready async calls in the order they'll be run. */
//已经准备好异步执行的队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
//正在执行的异步队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
//正在执行的同步队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
...
}
复制代码
可以知道,Dispatcher
本质上是异步请求的调度器,负责调度异步请求的执行,控制最大请求并发数和单个主机的最大并发数,并持有一个线程池来负面异步请求,对同步只做统计操作。
再看enqueue
void enqueue(AsyncCall call) {
synchronized (this) {
//先把AsyncCall加到准备队列中
readyAsyncCalls.add(call);
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
//更改AsyncCall使其能共享同一主机
if (!call.get().forWebSocket) {
AsyncCall existingCall = findExistingCallWithHost(call.host());
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
//关键调用
promoteAndExecute();
}
复制代码
通过分析,我们知道,enqueue先把AsycCall加入到准备队列中,然后调用promoteAndExecute,其实看方法名我们也知道,这是执行代码
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
//如果超出最大同步线程,当然这任务就老实呆在准备队列里
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
//同理,超过单一主机连接数,继续遍历下一个asyncCall
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
asyncCall.callsPerHost().incrementAndGet();
//添加进executableCalls
executableCalls.add(asyncCall);
//添加到运行队列
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
//执行线程任务
asyncCall.executeOn(executorService());
}
return isRunning;
}
复制代码
最后,我们看一下asyncCall.executeOn(executorService());这个代码里面的调用
//创建一个线程池
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
复制代码
AsyncCall
传入进入的参数AsyncCall
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
private volatile AtomicInteger callsPerHost = new AtomicInteger(0);
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
...
}
复制代码
可以看到,AsyncCall是一个runnable类,由enqueue的源码分析我们可以知道,最后调用了asyncCall.executeOn
方法
void executeOn(ExecutorService executorService) {
...
try {
//执行线程池
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
...
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
复制代码
可以看到,线程池执行时,就会调用AsyncCall的run方法,我们去找它的run方法,发现实现在父类
public abstract class NamedRunnable implements Runnable {
...
@Override public final void run() {
...
execute();
...
}
...
}
复制代码
可以看到,就是调用了execute方法,这个方法在AsyncCall中有重写
@Override protected void execute() {
boolean signalledCallback = false;
transmitter.timeoutEnter();
try {
Response response = getResponseWithInterceptorChain();//1
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
复制代码
可以看到,真正的Respone是通过getResponseWithInterceptorChain这个方法获取的,其他的代码只是控件与回调,这也是OkHttp的最精彩的设计Interceptor(拦截器)与Chain(调用链)
okhttp中execute同步执行
上面我们分析了enqueue异步执行的源码,其实同步执行的源码也差不多
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.timeoutEnter();
transmitter.callStart();
try {
//先调用Dispatcher有executed方法
client.dispatcher().executed(this);
//然后调用getResponseWithInterceptorChain方法
return getResponseWithInterceptorChain方法();
} finally {
client.dispatcher().finished(this);
}
}
复制代码
dispatcher的executed方法只是一个把任务添加到运行同步队列的方法
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
复制代码
OkHttp中的属性
这里对我们阅读源码没什么影响,不过可以扩展我们对网络的一些知识的了解。我们回到最开始,看OkHttpClient
的源码中的属性
final Dispatcher dispatcher;//线程调度器,负责异步请求的执行,会平衡性能,对同步操作只做统计
final @Nullable Proxy proxy;//可配置的代理,比如翻墙访问
final List<Protocol> protocols;//给服务端列出的所支持的协议的版本
final List<ConnectionSpec> connectionSpecs;//配置的是使用的Http还是Https,以及可支持的tls等
final List<Interceptor> interceptors;//拦截器
final List<Interceptor> networkInterceptors;//
final EventListener.Factory eventListenerFactory;//创建EventListener
final ProxySelector proxySelector;//设置代理的用户名密码
final CookieJar cookieJar;//cookie存储器
final @Nullable Cache cache;//缓存
final @Nullable InternalCache internalCache;//内部缓存接口
final SocketFactory socketFactory;//创建tcp连接端口
final SSLSocketFactory sslSocketFactory;//ssl的factory
final CertificateChainCleaner certificateChainCleaner;//整理证书链
final HostnameVerifier hostnameVerifier;//主机名验证器,给HTTPs用的
final CertificatePinner certificatePinner;//用来验证自定义证书的
final Authenticator proxyAuthenticator;//用于监听代理权限不足的回调
final Authenticator authenticator;//用于监听权限不足的回调
final ConnectionPool connectionPool;//连接池。带缓存的集合
final Dns dns;//用于设置dns,根据域名拿到ip列表
final boolean followSslRedirects;//http与https互跳时
final boolean followRedirects;//有重定向时要不要跳转
final boolean retryOnConnectionFailure;//请求失败是否重连
final int callTimeout;//call的超时时间
final int connectTimeout;//tcp超时时间
final int readTimeout;//下载超时时间
final int writeTimeout;//写入请求的超时时间
final int pingInterval;//心跳包ping的间隔
复制代码
小结
可以看到,无论是execute还是enqueue,都是调用了Disapatch的的对应execute与enqueue方法,真正的response都是通过getResponseWithInterceptorChain
来获取的,区别就是execute的执行是在dispatch中只做一个同步统计,而enqueue方法会在dispatch中根据当前并发数选择是否执行队列中等待的AsyncCall。
拦截器与调用链
上面分析到了,网络请求的入口实际上就是在RealCall的getResponseWithInterceptorChain
这个方法
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
//创建完整的拦截器集合
List<Interceptor> interceptors = new ArrayList<>();
//用户定义的一系列拦截器,如日志等
interceptors.addAll(client.interceptors());
//负责失败重试和重连的拦截器
interceptors.add(new RetryAndFollowUpInterceptor(client));
//把用户构造的请求转化为发送到服务器的请求,把服务器返回的结果转化为用户友好的结果
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//负责缓存策略的拦截器
interceptors.add(new CacheInterceptor(client.internalCache()));
//负责与服务器连接的拦截器
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
//负责向服务器发送数据和从服务器获取数据的拦截器
interceptors.add(new CallServerInterceptor(forWebSocket));
//把这个集合转换成链
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,originalRequest, this, client.connectTimeoutMillis(),client.readTimeoutMillis(), client.writeTimeoutMillis());
boolean calledNoMoreExchanges = false;
try {
//关键执行代码
Response response = chain.proceed(originalRequest);
...
return response;
} catch (IOException e) {
...
} finally {
...
}
}
复制代码
从源码中可以看出,它会先把所有的拦截器,转换成一条链RealInterceptorChain,再执行RealInterceptorChain中的proceed方法
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
...
// Call the next interceptor in the chain.
//从下一位开始的拦截器链
RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
Interceptor interceptor = interceptors.get(index);
//调用对应拦截器的intercept方法
Response response = interceptor.intercept(next);
...
return response;
}
复制代码
而每一个interceptor.intercept的方法体里,都调用了realChain.proceed方法,这样chain与interceptor互相递归调用,直到世界(链)的尽头。
在这里其实是用到了责任链模式,比如,CacheInterceptor中
@Override
public Response intercept(Chain chain) throws IOException {
Response cacheCandidate = cache != null
? cache.get(chain.request())
...
Request networkRequest = strategy.networkRequest;
...
// If we don't need the network, we're done.
// 如果不需要网络直接可以返回缓存结果
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
...
}
复制代码
在源码中,如果可以直接取缓存里的数据时,是直接return的,不需要走后面的网络请求流程,这也是责任链模式所带来的便利性。
小结
使用了责任链模式,每个拦截器完成的大致流程是
- 先经过用户拦截器
- RetryAndFollowUpInterceptor负责自动重试和进行必要的重定向
- BridgeIntercetro负责将用户Request转换成一个实际的网络请求的Request,再调用下层的拦截器获取Response,最后再将网络Reponse转换成用户的Response
- CacheInterceptor负责控制缓存
- ConnectInterceptor负责进行连接主机
- 网络拦截器进行拦截
- CallServerInterceptor是真正的和服务器通信,完成Http请求。
可以看到,其实如果仔细看ConnectInterceptor,CallServerInterceptor等的源码,我们能发现,okhttp在内部自己完整的实现了一套TCP、TLS/SSL连接建立,HTTP的报文传输,各种http特性的支持(重试,跳转,cache,cookie等)。有兴趣的盆友可以自己跟着源码阅读一下,参考地址dieyidezui.com/okhttp-3-4-…
总结
这张图是blog.csdn.net/zhangqilugr…复制过来的,值得收藏,总结都不用写了,一图以蔽之
参考
下面是我的公众号,欢迎大家关注我
转载于:https://juejin.im/post/5d010926e51d45508c2fb832