代码1.0
/**
* 多线程服务器(相当于有一个大厅经理管理手下干活的进程)
* @author FuQingsong
* @create 2023-04-07-19:47
*/
public class MuliThreadServer {
public static void main(String[] args) throws Exception {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
server work = new server("work_0");
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true){
selector.select();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
System.out.println("交给手下的人进行干活!!");
work.register(sc);
}
}
//只进行读的线程
static class server implements Runnable{
private Thread thread;
private String name;//读服务线程名
private Selector selector;
private boolean isStart = true;
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue();//用于线程通信
public server(String name) {
this.name = name;
}
public void register(SocketChannel sc) throws Exception{
if (isStart){
selector = Selector.open();
thread = new Thread(this,name);
thread.start();
isStart = false;
}
queue.add(()->{
try {
sc.register(selector,SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
selector.wakeup();//唤醒下面的select()
}
@Override
public void run() {
while (true){
try {
selector.select();
Runnable task = queue.poll();
if (task != null){
task.run();
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(32);
SocketChannel channel = (SocketChannel) key.channel();
channel.read(buffer);
buffer.flip();
System.out.println(Charset.defaultCharset().decode(buffer).toString());
buffer.clear();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
代码2.0 两个工作线程
/**
* 多线程服务器(相当于有一个大厅经理管理手下干活的进程)
* @author FuQingsong
* @create 2023-04-07-19:47
*/
public class MuliThreadServer {
public static void main(String[] args) throws Exception {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
//server work = new server("work_0");
server[] work = new server[2];
for (int i = 0; i < work.length ; i++) {
work[i] = new server("work_"+i);
}
int count = 1;
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true){
selector.select();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
System.out.println("交给手下的人进行干活!!");
work[(count++)%2].register(sc);
}
}
//只进行读的线程
static class server implements Runnable{
private Thread thread;
private String name;//读服务线程名
private Selector selector;
private boolean isStart = true;
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue();//用于线程通信
public server(String name) {
this.name = name;
}
public void register(SocketChannel sc) throws Exception{
if (isStart){
selector = Selector.open();
thread = new Thread(this,name);
thread.start();
isStart = false;
}
queue.add(()->{
try {
sc.register(selector,SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
selector.wakeup();//唤醒下面的select()
}
@Override
public void run() {
while (true){
try {
selector.select();
Runnable task = queue.poll();
if (task != null){
task.run();
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(32);
SocketChannel channel = (SocketChannel) key.channel();
channel.read(buffer);
buffer.flip();
System.out.println(Thread.currentThread().getName());
System.out.println(Charset.defaultCharset().decode(buffer).toString());
buffer.clear();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}