什么是Thread-Per-Message模式
Thread-Per-Message的意思是为每一个消息的处理开辟一个线程使得消息能够以并发的方式进行处理,从而提高系统整体的吞吐能力。 这就好比电话接线员一样,收到的每一个电话投诉或者业务处理请求,都会提交对应的工单,然后交由对应的工作人员来处理。
每个任务一个线程:
package MutilThreadModel.ThreadPerMessageModel; /** * Created by JYM on 2019/1/16 * 客户提交的任何业务受理请求都会被封装成Request对象 * */ public class Request { private final String business; public Request(String business) { this.business = business; } @Override public String toString() { return business; } }
客户提交的任何业务受理请求都会被封装成Request对象。
package MutilThreadModel.ThreadPerMessageModel; import java.util.concurrent.TimeUnit; import static java.util.concurrent.ThreadLocalRandom.current; /** * Created by JYM on 2019/1/16 * TaskHandler代表了每一个工作人员接收到任务后的处理逻辑 * */ /* * TaskHandler用于处理每一个提交的Request请求,由于TaskHandler将被Thread执行, * 因此需要实现Runnable接口 * */ public class TaskHandler implements Runnable { //需要处理的Request请求 private final Request request; public TaskHandler(Request request) { this.request = request; } @Override public void run() { System.out.println("Begin handle "+request); slowly(); System.out.println("End handle "+request); } //模拟请求处理比较耗时,使线程进入短暂的休眠阶段 private void slowly() { try{ TimeUnit.SECONDS.sleep(current().nextInt(10)); }catch (InterruptedException e) { e.printStackTrace(); } } }
TaskHandler代表了每一个工作人员接收到任务后的处理逻辑。
package MutilThreadModel.ThreadPerMessageModel; /** * Created by JYM on 2019/1/16 * */ public class Operator { public void call(String business) { //为每一个请求创建一个线程去处理 TaskHandler taskHandler = new TaskHandler(new Request(business)); new Thread(taskHandler).start(); } } /* * Operator代表了接线员,当有电话打进来时,话务员会将客户的请求封装成一个工单Request,然后开辟一个线程(工作人员)去处理。 * 截止目前,我们完成了关于Thread-Per-Message的设计,但是这种设计方式存在着很严重的问题,经过第2章的学习,我们知道每一个JVM * 中可创建的线程数量是有限的,针对每一个任务都创建一个新的线程,假如每一个线程执行的时间比较长,那么在某个时刻JVM会由于无法在创建 * 新的线程而导致栈内存的溢出;在假如每一个任务的执行时间都比较短,频繁地创建销毁线程对系统性能的开销也是一个不小的影响。 * 这种处理方式虽然有很多问题,但不代表其就一无是处了,其实他也有自己的使用场景,比如在基于Event的编程模型中,当系统初始化时间发生时 * ,需要进行若干资源的后台加载,由于系统初始化时的任务数量并不多,可以考虑使用该模式响应初始化Event,或者系统在关闭时,进行资源回收也可以 * 考虑将销毁事件触发的动作交给该模式。 * 我们可以将call方法中的创建新线程的方式交给线程池去处理,这样可以避免线程频繁创建和销毁带来的系统开销,还能将线程数量控制在一个可控的范围之内。 * */
package MutilThreadModel.ThreadPerMessageModel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created By JYM on 2019/1/16 * 重构Operator*/ public class Operator_re { //使用线程池替代为每一个请求创建线程 private final ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5); public void call(String business) { TaskHandler taskHandler = new TaskHandler(new Request(business)); fixedThreadPool.execute(taskHandler); } }
多用户的网络聊天
import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created by JYM on 2019/1/16 * 多用户的网络聊天: * Thread-Per-Message模式在网络通信中的使用也是非常广泛的,比如在本节中介绍的网络聊天程序, * 在服务端每一个连接到服务端的连接都将创建一个独立的线程进行处理,当客户端的连接数超过了服务端 * 的最大受理能力时,客户端将被存放至排队队列中。 * */ /* * 下面编写服务端程序ChatServer用于接收来自客户端的链接,并且与之进行TCP通信交互, * 当服务端接收到了每一次的客户端连接后便会给线程池提交一个任务用于与客户端进行交互,进而提高并发响应能力。 * */ public class ChatServer { //服务端端口 private final int port; //定义线程池, private ExecutorService fixedThreadPool; //服务端Socket private ServerSocket serverSocket; //通过构造函数传入端口 public ChatServer(int port) { this.port = port; } //默认使用13312端口 public ChatServer() { this(13312); } public void startServer() throws IOException { //创建线程池 this.fixedThreadPool = Executors.newFixedThreadPool(10); this.serverSocket = new ServerSocket(port); this.serverSocket.setReuseAddress(true); System.out.println("Chat server is started and listen at port: "+port); } private void listen() throws IOException { for (; ;) { //accept方法是阻塞方法,当有新的链接进入时才会返回,并且返回的是客户端的连接 Socket client = serverSocket.accept(); //将客户端连接作为一个Request封装成对应的Handler然后提交给线程池 this.fixedThreadPool.execute(new ClientHandler(client)); } } }
在上面的程序中,当接收到了新的客户端连接时,会为每一个客户端连接创建一个线程ClientHandler与客户端进行交互,当客户端的连接个数超过线程池的最大数量时,客户端虽然可以成功接入服务端,但是会进入阻塞队列。
package MutilThreadModel.ThreadPerMessageModel; import java.io.*; import java.net.Socket; /** * Created by JYM on 2019/1/16 * 待服务器端接收到客户端的连接之后,便会创建一个新的ClientHandler任务提交给线程池,ClientHandler任务是 * Runnable接口的实现,主要负责和客户端进行你来我往的简单通信。 * */ //ClientHandler同样也是一个Runnable接口的实现 public class ClientHandler implements Runnable { //客户端的socket连接 private final Socket socket; //客户端的identity private final String clientIdentify; //通过构造函数传入客户端连接 public ClientHandler(final Socket socket) { this.socket = socket; this.clientIdentify = socket.getInetAddress().getHostAddress()+":"+socket.getPort(); } @Override public void run() { try{ this.chat(); }catch (IOException e) { e.printStackTrace(); } } private void chat() throws IOException { BufferedReader bufferedReader = wrap2Reader(this.socket.getInputStream()); PrintStream printStream = wrap2Print(this.socket.getOutputStream()); String received; while ((received = bufferedReader.readLine()) != null) { //将客户端发送的消息输出到控制台 System.out.printf("client:%s-message:%s\n",clientIdentify,received); if (received.equals("quit")) { //如果客户端发送了quit指令,则断开与客户端的连接 write2Client(printStream,"client will close"); socket.close(); break; } //向客户端发送消息 write2Client(printStream,"server"+received); } } //将输入字节流封装成BufferedReader缓冲字符流 private BufferedReader wrap2Reader(InputStream inputStream) { return new BufferedReader(new InputStreamReader(inputStream)); } //将输出字节流封装成PrintStream private PrintStream wrap2Print(OutputStream outputStream) { return new PrintStream(outputStream); } //该方法主要用于向客户端发送消息 private void write2Client(PrintStream print,String message) { print.println(message); print.flush(); } } 聊天程序测试:
package MutilThreadModel.ThreadPerMessageModel; import java.io.IOException; public class ChatTest { public static void main(String[] args) throws IOException { new ChatServer().startServer(); } }