init
AbstractProtocol#init
public void init() throws Exception {
//。。。。
// AbstractProtocol#init 的init 最后调用的是 endpoint.init();
//在非阻塞io模式下面 endpoint 就是 NioEndpoint
endpoint.init();
}
复制代码
AbstractEndpoint#init
public final void init() throws Exception {
if (bindOnInit) {
//调用绑定的方法
bindWithCleanup();
bindState = BindState.BOUND_ON_INIT;
}
//。。。。。。
}
复制代码
AbstractEndpoint#bindWithCleanup
private void bindWithCleanup() throws Exception {
try {
//调用子类绑定方法
bind();
} catch (Throwable t) {
//。。。。
}
}
复制代码
NioEndpoint#bind
public void bind() throws Exception {
//socket 绑定
initServerSocket();
// acceptor 线程数量 默认 1
if (acceptorThreadCount == 0) {
acceptorThreadCount = 1;
}
//poller 线程数量 默认2 ,跟cpu核心取最小
if (pollerThreadCount <= 0) {
pollerThreadCount = 1;
}
//。。。。
}
复制代码
NioEndpoint#initServerSocket
protected void initServerSocket() throws Exception {
if (!getUseInheritedChannel()) {
//服务端绑定端口号的一般写法
serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
serverSock.socket().bind(addr,getAcceptCount());
} //......
//设置 servicesock 为阻塞io
serverSock.configureBlocking(true); //mimic APR behavior
}
复制代码
以上就是Tomcat初始化绑定端口的逻辑
start
AbstractProtocol#start
public void start() throws Exception {
//....
//调用 endpoint#start方法
endpoint.start();
//....
}
复制代码
AbstractEndpoint#start
public final void start() throws Exception {
//已经绑定过了,不走这个逻辑
if (bindState == BindState.UNBOUND) {
bindWithCleanup();
bindState = BindState.BOUND_ON_START;
}
//调用子类
startInternal();
}
复制代码
NioEndpoint#startInternal
public void startInternal() throws Exception {
if (!running) {
//.....
// 创建工作线程,也就是处理业务逻辑
if ( getExecutor() == null ) {
createExecutor();
}
//新建 poller 在init的时候有指定线程数为2 后面可以看下poller的功能
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
//启动
pollerThread.start();
}
//创建 acceptor
startAcceptorThreads();
}
}
复制代码
AbstractEndpoint#startAcceptorThreads
protected final void startAcceptorThreads() {
//默认1
int count = getAcceptorThreadCount();
acceptors = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
// 创建 acceptor
Acceptor<U> acceptor = new Acceptor<>(this);
String threadName = getName() + "-Acceptor-" + i;
acceptor.setThreadName(threadName);
acceptors.add(acceptor);
Thread t = new Thread(acceptor, threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}
复制代码
AbstractEndpoint#createExecutor
public void createExecutor() {
internalExecutor = true;
//自定义线程队列
TaskQueue taskqueue = new TaskQueue();
//TaskThreadFactory 这个线程池不是java原生,而是Tomcat继承java原生线程池,做了一点改动
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}
复制代码
以上就创建了 工作线程、acceptor 线程(1个)、poller 线程(2个),后面看下 acceptor、poller是干什么的
Acceptor
//继承 Runable 正常最重要的方法就是run方法
public class Acceptor<U> implements Runnable {
复制代码
Acceptor#run
public void run() {
while (endpoint.isRunning()) {
//.....
try {
try {
// 执行 serviceSocket的accept方法
//一般java io 请求都是先绑定端口号,然后等待连接
//通过accept方法得到连接的socket,然后进行独写操作
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
//....
}
```
//获得到连接之后,处理请求
if (endpoint.isRunning() && !endpoint.isPaused()) {
//处理请求
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
复制代码
NioEndpoint#serverSocketAccept
protected SocketChannel serverSocketAccept() throws Exception {
//调用java自带的accept
return serverSock.accept();
}
复制代码
NioEndpoint#setSocketOptions
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//设置 socket 为非阻塞,跟刚开始的init不一样,serviceSocket 是阻塞的
//socket 是非阻塞
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
//Synchronizedstack 是底层为数组,做了一个对象缓存
//这样就不需要频繁创建对象,用完放到数组,需要的时候把对象
//里面的属性设置为null
NioChannel channel = nioChannels.pop();
if (channel == null) {
//读写buffer
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
//包装一下 channel
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
//往poller注册channel
getPoller0().register(channel);
} catch (Throwable t) {
//....
}
return true;
}
复制代码
到这里可以看出来Acceptor的作用就是accept,不停的run,然后接收到一个请求,就把这个请求通道包装一下,放到Poller进行处理。同时Acceptor是阻塞的
Poller
//poller 同样是一个线程,看run方法
public class Poller implements Runnable {
复制代码
看run方法之前先看看 Acceptor#run调用Poller#register方法 NioEndpoint.Poller#register
public void register(final NioChannel socket) {
//对socket进行各种包装
socket.setPoller(this);
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(ka);
ka.setPoller(this);
ka.setReadTimeout(getConnectionTimeout());
ka.setWriteTimeout(getConnectionTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
//eventCache 同样是一个对象缓存器
PollerEvent r = eventCache.pop();
//SelectionKey.OP_READ 读事件
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
//把 PollerEvent 放到一个 数组里面
addEvent(r);
}
复制代码
NioEndpoint.Poller#addEvent
private void addEvent(PollerEvent event) {
//放到数组
events.offer(event);
if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
}
复制代码
到这个,register 做的事情 1、包装一下socket 放到 PollerEvent 然后把pollerEvent放到数组,接下来可以看run方法 NioEndpoint.Poller#run
public void run() {
//死循环跑
while (true) {
boolean hasEvents = false;
try {
if (!close) {
//先调用 events方法,这个方法重要,后面看
hasEvents = events();
// 选择器 select 看下有没有读写操作
if (wakeupCounter.getAndSet(-1) > 0) {
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
//.....
}
//拿到key
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
iterator.remove();
//有读写操作,进行独写
processKey(sk, attachment);
}
}//while
//process timeouts
timeout(keyCount,hasEvents);
}//while
getStopLatch().countDown();
}
复制代码
NioEndpoint.Poller#events
public boolean events() {
boolean result = false;
//刚才 regist是往数组放socket
//events.poll() 现在是把socket从events取
PollerEvent pe = null;
for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
result = true;
try {
//直接调用run方法,注意不是开启线程,直接调用方法
pe.run();
pe.reset();
if (running && !paused) {
eventCache.push(pe);
}
} catch ( Throwable x ) {
log.error("",x);
}
}
return result;
}
复制代码
NioEndpoint.PollerEvent#run
public void run() {
if (interestOps == OP_REGISTER) {
try {
//往selector选择器注册 channel 事件是read
socket.getIOChannel().register(
socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
}
} else
//.......
}
}
复制代码
所以envent做到事情是从数组里面拿到pollerevent,然后注册read事件。注册完,后面如果有读写操作,selector#select 就会有数据,就可以进行读写操作
NioEndpoint.Poller#processKey
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
try {
if ( close ) {
} else if ( sk.isValid() && attachment != null ) {
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) {
//读
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
//写
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
}
}
}
复制代码
AbstractEndpoint#processSocket
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
//同样是个对象缓存器
SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
//拿到最开始 init 时候创建的工作线程
Executor executor = getExecutor();
if (dispatch && executor != null) {
//通过工作线程执行任务
executor.execute(sc);
} else {
sc.run();
}
}
return true;
}
复制代码
到这里 poller 的作用就完事了。1、从accepter接过channle 2、往selector注册channle读事件 3、调用工作线程对channel进行读写操作