坚持一下,把源码看完,勤奋一点,不要在懒惰了,你已经落下别人很多了
本文主要介绍Tomcat的连接器Connector
一、Connector的创建
当我们在server.xml中配置了Connector的时候,Digester通过解析xml文件,创建了Connector的对象。
在前文中提到的Catalina的createStartDigester方法中,Digester定义了一组规则,当解析xml遇到<Connector>标签的时候,创建Connector对象。
1、Catalina的createStartDigester:
digester.addRule("Server/Service/Connector", new ConnectorCreateRule());
digester.addRule("Server/Service/Connector", new SetAllPropertiesRule(new String[]{"executor"}));
digester.addSetNext("Server/Service/Connector", "addConnector", "org.apache.catalina.connector.Connector");
由于<Connecotr>标签是嵌套在<Service>中的,并且在server.xml中有多个Connector,所以 在StandardService中定义了一个Connector的数组用来存储这些连接器对象。
2、ConnectorCreateRule
public class ConnectorCreateRule extends Rule {
private static final Log log = LogFactory.getLog(ConnectorCreateRule.class);
public void begin(String namespace, String name, Attributes attributes)
throws Exception {
Service svc = (Service)digester.peek();
Executor ex = null;
if ( attributes.getValue("executor")!=null ) {
ex = svc.getExecutor(attributes.getValue("executor"));
}
Connector con = new Connector(attributes.getValue("protocol"));
if ( ex != null ) _setExecutor(con,ex);
digester.push(con);
}
public void _setExecutor(Connector con, Executor ex) throws Exception {
Method m = IntrospectionUtils.findMethod(con.getProtocolHandler().getClass(),"setExecutor",new Class[] {java.util.concurrent.Executor.class});
if (m!=null) {
m.invoke(con.getProtocolHandler(), new Object[] {ex});
}else {
log.warn("Connector ["+con+"] does not support external executors. Method setExecutor(java.util.concurrent.Executor) not found.");
}
}
@Override
public void end(String namespace, String name) throws Exception {
digester.pop();
}
}
该类继承了Rule,并且实现了自己的规则,在begain方法中,直接通过构造函数创建Connector对象。我们以常见的Connector配置为例 :
注意看红色标注的部分,首先配置了一个Executor,并且在下面的Connector中通过name 引入了这个executor,
再来看下Connector封装的属性和部分方法
3、Connector的构造方法和setProtocol方法
public Connector() {
this(null);
}
public Connector(String protocol) {
setProtocol(protocol);
// Instantiate protocol handler
try {
Class<?> clazz = Class.forName(protocolHandlerClassName);
this.protocolHandler = (ProtocolHandler) clazz.newInstance();
} catch (Exception e) {
log.error(sm.getString(
"coyoteConnector.protocolHandlerInstantiationFailed"), e);
}
}
public void setProtocol(String protocol) {
if (AprLifecycleListener.isAprAvailable()) {
if ("HTTP/1.1".equals(protocol)) {
setProtocolHandlerClassName
("org.apache.coyote.http11.Http11AprProtocol");
} else if ("AJP/1.3".equals(protocol)) {
setProtocolHandlerClassName
("org.apache.coyote.ajp.AjpAprProtocol");
} else if (protocol != null) {
setProtocolHandlerClassName(protocol);
} else {
setProtocolHandlerClassName
("org.apache.coyote.http11.Http11AprProtocol");
}
} else {
if ("HTTP/1.1".equals(protocol)) {
setProtocolHandlerClassName
("org.apache.coyote.http11.Http11Protocol");
} else if ("AJP/1.3".equals(protocol)) {
setProtocolHandlerClassName
("org.apache.coyote.ajp.AjpProtocol");
} else if (protocol != null) {
setProtocolHandlerClassName(protocol);
}
}
}
在Connector 中定义了一个
protected ProtocolHandler protocolHandler = null;
protected String protocolHandlerClassName = "org.apache.coyote.http11.Http11Protocol";
那么现在就可以确定,在Digester解析server.xml的时候,遇到Connector标签则会创建相应的Connector对象,根据配置的protocol协议指定默认的协议处理类;
Tomcat提供了两种 协议的处理类,一种是Ajp,一种是Http,
每种协议提供了三种不同的实现方式,分别是Apr,Nio,和Jio。
JIO(java.io):用java.io纯JAVA编写的TCP模块,这是tomcat默认连接器实现方法;
APR(Apache Portable Runtime):有C语言和JAVA两种语言实现,连接Apache httpd Web服务器的类库是在C中实现的,同时用APR进行网络通信;
NIO(java.nio):这是用纯Java编写的连接器(Conector)的一种可选方法。该实现用java.nio核心Java网络类以提供非阻塞的TCP包特性。
如果Connector配置了executor属性,则会把executor 与Connector持有 的ProtocolHandler 实现类 进行绑定,通过实现类的setExecutor方法将两者绑定。
二、Connector的初始化 init
1、大体流程:
2、Coonector的init方法的代码
protected void initInternal() throws LifecycleException {
super.initInternal();
// Initialize adapter
adapter = new CoyoteAdapter(this);
protocolHandler.setAdapter(adapter);
// Make sure parseBodyMethodsSet has a default
if( null == parseBodyMethodsSet ) {
setParseBodyMethods(getParseBodyMethods());
}
if (protocolHandler.isAprRequired() &&
!AprLifecycleListener.isAprAvailable()) {
throw new LifecycleException(
sm.getString("coyoteConnector.protocolHandlerNoApr",
getProtocolHandlerClassName()));
}
try {
protocolHandler.init();
} catch (Exception e) {
throw new LifecycleException
(sm.getString
("coyoteConnector.protocolHandlerInitializationFailed"), e);
}
// Initialize mapper listener
mapperListener.init();
}
最终发现Connector的初始化 是调用了protocolHandler.init 和 mapperListener.init方法
3、AbstractProtocol .init方法
@Override
public void init() throws Exception {
if (getLog().isInfoEnabled())
getLog().info(sm.getString("abstractProtocolHandler.init",
getName()));
if (oname == null) {
// Component not pre-registered so register it
oname = createObjectName();
if (oname != null) {
Registry.getRegistry(null, null).registerComponent(this, oname,
null);
}
}
if (this.domain != null) {
try {
tpOname = new ObjectName(domain + ":" +
"type=ThreadPool,name=" + getName());
Registry.getRegistry(null, null).registerComponent(endpoint,
tpOname, null);
} catch (Exception e) {
getLog().error(sm.getString(
"abstractProtocolHandler.mbeanRegistrationFailed",
tpOname, getName()), e);
}
rgOname=new ObjectName(domain +
":type=GlobalRequestProcessor,name=" + getName());
Registry.getRegistry(null, null).registerComponent(
getHandler().getGlobal(), rgOname, null );
}
String endpointName = getName();
endpoint.setName(endpointName.substring(1, endpointName.length()-1));
try {
endpoint.init();
} catch (Exception ex) {
getLog().error(sm.getString("abstractProtocolHandler.initError",
getName()), ex);
throw ex;
}
}
以上代码最终的是 调用了endpoint的init方法。
/**
* Endpoint that provides low-level network I/O - must be matched to the
* ProtocolHandler implementation (ProtocolHandler using BIO, requires BIO
* Endpoint etc.).
*/
protected AbstractEndpoint<S> endpoint = null;
通过注释可以发现,Endpoint与ProtocolHandler 是相匹配的。
public Http11Protocol() {
endpoint = new JIoEndpoint();
cHandler = new Http11ConnectionHandler(this);
((JIoEndpoint) endpoint).setHandler(cHandler);
setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
}
那么 最终Connector的初始化会跳转到 JIoEndpoint的init方法中。
4.JioEndpoint的init方法
JIoEndpoint是抽象类AbstractEndpoint的实现类,在AbstractEndpoint 中的init方法如下: public final void init() throws Exception {
testServerCipherSuitesOrderSupport();
if (bindOnInit) {
bind();
bindState = BindState.BOUND_ON_INIT;
}
}
该方法依赖于子类实现的抽象方法bind(), JIoEndpoint的bind方法如下:
public void bind() throws Exception {
// Initialize thread count defaults for acceptor
if (acceptorThreadCount == 0) {
acceptorThreadCount = 1;//创建一个acceptor线程,专门用来接收socket请求
}
// Initialize maxConnections
if (getMaxConnections() == 0) {
// User hasn't set a value - use the default
setMaxConnections(getMaxThreadsInternal());
}
if (serverSocketFactory == null) {
if (isSSLEnabled()) {
serverSocketFactory =
handler.getSslImplementation().getServerSocketFactory(this);
} else {
serverSocketFactory = new DefaultServerSocketFactory(this);
}
}
if (serverSocket == null) {
try {
if (getAddress() == null) {
serverSocket = serverSocketFactory.createSocket(getPort(),
getBacklog());
} else {
serverSocket = serverSocketFactory.createSocket(getPort(),
getBacklog(), getAddress());
}
} catch (BindException orig) {
******
}
}
}
上面代码最重要的部分是创建了一个ServerSocket,传入的port,backlog,addr 参数 则是 在Digester解析server.xml的时候 从<Connector>标签中解析出来的,
digester.addRule("Server/Service/Connector", new ConnectorCreateRule());
digester.addRule("Server/Service/Connector", new SetAllPropertiesRule(new String[]{"executor"}));
当创建完毕Connector对象之后,则会对xml文件中配置的Connector标签的属性进行赋值。所以在Connector调用init方法的时候这几个属性已经有数据了。
三、Connector的start
1、大体流程
2、Connector.startInternal方法
@Override
protected void startInternal() throws LifecycleException {
// Validate settings before starting
if (getPort() < 0) {
throw new LifecycleException(sm.getString(
"coyoteConnector.invalidPort", Integer.valueOf(getPort())));
}
setState(LifecycleState.STARTING);
try {
protocolHandler.start();
} catch (Exception e) {
String errPrefix = "";
if(this.service != null) {
errPrefix += "service.getName(): \"" + this.service.getName() + "\"; ";
}
throw new LifecycleException
(errPrefix + " " + sm.getString
("coyoteConnector.protocolHandlerStartFailed"), e);
}
mapperListener.start();
}
protocolHandler.start在抽象类AbstractProtocol 中有实现,最终会调用endpoint的start方法
3、JIoEndpoint.startInternal方法
@Override
public void startInternal() throws Exception {
if (!running) {
running = true;
paused = false;
// Create worker collection
if (getExecutor() == null) {
createExecutor();
}
initializeConnectionLatch();
startAcceptorThreads();
// Start async timeout thread
Thread timeoutThread = new Thread(new AsyncTimeout(),
getName() + "-AsyncTimeout");
timeoutThread.setPriority(threadPriority);
timeoutThread.setDaemon(true);
timeoutThread.start();
}
}
重点分析startAcceptorThreads方法,该方法启动一个线程接收客户端的请求,接收到请求之后会把请求放在线程池里并启动.
4、AbstractProtocol.startAcceptorThreads()方法解析
protected final void startAcceptorThreads() {
int count = getAcceptorThreadCount();//在JIoEndpoint的bind方法中 设置了acceptorThreadCount=1
acceptors = new Acceptor[count];//所以只有一个Acceptor,和前面JIoEndpoint的bing方法里设置的acceptorThreadCount对应,这里是1个
for (int i = 0; i < count; i++) {
acceptors[i] = createAcceptor();
String threadName = getName() + "-Acceptor-" + i;
acceptors[i].setThreadName(threadName);
Thread t = new Thread(acceptors[i], threadName);//启动接收器线程来接收socket请求
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}
在AbstractProtocol中,定义了一个抽象类Acceptor
public abstract static class Acceptor implements Runnable {
public enum AcceptorState {
NEW, RUNNING, PAUSED, ENDED
}
protected volatile AcceptorState state = AcceptorState.NEW;
public final AcceptorState getState() {
return state;
}
private String threadName;
protected final void setThreadName(final String threadName) {
this.threadName = threadName;
}
protected final String getThreadName() {
return threadName;
}
}
该Acceptor并没有实现Runnable的run方法,我们查看JIoEndpoint可以发现,在JIoEndpoint中,定义了一个内部类Acceptor来专门实现run方法
protected class Acceptor extends AbstractEndpoint.Acceptor {
@Override
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
while (running) {
// Loop if endpoint is paused
while (paused && running) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!running) {
break;
}
state = AcceptorState.RUNNING;
try {
//if we have reached max connections, wait
countUpOrAwaitConnection();
Socket socket = null;
try {
// Accept the next incoming connection from the server
// socket
socket = serverSocketFactory.acceptSocket(serverSocket);
} catch (IOException ioe) {
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (running && !paused && setSocketOptions(socket)) {//给Socket设置一些属性值
// Hand this socket off to an appropriate processor
if (!processSocket(socket)) { // 该方法解析socket请求。
countDownConnection();
// Close socket right away
closeSocket(socket);
}
} else {
countDownConnection();
// Close socket right away
closeSocket(socket);
}
} catch (IOException x) {
} catch (NullPointerException npe) {
} catch (Throwable t) {
}
}
state = AcceptorState.ENDED;
}
}
5、JIoEndpoint.processSocket 方法
protected boolean processSocket(Socket socket) {
// Process the request from this socket
try {
SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket);
wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
wrapper.setSecure(isSSLEnabled());
// During shutdown, executor may be null - avoid NPE
if (!running) {
return false;
}
getExecutor().execute(new SocketProcessor(wrapper));
} catch (RejectedExecutionException x) {
log.warn("Socket processing request was rejected for:"+socket,x);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
在该方法中,将客户端请求Socket进一步包装成SocketWrapper的对象,最终通过调用getExecutor获取到线程池,让线程池来执行 SocketProcessor。
digester.addObjectCreate("Server/Service/Executor",
"org.apache.catalina.core.StandardThreadExecutor",
"className");
当我们在解析server.xml中的connector标签的时候,如果标签中有executor的属性,通过查看上面的代码 ConnectorCreateRule 则可以确定会调用Connector的protocolHandler的setExecutor方法,那么接下来查看HttpProtocol父类AbstractProtocol 的setExecutor方法:
public void setExecutor(Executor executor) {
endpoint.setExecutor(executor);
}
再来看一下JIoEndpoint定义的内部类SocketProcessor。
6、SocketProcessor.run
@Override
public void run() {
boolean launch = false;
synchronized (socket) {
try {
SocketState state = SocketState.OPEN;
try {
// SSL handshake
serverSocketFactory.handshake(socket.getSocket());
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
if (log.isDebugEnabled()) {
log.debug(sm.getString("endpoint.err.handshake"), t);
}
// Tell to close the socket
state = SocketState.CLOSED;
}
if ((state != SocketState.CLOSED)) {
if (status == null) {
state = handler.process(socket, SocketStatus.OPEN_READ);
} else {
state = handler.process(socket,status);
}
}
if (state == SocketState.CLOSED) {
// Close socket
if (log.isTraceEnabled()) {
log.trace("Closing socket:"+socket);
}
countDownConnection();
try {
socket.getSocket().close();
} catch (IOException e) {
// Ignore
}
} else if (state == SocketState.OPEN ||
state == SocketState.UPGRADING ||
state == SocketState.UPGRADING_TOMCAT ||
state == SocketState.UPGRADED){
socket.setKeptAlive(true);
socket.access();
launch = true;
} else if (state == SocketState.LONG) {
socket.access();
waitingRequests.add(socket);
}
} finally {
if (launch) {
try {
getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
} catch (RejectedExecutionException x) {
log.warn("Socket reprocessing request was rejected for:"+socket,x);
try {
//unable to handle connection at this time
handler.process(socket, SocketStatus.DISCONNECT);
} finally {
countDownConnection();
}
} catch (NullPointerException npe) {
if (running) {
log.error(sm.getString("endpoint.launch.fail"),
npe);
}
}
}
}
}
socket = null;
// Finish up this request
}
上面的代码有这么一行:state = handler.process(socket,status);
public Http11Protocol() {
endpoint = new JIoEndpoint();
cHandler = new Http11ConnectionHandler(this);
((JIoEndpoint) endpoint).setHandler(cHandler);
setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
}
Http11ConnectionHadnler 继承自AbstractProtocol的内部类AbstractConnectionHandler,AbstractConnectionHandler的process方法如下:篇幅原因,删掉了一些代码
7、AbstractConnectionHandler的process方法
@SuppressWarnings("deprecation") // Old HTTP upgrade method has been deprecated
public SocketState process(SocketWrapper<S> wrapper,
SocketStatus status) {
if (wrapper == null) {
// Nothing to do. Socket has been closed.
return SocketState.CLOSED;
}
S socket = wrapper.getSocket();
if (socket == null) {
// Nothing to do. Socket has been closed.
return SocketState.CLOSED;
}
Processor<S> processor = connections.get(socket);
if (status == SocketStatus.DISCONNECT && processor == null) {
// Nothing to do. Endpoint requested a close and there is no
// longer a processor associated with this socket.
return SocketState.CLOSED;
}
wrapper.setAsync(false);
ContainerThreadMarker.markAsContainerThread();
try {
if (processor == null) {
processor = recycledProcessors.poll();
}
if (processor == null) {
processor = createProcessor();//该方法在AbstractConnectionHandler声明,但是在Http11ConnectionHadnler中实现,返回Http11Processor
}
initSsl(wrapper, processor);//ssl的一些处理
SocketState state = SocketState.CLOSED;
do {
if (status == SocketStatus.DISCONNECT &&
!processor.isComet()) {
} else if (processor.isAsync() || state == SocketState.ASYNC_END) {
state = processor.asyncDispatch(status);
if (state == SocketState.OPEN) {
getProtocol().endpoint.removeWaitingRequest(wrapper);
state = processor.process(wrapper);
}
} else if (processor.isComet()) {
state = processor.event(status);
} else if (processor.getUpgradeInbound() != null) {
state = processor.upgradeDispatch();
} else if (processor.isUpgrade()) {
state = processor.upgradeDispatch(status);
} else {
state = processor.process(wrapper);//调用Http11Processor的process方法
}
if (state != SocketState.CLOSED && processor.isAsync()) {
state = processor.asyncPostProcess();
}
if (state == SocketState.UPGRADING) {
HttpUpgradeHandler httpUpgradeHandler =processor.getHttpUpgradeHandler();
// Release the Http11 processor to be re-used
release(wrapper, processor, false, false);
// Create the upgrade processor
processor = createUpgradeProcessor(
wrapper, httpUpgradeHandler);
// Mark the connection as upgraded
wrapper.setUpgraded(true);
// Associate with the processor with the connection
connections.put(socket, processor);
httpUpgradeHandler.init((WebConnection) processor);
} else if (state == SocketState.UPGRADING_TOMCAT) {
org.apache.coyote.http11.upgrade.UpgradeInbound inbound =
processor.getUpgradeInbound();
release(wrapper, processor, false, false);
processor = createUpgradeProcessor(wrapper, inbound);
inbound.onUpgradeComplete();
}
if (getLog().isDebugEnabled()) {
}
} while (state == SocketState.ASYNC_END ||
state == SocketState.UPGRADING ||
state == SocketState.UPGRADING_TOMCAT);
if (state == SocketState.LONG) {
connections.put(socket, processor);
longPoll(wrapper, processor);
} else if (state == SocketState.OPEN) {
connections.remove(socket);
release(wrapper, processor, false, true);
} else if (state == SocketState.SENDFILE) {
connections.put(socket, processor);
} else if (state == SocketState.UPGRADED) {
connections.put(socket, processor);
if (status != SocketStatus.OPEN_WRITE) {
longPoll(wrapper, processor);
}
} else {
connections.remove(socket);
if (processor.isUpgrade()) {
processor.getHttpUpgradeHandler().destroy();
} else if (processor instanceof org.apache.coyote.http11.upgrade.UpgradeProcessor) {
// NO-OP
} else {
release(wrapper, processor, true, false);
}
}
return state;
} catch(java.net.SocketException e) {
}
connections.remove(socket);
if (!(processor instanceof org.apache.coyote.http11.upgrade.UpgradeProcessor)
&& !processor.isUpgrade()) {
release(wrapper, processor, true, false);
}
return SocketState.CLOSED;
}
8、AbstractHttp11Processor.process
@Override
public SocketState process(SocketWrapper<S> socketWrapper)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
// Setting up the I/O
setSocketWrapper(socketWrapper);
getInputBuffer().init(socketWrapper, endpoint);
getOutputBuffer().init(socketWrapper, endpoint);
// Flags
keepAlive = true;
comet = false;
openSocket = false;
sendfileInProgress = false;
readComplete = true;
if (endpoint.getUsePolling()) {
keptAlive = false;
} else {
keptAlive = socketWrapper.isKeptAlive();
}
if (disableKeepAlive()) {
socketWrapper.setKeepAliveLeft(0);
}
while (!getErrorState().isError() && keepAlive && !comet && !isAsync() &&
upgradeInbound == null &&
httpUpgradeHandler == null && !endpoint.isPaused()) {
// Parsing the request header
try {
setRequestLineReadTimeout();
if (!getInputBuffer().parseRequestLine(keptAlive)) {
if (handleIncompleteRequestLineRead()) {
break;
}
}
if (endpoint.isPaused()) {
// 503 - Service unavailable
response.setStatus(503);
setErrorState(ErrorState.CLOSE_CLEAN, null);
} else {
keptAlive = true;
// Set this every time in case limit has been changed via JMX
request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
request.getCookies().setLimit(getMaxCookieCount());
// Currently only NIO will ever return false here
if (!getInputBuffer().parseHeaders()) {
// We've read part of the request, don't recycle it
// instead associate it with the socket
openSocket = true;
readComplete = false;
break;
}
if (!disableUploadTimeout) {
setSocketTimeout(connectionUploadTimeout);
}
}
} catch (IOException e) {
if (getLog().isDebugEnabled()) {
getLog().debug(
sm.getString("http11processor.header.parse"), e);
}
setErrorState(ErrorState.CLOSE_NOW, e);
break;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
UserDataHelper.Mode logMode = userDataHelper.getNextMode();
if (logMode != null) {
String message = sm.getString(
"http11processor.header.parse");
switch (logMode) {
case INFO_THEN_DEBUG:
message += sm.getString(
"http11processor.fallToDebug");
//$FALL-THROUGH$
case INFO:
getLog().info(message, t);
break;
case DEBUG:
getLog().debug(message, t);
}
}
// 400 - Bad Request
response.setStatus(400);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
if (!getErrorState().isError()) {
// Setting up filters, and parse some request headers
rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
try {
prepareRequest();//解析请求头
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString(
"http11processor.request.prepare"), t);
}
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
}
if (maxKeepAliveRequests == 1) {
keepAlive = false;
} else if (maxKeepAliveRequests > 0 &&
socketWrapper.decrementKeepAlive() <= 0) {
keepAlive = false;
}
// Process the request in the adapter
if (!getErrorState().isError()) {
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
adapter.service(request, response);//调用Adapter的service方法,最终会进入到我们后台的业务代码中。
if(keepAlive && !getErrorState().isError() && (
response.getErrorException() != null ||
(!isAsync() &&
statusDropsConnection(response.getStatus())))) {
setErrorState(ErrorState.CLOSE_CLEAN, null);
}
setCometTimeouts(socketWrapper);
} catch (InterruptedIOException e) {
}
}
// Finish the handling of the request
rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
if (!isAsync() && !comet) {
if (getErrorState().isError()) {
getInputBuffer().setSwallowInput(false);
} else {
checkExpectationAndResponseStatus();
}
endRequest();
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);
if (getErrorState().isError()) {
response.setStatus(500);
}
request.updateCounters();
if (!isAsync() && !comet || getErrorState().isError()) {
if (getErrorState().isIoAllowed()) {
getInputBuffer().nextRequest();
getOutputBuffer().nextRequest();
}
}
if (!disableUploadTimeout) {
if(endpoint.getSoTimeout() > 0) {
setSocketTimeout(endpoint.getSoTimeout());
} else {
setSocketTimeout(0);
}
}
rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
if (breakKeepAliveLoop(socketWrapper)) {
break;
}
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
if (getErrorState().isError() || endpoint.isPaused()) {
return SocketState.CLOSED;
} else if (isAsync() || comet) {
return SocketState.LONG;
} else if (isUpgrade()) {
return SocketState.UPGRADING;
} else if (getUpgradeInbound() != null) {
return SocketState.UPGRADING_TOMCAT;
} else {
if (sendfileInProgress) {
return SocketState.SENDFILE;
} else {
if (openSocket) {
if (readComplete) {
return SocketState.OPEN;
} else {
return SocketState.LONG;
}
} else {
return SocketState.CLOSED;
}
}
}
}
观察上面的代码,调用adapter.service(request, response); 方法的时候,吧封装好的request和response传进去。
// Initialize adapter
adapter = new CoyoteAdapter(this);
protocolHandler.setAdapter(adapter);
protocolHandler 就是Http11Protocol 类的对象。
@Override
protected Http11Processor createProcessor() {
Http11Processor processor = new Http11Processor(
proto.getMaxHttpHeaderSize(), (JIoEndpoint)proto.endpoint,
proto.getMaxTrailerSize(), proto.getAllowedTrailerHeadersAsSet(),
proto.getMaxExtensionSize(), proto.getMaxSwallowSize());
processor.setAdapter(proto.adapter);
processor.setMaxKeepAliveRequests(proto.getMaxKeepAliveRequests());
processor.setKeepAliveTimeout(proto.getKeepAliveTimeout());
processor.setConnectionUploadTimeout(
proto.getConnectionUploadTimeout());
processor.setDisableUploadTimeout(proto.getDisableUploadTimeout());
processor.setCompressionMinSize(proto.getCompressionMinSize());
processor.setCompression(proto.getCompression());
processor.setNoCompressionUserAgents(proto.getNoCompressionUserAgents());
processor.setCompressableMimeTypes(proto.getCompressableMimeTypes());
processor.setRestrictedUserAgents(proto.getRestrictedUserAgents());
processor.setSocketBuffer(proto.getSocketBuffer());
processor.setMaxSavePostSize(proto.getMaxSavePostSize());
processor.setServer(proto.getServer());
processor.setDisableKeepAlivePercentage(
proto.getDisableKeepAlivePercentage());
processor.setMaxCookieCount(proto.getMaxCookieCount());
register(processor);
return processor;
}
结论:Connector主要分两部分工作,一个是解析客户端请求,另一个则是调用后台业务service方法。