版权声明:本文为博主原创,未经博主允许不得转载。 https://blog.csdn.net/weixin_36904568/article/details/90612747
一:并行中的设计模式
1. 单例模式
- 对于频繁使用的对象,可以节省初始化操作的开销
- 减轻GC压力,缩短GC停顿时间
package SingletonPattern;
/**
* 通过静态内部类创建对象
*/
public class StaticInnerClass {
private StaticInnerClass(){}
/**
* 被调用时才会加载,由JVM保护线程安全
*/
private static class holderClass{
private static StaticInnerClass singleton = new StaticInnerClass();
}
public static StaticInnerClass getInstance(){
return holderClass.singleton;
}
}
2. 不变模式
一个对象创建后,没有一个线程可以改变其内部状态和数据
- 没有修改的方法
- 子类无法重载
- 属性不可修改
- 构造函数可以创建完整对象
public final class Invariant {
private final String data;
public Invariant(String data) {
this.data = data;
}
public String getData() {
return data;
}
}
3. 生产者和消费者模式
若干个生产者负责提交用户请求,若干个消费者负责处理请求,他们通过共享内存缓冲区进行通信
- 缓解了生产者和消费者的性能差
- 解耦
(1)数据
package ProducerAndConsumer;
public class Data {
private final int data;
public Data(int data) {
this.data = data;
}
public int getData() {
return data;
}
@Override
public String toString() {
return "Data{" +
"data=" + data +
'}';
}
}
(2)生产者
package ProducerAndConsumer;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Producer implements Runnable{
//共享缓冲区
private BlockingQueue<Data> queue;
//互斥访问的标记
private boolean isFull = false;
//数据
private AtomicInteger count;
//名字
private String name;
public Producer(String name,BlockingQueue<Data> queue) {
this.queue = queue;
this.name = name;
count = new AtomicInteger();
}
@Override
public void run() {
Data data = null;
System.out.println("Thread start:"+name);
try {
while (!isFull){
//生产数据
data = new Data(count.incrementAndGet());
//添加数据到共享缓冲区
if (queue.offer(data,2, TimeUnit.SECONDS))
System.out.println("success to add data:"+data);
else
System.out.println("fail to add data:"+data);
Thread.sleep(new Random().nextInt(1000));
}
} catch (InterruptedException e) {
//线程被中断
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
public void full(){
this.isFull = true;
}
}
(3)消费者
package ProducerAndConsumer;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable{
//共享缓冲区
private BlockingQueue<Data> queue;
//名字
private String name;
//互斥访问的标记
private boolean isEmpty = false;
public Consumer(BlockingQueue<Data> queue, String name) {
this.queue = queue;
this.name = name;
}
@Override
public void run() {
System.out.println("Thread start:"+name);
try {
while (!isEmpty){
Data data = queue.take();
if (data != null)
System.out.println("success to get data:"+data);
else
System.out.println("fail to get data");
Thread.sleep(new Random().nextInt(1000));
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
(4)实现
package ProducerAndConsumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
public class Main {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Data> queue = new LinkedBlockingDeque<>(10);
Producer producer1 = new Producer("producer1",queue);
Producer producer2 = new Producer("producer2",queue);
Consumer consumer1 = new Consumer(queue,"consumer1");
Consumer consumer2 = new Consumer(queue,"consumer2");
ExecutorService service = Executors.newCachedThreadPool();
service.submit(producer1);
service.submit(producer2);
service.submit(consumer1);
service.submit(consumer2);
Thread.sleep(10000);
producer1.full();
producer2.full();
Thread.sleep(3000);
service.shutdown();
}
}
4. NIO
(1)基于socket的服务端多线程模式
服务端需要先读取客户端的输入,才能进行处理。如果客户端处理缓慢,则服务端也需要进行漫长的等待,能够处理的并发数量会很少。
服务器
package Socket;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Server {
//线程池,为每个客户端开启一个线程
private static ExecutorService service = Executors.newCachedThreadPool();
//服务器处理线程
static class MyThread implements Runnable{
Socket socket;
public MyThread(Socket socket) {
this.socket = socket;
}
//获取客户端的输入,并在客户端输出
@Override
public void run() {
BufferedReader input = null;
PrintWriter output = null;
try {
input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
output = new PrintWriter(socket.getOutputStream(),true);
String s = null;
while ((s=input.readLine()) != null){
output.println(s);
}
} catch (IOException e) {
e.printStackTrace();
}
finally {
try {
if (input != null)
input.close();
if (output != null)
output.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws IOException {
//开启服务器,准备连接客户端
ServerSocket server = null;
Socket client = null;
server = new ServerSocket(8080);
while (true){
client = server.accept();
System.out.println("a client connect:"+client.getRemoteSocketAddress());
service.submit(new MyThread(client));
}
}
}
客户端
package Socket;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
public class Client {
//连接服务器,发送消息和接收消息
public static void main(String[] args) {
Socket client = null;
BufferedReader reader = null;
PrintWriter writer = null;
try {
client = new Socket();
client.connect(new InetSocketAddress("localhost",8080));
reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
System.out.println("get message from server:"+reader.readLine());
writer = new PrintWriter(client.getOutputStream(),true);
writer.println("Hello");
} catch (IOException e) {
e.printStackTrace();
}
finally {
try {
if (reader != null)
reader.close();
if (writer != null)
writer.close();
if (client != null)
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
(2)NIO的网络编程
将网络IO的等待时间从线程中分离出来
- Buffer:一个内存区域,包装数据
- Channel:类似于流
- Selector:管理多个流
服务器
package NIO;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Server {
private ExecutorService service = Executors.newCachedThreadPool();
//选择器
private Selector selector;
private void server()throws Exception{
//获取选择器和服务端的流,开启服务器
selector = SelectorProvider.provider().openSelector();
ServerSocketChannel channel = ServerSocketChannel.open();
channel.configureBlocking(false);
channel.socket().bind(new InetSocketAddress(8080));
//注册服务器的流
SelectionKey serverKey = channel.register(selector,SelectionKey.OP_ACCEPT);
while (true){
//有数据可读
selector.select();
//开始处理所有流
Set keys = selector.selectedKeys();
Iterator iterator = keys.iterator();
while (iterator.hasNext()){
//移除要处理的键
SelectionKey key = (SelectionKey) iterator.next();
iterator.remove();
//连接客户端
if (key.isAcceptable()){
accept(key);
}
//读取客户端
else if(key.isReadable()){
read(key);
}
//写入客户端
else if(key.isWritable()){
write(key);
}
}
}
}
private void accept(SelectionKey key){
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client;
try {
//获取连接的客户端
client = server.accept();
client.configureBlocking(false);
//注册客户端
SelectionKey clientKey = client.register(selector,SelectionKey.OP_READ);
//每个客户端绑定一个记录消息的队列
Queue queue = new Queue();
clientKey.attach(queue);
System.out.println("connect from "+client.socket().getInetAddress());
} catch (IOException e) {
e.printStackTrace();
}
}
private void read(SelectionKey key){
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
//读入客户端的消息到缓冲区
int len = client.read(buffer);
if (len < 0)
disconnect(key);
} catch (IOException e) {
e.printStackTrace();
disconnect(key);
return;
}
//开启线程,整理缓冲区
buffer.flip();
service.submit(new MyThread(key,buffer));
}
private void write(SelectionKey key){
//获取流绑定的队列里面的缓冲区内容
SocketChannel client = (SocketChannel) key.channel();
Queue queue = (Queue) key.attachment();
LinkedList<ByteBuffer> list = queue.getList();
ByteBuffer buffer = list.getLast();
try {
//写入内容
int len = client.write(buffer);
if (len == -1){
disconnect(key);
return;
}
//完成写入
if (buffer.remaining() == 0)
list.removeLast();
} catch (IOException e) {
e.printStackTrace();
disconnect(key);
return;
}
//移除注册写的流
if (list.size() == 0)
key.interestOps(SelectionKey.OP_READ);
}
private void disconnect(SelectionKey key){
selector.selectedKeys().remove(key);
}
private class MyThread implements Runnable{
SelectionKey key;
ByteBuffer byteBuffer;
public MyThread(SelectionKey key, ByteBuffer byteBuffer) {
this.key = key;
this.byteBuffer = byteBuffer;
}
//把缓冲区的内容加入到队列中,重新注册事件
@Override
public void run() {
Queue queue = (Queue) key.attachment();
queue.enqueue(byteBuffer);
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
selector.wakeup();
}
}
private class Queue{
private LinkedList<ByteBuffer> list;
public Queue() {
list = new LinkedList<>();
}
public LinkedList<ByteBuffer> getList() {
return list;
}
public void enqueue(ByteBuffer buffer){
list.addFirst(buffer);
}
}
}
客户端
package NIO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Set;
public class Client {
//选择器
private Selector selector;
public void init(String ip, int port) throws IOException {
//获取选择器,开启客户端
selector = SelectorProvider.provider().openSelector();
SocketChannel client = SocketChannel.open();
client.configureBlocking(false);
client.connect(new InetSocketAddress(ip, port));
//注册客户端的流
client.register(selector, SelectionKey.OP_CONNECT);
}
public void work() throws IOException {
while (true) {
if (!selector.isOpen())
break;
//有数据可读
selector.select();
//开始处理所有流
Set keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
//连接服务器
if (key.isConnectable()) {
connect(key);
} else if (key.isReadable()) {
read(key);
}
}
}
}
private void connect(SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
//判断是否还未建立连接
if (client.isConnectionPending())
client.finishConnect();
client.configureBlocking(false);
String text = "Hello";
client.write(ByteBuffer.wrap(text.getBytes()));
client.register(selector, SelectionKey.OP_READ);
}
private void read(SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读入服务器的消息到缓冲区
client.read(buffer);
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("get data from server:"+msg);
client.close();
key.selector().close();
}
}