一 同步阻塞和异步非阻塞
如果有这样一个功能,客户端提交event至服务器,服务器接收到客户请求之后开辟线程处理客户请求,进过处理后将结果返回给客户端,如下图:
这有几个问题:、
同步Event提交,客户端等待时间过长,会陷入阻塞。导致第二次提交Event耗时过长;
客户端提交的Event数量不多,导致系统同事受理的业务数量有限,也就是系统整体的吞吐量不高。
这种一个线程处理一个Event的方式,会导致出现频繁的创建开启和销毁,从而增加系统的额外开销。
在业务到达峰值的时候,大量业务线程的阻塞,会导致频繁的CPU上下文切换,从而降低系统性能;
换成下图异步非阻塞消息处理机制:
客户端提交Event后会得到一个相应的工单并且立即返回,Event则会被放置在Event队列中,服务端有若干个工作线程。不断从Event中获取任务并且进行异步处理,最后将结果保存在结果集中,如果客户端想要获取处理结果,则可凭借工单号再次查询。
你会发现非阻塞优势非常明显,首先客户端不用等结果处理,提高了系统吞吐量和并发量,其次,若服务器的线程数量在一个可控范围之内是不会导致太多的CPU上下文切换从而带来的额外开销的,再次服务端线程可以重复利用,减少了不断创建线程带来的资源浪费,但是异步也存在缺陷,比如客户端想要得到结果,还要再次调用结口查询。
单线程间的通信:
初识wait和notify
开始学习线程之间如何进行通信,首先实现一个Eventqueue,该queue如下三种状态:
1 队列满-- 最多可容纳多少个Event 好比系统给最多同时能够受理多少业务一样
2 队列空---当所有的のvnet都被处理并且没有新的Event被提交的时候,此时队列是空
3有 Event 的但是没有满
示例代码如下:
public class EventQueue {
private final int max;
static class Event {
}
private final LinkedList<Event> eventQueue = new LinkedList<>();
private final static int DEFAULT_MAX_EVENT = 10;
public EventQueue() {
this(DEFAULT_MAX_EVENT);
}
public EventQueue(int max) {
this.max = max;
}
public void offer(Event event) {
synchronized (eventQueue) {
if (eventQueue.size() > max) {
try {
console("the queue is full.");
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
console("the new event is submitted");
eventQueue.addLast(event);
eventQueue.notify();
}
}
public Event take() {
synchronized (eventQueue) {
if (eventQueue.isEmpty()) {
console("the queue is empty");
try {
eventQueue.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
Event event=eventQueue.removeFirst();
this.eventQueue.notify();
console("the enent "+event+"====ishandled");
return event;
}
}
private void console(String message) {
System.out.printf("%s:%s\n", Thread.currentThread().getName(), message);
}
}
上面代码,在Eventqueue中定义了一个队列,offer方法会提交一个Event到队尾,如果队列满了,那么线程会阻塞,这是调用了wait方法的结果,同样take方法会从队头获取数据,如果=队列没有可用数据,那么线程会阻塞,也是用wait方法的结果,还可以看到一个notify方法,作用是唤醒那些曾经执行monitor的wait方法而进入阻塞的线程;
下面写两个线程调用测绘师腋下:
public class EventClient {
public static void main(String[] args) {
final EventQueue eventQueue=new EventQueue();
new Thread(()->{
for(; ;) {
eventQueue.offer(new EventQueue.Event());
}
},"producer").start();;
new Thread(()->{
for (; ;) {
eventQueue.take();
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"consumer").start();;
}
}
其中producer线程模拟提交Event的客户端几乎没有任何的延迟,而consumer线程则用于模拟处理请求的工作线程,consumer,也就是单线程的通信,多个生产和消费会后面看,运行,结果如下:
看出producer 很快提交了10个Event数据,此时队列满了,他会执行wait方法进入阻塞,consumer 线程由于要处理数据,所以会花费大概10秒钟时间来处理其中一条数据,然后通知producer可以继续提交数据了如此循环往复
二 wait和notify 方法详解
wait和notify 方法并不是thread特有的方法,而是object中的方法,也就是说在jdk中每一个类都有着两个方法,下面是wait方法的三个重载方法,
public final void wait () throws InterruptedExeption
public final void wait (long timeout) throws InterruptedExeption
public final void wait ( long timeout ,int nanos) throws InterruptedExeption
(1)wait 方法的三个重载方法都调用wait(long timeout )这个方法,前面使用的wait()方法等价于wait(0) ,0代表永不超时
(2)object的wait()方法会导致当前线程进入阻塞,直到有其他线程调用了object的notify方法或者notifyall 才能唤醒,或者阻塞时间到了timeout时间自动唤醒,
(3)wait方法必须拥有该对象的monitor,也就是wait方法必须在同步方法中使用,
(4)当前线程执行了该对象的wait方法之后,将会放弃对该monitor的所有权并且进入与该对象关联的wait set中,也就是一旦线程执行了某个object的wait方法之后,他就会释放对该对象monitor的所有权,其他线程也会有机会继续争抢改monitor的所有权。
对照 上面所展示的Eventqueue中的代码,由于有多个线程操作Event-queue,因此他自然就是共享资源,为了防止多线程对共享资源操作引起数据不一致的问题,使用了synchronize 关键字进行同步,如果队列Event数量已经达到了上线,他会调用Eventqueue的wait方法使当前线程进入wait set中并且释放monitor 的锁 代码如下:
如果没有达到上限,则会插入队尾,并且企图唤醒已经加入wait set中的线程,,由于我们用的单线程插入和移除,因此offer(中执行的唤醒只能是执行take时被阻塞的线程,
看下notify 方法的作用:
public final native void notify();
(1)唤醒单个正在执行该对象wait方法的线程
(2)如果有某个线程由于执行该对象的wait方法而进入阻塞则会被唤醒,如果没有则会忽略,
(4)被唤醒的线程要重新获取该对象所关联的monitor的lock才能继续执行
三 关于wait 和notify 的注意事项
1 wait方法使可中断方法,这也就意味着,当前线程一旦调用了外套方法进入了阻塞状态,其他线程是可以使用interrupt方法将其打断的,上面说过,可中断方法被打断后会收到中断异常interruptException 同时interrupt 标识也会别擦除,
2 线程执行了某个对象的wait方法后,会加入与之对应的wait set中,每一个对象的monitor都有一个与之关联的wait set
3 当线程进入wait set中之后,notify方法可以将其唤醒,也就是从wait set中弹出,同时中断wait 中wait 中的线程也会将其唤醒
4 必须在同步方法中使用wait 和notify方法,因为执行wait 和notify的前提条件是必须持有同步方法的monitor的所有权,运行下面任何一个方法都会抛出非法的monitor 状态异常 illegalmonitorstateException
上述同步方法中monitor 引用的是this,而wait 和notify方法使用的却是MUTEX方法,虽然同步方法中执行了wait和notify方法但是wait 和notify方法的执行没有获取MUTEX的monitor 为前提
四; wait 和sleep
1 wait 和sleep都可以使线程进入阻塞状态
2 wait 和sleep 均是可中断方法,被中断后都会收到中断异常
3 wait是object的方法,sleep是thread的特有的方法
4 wait方法必须在同步方法中使用,而sleep则不需要
5 线程在同步方法中执行sleep方法时候不会释放monitor 的锁,而wait方法会释放monitor的锁
6 sleep 方法短暂休眠之后会主动退出阻塞,而wait方法(没有指定wait时间)则需要被其他线程中断后才能退出阻塞