前面已经详细介绍了 Tomcat 的线程模型, 那么对于一个请求, Tomcat 到底是如何处理的呢 ?
Acceptor
由 NioEndpoint 的内部类 Acceptor 监听连接
// Acceptor 实现类 Runable 接口, 直接查看 run 方法
int errorDelay = 0;
// 一直循环, 直到收到 "shutdown" 命令
while (running) {
// 当 Endpoint 暂停时, 自旋
while (paused && running) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
}
catch (InterruptedException e) {
// Ignore
}
}
if (!running) {
break;
}
state = AcceptorState.RUNNING;
try {
// 判断连接数是否大于阈值 (默认为 10000), 否则将会休眠
// 里面用到了 AQS
countUpOrAwaitConnection();
SocketChannel socket = null;
try {
// 阻塞, 直到有连接
// bind 方法中已经设置为 阻塞
socket = serverSock.accept();
}
....
// Configure the socket
if (running && !paused) {
// 将 Socket 交给 Poller 线程处理
if (!setSocketOptions(socket)) {
closeSocket(socket);
}
}
....
}
------------------------
// 查看 setSocketOptions 方法
protected boolean setSocketOptions(SocketChannel socket)
try {
// 设置为非阻塞
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
// 从 nioChannels 缓存队列中获取,并在使用完毕后放回该缓存队列
NioChannel channel = nioChannels.pop();
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
// 如果使用 SSL 证书, 即 HTTPS 协议
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
}
else {
// 将 Socket 封装成 NioChannel 对象
channel = new NioChannel(socket, bufhandler);
}
}
else {
channel.setIOChannel(socket);
channel.reset();
}
// 使用轮转法, 选择一个 Poller 对象
// 继续封装, NioChannel 封装成为 PollerEvent 对象
getPoller0().register(channel);
}
Poller
可以看到, 调用了 Poller 的 registry 方法, 传了一个 NioChannel 对象进来
public void register(final NioChannel socket) {
// 将此 NioChannel 与 Poller 对象关联
socket.setPoller(this);
// 将此 NioChannel 包装一下, 这个包装对象会与 PollerEvent , SelectionKey 对象绑定
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(ka);
ka.setPoller(this);
ka.setReadTimeout(getSocketProperties().getSoTimeout());
ka.setWriteTimeout(getSocketProperties().getSoTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setReadTimeout(getConnectionTimeout());
ka.setWriteTimeout(getConnectionTimeout());
// 同样从缓存中拿
PollerEvent r = eventCache.pop();
// 包装对象绑定 read 事件, 这应该是 registry 事件转换过来的
// 这在处理 SelectionKey, 判断事件的时候起效
ka.interestOps(SelectionKey.OP_READ);
if (r == null){
// 将 NioChannel 封装成 PollerEvent 对象
r = new PollerEvent(socket, ka, OP_REGISTER);
}
else {
r.reset(socket, ka, OP_REGISTER);
}
// 将封装好的 PollerEvent 放入 events 队列
addEvent(r);
}
Poller 类也实现了 Runnable 接口, 查看他的 run 方法
// 一直循环
while (true) {
boolean hasEvents = false;
try {
if (!close) {
// 检查 events 事件队列是否存在元素
// 然后遍历 events 事件队列的所有元素, 将 Socket 以 OP_READ 事件注册到 Selector 上
hasEvents = events();
if (wakeupCounter.getAndSet(-1) > 0) {
keyCount = selector.selectNow();
}
else {
// Selector 监听读写事件, 返回有事件发生的个数, 设置超时时间 1 s
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
....
//either we timed out or we woke up, process events first
if (keyCount == 0)
hasEvents = (hasEvents | events());
// 遍历所有 (存在事件的) SelectionKey
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
// 拿到 SelectionKey 的附件 (在注册的时候就绑定好了)
// 这应该是一个 Socket 的包装对象 NioSocketWrapper
NioSocketWrapper attachment = (NioSocketWrapper) sk.attachment();
if (attachment == null) {
iterator.remove();
}
else {
iterator.remove();
// 处理该事件
processKey(sk, attachment);
}
}
// 超时退出
timeout(keyCount, hasEvents);
}
getStopLatch().countDown();
然后判断事件的类型, 将包装对象 socketWrapper 封装成 SocketProcessorBase (实现了 Runnable 接口) 对象, 交给线程池执行
Processor
请求来到 SocketProcessor , 这仍然是 NioEndpoint 的内部类, 只是获取了以下 Handler 对象, 然后交给 ConnectionHandler 继续处理, 查看它的 process 方法
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
.....
S socket = wrapper.getSocket();
// 从缓存中拿 Processor, 拿不到就创建一个
Processor processor = connections.get(socket);
...
try {
...
if (processor == null) {
// 先从缓存中拿 (已回收的 Processor)
processor = recycledProcessors.pop();
...
}
if (processor == null) {
// 根据配置的 Connector 协议, 默认创建 Http11Process 处理器
processor = getProtocol().createProcessor();
// 注册该 Http11Process
register(processor);
...
}
...
// 放入缓存, 关联这个 Processor 和 连接
connections.put(socket, processor);
SocketState state = SocketState.CLOSED;
do {
// 处理器对这个 SocketWrapperBase 进行处理
state = processor.process(wrapper, status);
.......
}
Http11Processor 继续处理, Http11Processor 对请求头进行处理, 并将 URI 路径携带的数据放入 inputBuffer 中, 并将数据封装在 org.apache.coyote.Request
, 和 org.apache.coyote.Response
对象中, 然后将这个两个对象传给 CoyoteAdapter 进一步处理
CoyoteAdapter
其实这就是一个适配器, 将 org.apache.coyote.Request
转换为 org.apache.catalina.connector.Request
, 将 org.apache.coyote.Response
转换为 org.apache.catalina.connector.Response
(适配器模式), 查看它的 service 方法
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
throws Exception {
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
if (request == null) {
// 通过 Connector 创建 Request , Response 对象
request = connector.createRequest();
// 关联 req 和 request, request 对象包含了一个 req 对象
request.setCoyoteRequest(req);
response = connector.createResponse();
response.setCoyoteResponse(res);// 同上
// 相互链接, 一对一的关系
request.setResponse(response);
response.setRequest(request);
// Set as notes
req.setNote(ADAPTER_NOTES, request);
res.setNote(ADAPTER_NOTES, response);
// 设置请求 URI 的编码
req.getParameters().setQueryStringCharset(connector.getURICharset());
}
if (connector.getXpoweredBy()) {
response.addHeader("X-Powered-By", POWERED_BY);
}
boolean async = false;
boolean postParseSuccess = false;
req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());
try {
// 解析并设置 Catalina, 获取请求的参数
// 解析 req 请求, Mapper 与 URI 请求匹配, 找到对应的 Servlet, 放入 request 中
postParseSuccess = postParseRequest(req, request, res, response);
if (postParseSuccess) {
// 检查 Valve 是否支持异步
request.setAsyncSupported(
connector.getService().getContainer().getPipeline().isAsyncSupported());
// 调用容器, 责任链模式, 打开 Engine 的 Pipeline (管道)
// 调用第一个 Valve (阀门) 的 invoke 方法
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
}
....
}
那么, Mapper 如何与 req 请求的 URI 映射的呢? 直接查看 postParseRequest(req, request, res, response) 方法
protected boolean postParseRequest(org.apache.coyote.Request req, Request request,
org.apache.coyote.Response res, Response response) throws IOException, ServletException {
if (req.scheme().isNull()) {
// 默认为 "http" 和 false
req.scheme().setString(connector.getScheme());
request.setSecure(connector.getSecure());
}
else {
request.setSecure(req.scheme().equals("https"));
}
// 代理
String proxyName = connector.getProxyName();
int proxyPort = connector.getProxyPort();
if (proxyPort != 0) {
req.setServerPort(proxyPort);
}
else if (req.getServerPort() == -1) {
// 如果是 HTTPS 请求, 转发到 443 端口
if (req.scheme().equals("https")) {
req.setServerPort(443);
}
else {
req.setServerPort(80);
}
}
if (proxyName != null) {
req.serverName().setString(proxyName);
}
// 未解码的 URI
MessageBytes undecodedURI = req.requestURI();
// URI 通配符匹配
if (undecodedURI.equals("*")) {
....
}
// 已解码的 URI , 应该为 null
MessageBytes decodedURI = req.decodedURI();
// 下面就是解码 URI
if (undecodedURI.getType() == MessageBytes.T_BYTES) {
....
}
// 找到 Context 对应的版本 ContextVersion
String version = null;
Context versionContext = null;
boolean mapRequired = true;
...
while (mapRequired) {
// 先拿到 Mapper 对象
// 这里就会对 URI 映射, 找到 Host, Context, Warpper 保存在 MappingData 中
// 而 MappingData 对象保存在 Request 对象中
connector.getService().getMapper().map(serverName, decodedURI,
version, request.getMappingData());
...
String sessionID;
if (request.getServletContext().getEffectiveSessionTrackingModes()
.contains(SessionTrackingMode.URL)) {
// 从请求中获取 sessionId (如果有的话)
sessionID = request.getPathParameter(
SessionConfig.getSessionUriParamName(
request.getContext()));
if (sessionID != null) {
request.setRequestedSessionId(sessionID);
request.setRequestedSessionURL(true);
}
}
// 在 Cookie 和 SSL 会话中查找 session ID
try {
parseSessionCookiesId(request);
}
catch (IllegalArgumentException e) {
// Too many cookies
if (!response.isError()) {
response.setError();
response.sendError(400);
}
return true;
}
parseSessionSslId(request);
sessionID = request.getRequestedSessionId();
mapRequired = false;
// 如果没有指定 Context 版本
if (version != null && request.getContext() == versionContext) {
// We got the version that we asked for. That is it.
}
else {
version = null;
versionContext = null;
Context[] contexts = request.getMappingData().contexts;
// 单个 contextVersion 不需要映射
// 没有 session ID 意味着没有重新映射的可能性
if (contexts != null && sessionID != null) {
...
}
}
...
}
// 有可能重定向
MessageBytes redirectPathMB = request.getMappingData().redirectPath;
....
}
现在, 已经找到映射, 然后依次交给他们处理, Tomcat 使用的是责任链模式
, 先执行 Engine 的 Pipeline 的第一个 EnginetValve (管道打开阀门, 从头到尾) 的 invoke 方法, 直到 “打开” 完所有的 EnginetValve 后, 交给 Host 的 Pipeline , 同样执行第一个 HostValve 的 invoke 方法的, 处理流程和 Engine 一样 … , 然后是 Context …, Wrapper … 直接查看 WrapperValve 的 invoke 方法
public final void invoke(Request request, Response response)
throws IOException, ServletException {
...
// 终于看到了 Servlet 对象
Servlet servlet = null;
Context context = (Context) wrapper.getParent();
....
try {
if (!unavailable) {
// 获取到具体的 Servlet
servlet = wrapper.allocate();
}
}
....
// 创建 Filter 链
ApplicationFilterChain filterChain =
ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);
try {
if ((servlet != null) && (filterChain != null)) {
if (context.getSwallowOutput()) {
try {
SystemLogHandler.startCapture();
if (request.isAsyncDispatching()) {
request.getAsyncContextInternal().doInternalDispatch();
}
else {
// 调用各个 Filter, 最后调用 Servlet
// 最终将 Request, Response 对象转成 ServletRequest, ServletResponse 对象
filterChain.doFilter(request.getRequest(),
response.getResponse());
}
}
finally {
String log = SystemLogHandler.stopCapture();
if (log != null && log.length() > 0) {
context.getLogger().info(log);
}
}
}
....
}
执行完所有 Filter 的 doFilter 方法后, 最后调用了 Servlet 的 service 方法, 到了这里, Servlet 终于开始干活了, 一次完整的请求结束, 至于 Tomcat 是如何响应的, 也就是重定向和请求转发, 还有热部署是如何实现的, 下篇继续