文章目录
上下文
tomcat源码分析共2个阶段
1.tomcat如何接收处理请求 源码分析(一)(Connector
部分)本篇
2.tomcat如何接收处理请求 源码分析(二)
(Container
部分)
目标:
本文主要分析tomcat的Connector
部分,一个请求如何封装为HttpRequest
。主要包括NioEndpoint
,Acceptor
,Poller
,PollerEvent
,selector
,SocketProcessor
的一些细节。
下图的第一个框即是本文所研究的核心内容。第二个框和第三个框分别会在后面的文章描述。
正文
我们使用tomcat的时候都知道它是多线程的,很多个用户同时访问的时候,他能够将每个应用分发给不同的线程,处理好之后将结果返回给浏览器。但是由于一个进程能打开的线程数量有限,如果使用BIO方式(一个请求对应一个线程)处理这些客户端发起的连接,妨碍了系统吞吐力的提升,所以现在的tomcat都采用了NIO的方式来提高系统的并发能力。
BIO方式的B/S模型如上,服务器S作为一个请求接待者,凡是接收到一个socket连接请求,就会立即给这个sokect分配一个后台线程,这个后台线程会负责执行完整个请求的生命周期。那么这样有什么问题吗?为啥说这个方式影响并发呢?我的理解是,当浏览器发起连接的时候,可能并不会马上发送数据,而是等着用户输入。用户什么时候输入完成我的后端线程拿到数据才会进入逻辑执行阶段(比如查表等操作)。很明显这种情况没有充分的利用线程。要知道线程是十分宝贵的资源,不仅会占用一定的内存,同时也占用了cpu资源。
上面简单描述了NIO
的情况,在poller
线程中会维护一个selector
对象,这个对象非常厉害,我们Acceptor
收到的socket
会关联到selector
对象中,一个selector
对象可以关联n个socket
。只有当里面任意一个socket
有事情发生(比如数据写入完毕),poller
才会指定一个socketProcessor
线程来处理这个socket
。按照以前的方式BIO
方式,我们1000个浏览器发起请求,尽管可能真正需要立即使用线程去执行的就1个,但是我们确需要开1000个线程去维护这1000个没啥事的socket
。现在换成NIO
之后情况大大的改善。还是1000个请求,在socket
和浏览器建立连接之后,这些socket不会直接关联线程,而是先注册到poller
的selector
对象中,假如1000个连接的socket
有10个socket
的数据已经写入完毕,那么poller
会去socketProcessor
的线程池中挑选10个来执行业务。从而整个系统的线程数量大大减少,大大的提高了系统的并发性。
接下来我们就围绕上面说的这些进一步到代码中找相关佐证。
1 NioEndpoint#Acceptor
接收socket
/**
* 这个线程负责监听TCP/IP连接
* 并将其转交给合适的处理器
*/
protected class Acceptor extends AbstractEndpoint.Acceptor {
@Override
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
while (running) {
// Loop if endpoint is paused
while (paused && running) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!running) {
break;
}
state = AcceptorState.RUNNING;
try {
//if we have reached max connections, wait
countUpOrAwaitConnection();
SocketChannel socket = null;
try {
// Accept the next incoming connection from the server
// 接收到socket
socket = serverSock.accept();
} catch (IOException ioe) {
// We didn't get a socket
countDownConnection();
if (running) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (running && !paused) {
// setSocketOptions() will hand the socket off to
// an appropriate processor if successful
//setSocketOptions 内部会将socket转交给合适的处理器
if (!setSocketOptions(socket)) {
closeSocket(socket);
}
} else {
closeSocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.accept.fail"), t);
}
}
state = AcceptorState.ENDED;
}
}
以上代码核心就两句
socket = serverSock.accept();//接收到socket
setSocketOptions(socket);//将socket转交给其他处理器
2 NioEndpoint#setSocketOptions
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//disable blocking, APR style, we are gonna be polling it
socket.configureBlocking(false);
Socket sock = socket.socket();
NioChannel channel = nioChannels.pop();
if (channel == null) {
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
getPoller0().register(channel);
} catch (Throwable t) {
}
return true;
}
上面这么长的方法核心就下面两句
channel = new NioChannel(socket, bufhandler);//socket封装成NioChannel
getPoller0().register(channel);//将NioChannel注册到Poller中。
接下来就是看这个注册到Poller
的过程
3 NioEndpoint#Poller#register
public void register(final NioChannel socket) {
socket.setPoller(this);
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(ka);
ka.setPoller(this);
ka.setReadTimeout(getSocketProperties().getSoTimeout());
ka.setWriteTimeout(getSocketProperties().getSoTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
ka.setReadTimeout(getConnectionTimeout());
ka.setWriteTimeout(getConnectionTimeout());
PollerEvent r = eventCache.pop();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
addEvent(r);
}
r = new PollerEvent(socket,ka,OP_REGISTER);
addEvent(r);
核心逻辑是把NioChannel
封装到PollerEvent
中,并且放入evens
事件队列中,供其他线程消费。
前面分析了三个方法了,其实就是完成了PollerEvent
的生产过程。此时这些事件还未消费。我们现在要看poller是怎么消费这些PollerEvent
的。
4Poller
消费socket
事件处理socket
/*
* appropriate processor as events occur.
* 当有事件发生的时候,将事件交给合适的处理器
*/
@Override
public void run() {
while (true) {
boolean hasEvents = false;
try {
if (close) {
events();
}
} catch (Throwable x) {
continue;
}
//either we timed out or we woke up, process events first
if ( keyCount == 0 ) hasEvents = (hasEvents | events());
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
if (attachment == null) {
iterator.remove();
} else {
iterator.remove();
processKey(sk, attachment);
}
}
timeout(keyCount,hasEvents);
}//while
getStopLatch().countDown();
}
以上这么长的逻辑,核心就下面两句
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;//获取到发生了事件的socket
processKey(sk, attachment);//并将socket交给socketProcessor线程处理。
我们下面的代码块会定位到最终的socket
处理类
// NioEndpoint # Poller #processKey()
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
try {
if ( close ) {
cancelledKey(sk);
} else if ( sk.isValid() && attachment != null ) {
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) {
//重点!!!
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
}
}
}
}
/////////////////////////////分割
# AbstractEndpoint # processSocket()
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
//重点
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
}
return true;
}
/////////////////分割
//NioEndpoint createSocketProcessor()
@Override
protected SocketProcessorBase<NioChannel> createSocketProcessor(
SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
return new SocketProcessor(socketWrapper, event);
}
以上这么长的就三句被标记的是核心哈,最后是委派给了SocketProcessor
对发生了事件的socket进行相关处理。
这里SocketProcessor
是一个runnable
接口,有run
方法并且是放在一个线程池中,所以我们就直接分析
// NioEndpoint#SocketProcessor#doRun()
protected void doRun() {
NioChannel socket = socketWrapper.getSocket();
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
int handshake = -1;
try {
if (key != null) {
if (socket.isHandshakeComplete()) {
// No TLS handshaking required. Let the handler
// process this socket / event combination.
handshake = 0;
} else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
event == SocketEvent.ERROR) {
// Unable to complete the TLS handshake. Treat it as
// if the handshake failed.
handshake = -1;
} else {
handshake = socket.handshake(key.isReadable(), key.isWritable());
// The handshake process reads/writes from/to the
// socket. status may therefore be OPEN_WRITE once
// the handshake completes. However, the handshake
// happens when the socket is opened so the status
// must always be OPEN_READ after it completes. It
// is OK to always set this as it is only used if
// the handshake completes.
event = SocketEvent.OPEN_READ;
}
}
} catch (IOException x) {
handshake = -1;
if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
} catch (CancelledKeyException ckx) {
handshake = -1;
}
if (handshake == 0) {
SocketState state = SocketState.OPEN;
// Process the request from this socket
//这里才是重点。转交给了AbstractProtocol#ConnectionHandler#process
//什么时候 SocketProcessor 注入了 ConnectionHandler呢?
//在tomcat容器启动,新建connector对象的时候就会初始化
if (event == null) {
state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
} else {
state = getHandler().process(socketWrapper, event);
}
if (state == SocketState.CLOSED) {
close(socket, key);
}
} else if (handshake == -1 ) {
close(socket, key);
} else if (handshake == SelectionKey.OP_READ){
socketWrapper.registerReadInterest();
} else if (handshake == SelectionKey.OP_WRITE){
socketWrapper.registerWriteInterest();
}
}catch (Throwable t) {
} finally {
if (running && !paused) {
processorCache.push(this);
}
}
}
}
回顾第四点,(Poller
消费事件) 其实就是poller对象通过内部的selector
对象监听socket对象,一旦socket
对象发生了一些poller
感兴趣的事件(比如数据准备完成)那么selector
能感知到,然后启用SocketProcessor
线程对其处理。socketProcessor
内部又依赖了.Http11NioProtocol
类的一些功能。其实就是对http报文进行解析。把请求的一些参数进行封装,最后得到相应请求对象比如request
,response
对象。
上面的我们提到的state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
这个handler
实际是Http11NioProtocol
的父类AbstractProtocol
的子类ConnectionHandler
。我们截个图
那么在ConnectionHandler
执行了哪些操作呢?代码比较长,我就只关心最核心的了
state = processor.process(wrapper, status);
# AbstractProcessorLight 对scoket进行一些处理
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
throws IOException {
SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
if (dispatches != null) {
DispatchType nextDispatch = dispatches.next();
state = dispatch(nextDispatch.getSocketStatus());
} else if (status == SocketEvent.DISCONNECT) {
// Do nothing here, just wait for it to get recycled
} else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
state = dispatch(status);
if (state == SocketState.OPEN) {
// There may be pipe-lined data to read. If the data isn't
// processed now, execution will exit this loop and call
// release() which will recycle the processor (and input
// buffer) deleting any pipe-lined data. To avoid this,
// process it now.
state = service(socketWrapper);
}
} else if (status == SocketEvent.OPEN_WRITE) {
// Extra write event likely after async, ignore
state = SocketState.LONG;
} else if (status == SocketEvent.OPEN_READ){
//核心代码在这
state = service(socketWrapper);
} else {
// Default to closing the socket if the SocketEvent passed in
// is not consistent with the current state of the Processor
state = SocketState.CLOSED;
}
if (state != SocketState.CLOSED && isAsync()) {
state = asyncPostProcess();
}
} while (state == SocketState.ASYNC_END ||
dispatches != null && state != SocketState.CLOSED);
return state;
}
5Http11Processor#service()
处理http协议解析数据包
重点方法来了,将http包封装为Request对象,征对请求头的内容对请求进行验证等等
@Override
#Http11Processor#service()
public SocketState service(SocketWrapperBase<?> socketWrapper)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
// Setting up the I/O
setSocketWrapper(socketWrapper);
inputBuffer.init(socketWrapper);
outputBuffer.init(socketWrapper);
while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
sendfileState == SendfileState.DONE && !endpoint.isPaused()) {
//
try {
if (!inputBuffer.parseRequestLine(keptAlive)) {
if (inputBuffer.getParsingRequestLinePhase() == -1) {
return SocketState.UPGRADING;
} else if (handleIncompleteRequestLineRead()) {
break;
}
}
if (endpoint.isPaused()) {
response.setStatus(503);
setErrorState(ErrorState.CLOSE_CLEAN, null);
} else {
request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
//读取请求头对象
if (!inputBuffer.parseHeaders()) {
openSocket = true;
readComplete = false;
break;
}
if (!disableUploadTimeout) {
socketWrapper.setReadTimeout(connectionUploadTimeout);
}
}
}
if (!getErrorState().isError()) {
rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
try {
prepareRequest();
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.request.prepare"), t);
}
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
}
// Process the request in the adapter
//往servlet转发处理之后的业务逻辑
getAdapter().service(request, response);
rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
sendfileState = processSendfile(socketWrapper);
}
}
##6CoyoteAdapter#service
转发给Container
进入下一步处理
接下来就是本篇的最后一个方法了
# CoyoteAdapter#service方法
@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
throws Exception {
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
// 根据配置设置一些参数
postParseSuccess = postParseRequest(req, request, res, response);
// 调用 container ,connector部分的逻辑执行完毕
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
}
}
总结
最后对本文对于connector的描述做一个小结,总结内容来自how tomcat work