import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Optional; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class Test { public static void main(String[] args){ // EventClient.test(); // Test1.test(); BooleanTest.test(); } } /* 5.1.2 异步非阻塞消息处理 同步阻塞消息处理: 异步非阻塞消息处理: 可以提高系统的吞吐量,而且业务处理线程的数量也能够控制在一个固定的范围 业务逻辑: 客户端提交Event后会得到一个相应的工单并立即返回,Event则会被放置在Event队列中。 服务端有若干个工作线程,不断地从Event队列中获取任务并进行异步处理,最后将处理结果 保存至另外一个结果集中,如果客户想要获得处理结果,则可凭借工单再次查询。 5.2 单线程间通信 处理线程如何知道事件队列中有可以处理的事件。 方案一:采用轮询 方案二:使用通知机制:如果队列中有Event,则同时工作线程开始工作,如果队列中没有Event, 则工作线程休息并等待通知。 所谓的电线程间通信,就是说生产者和消费者都各自为一个线程!!! */ class EventQueue{ private final int max; static class Event{ } private final LinkedList<Event> events = new LinkedList<>(); private final static int DEFAULT_MAX_EVENT = 10; public EventQueue(){ this(DEFAULT_MAX_EVENT); } public EventQueue(int max){ this.max=max; } /* 生产者调用这个方法,当队列满了的时候,就会调用events.wait(),生产者就会释放掉 monitor同时被添加到该monitor的wait set中。否则我就添加一个Event,并通知 monitor的wait set中所有wait的线程,让它们进行消费。 */ public void offer(Event event) { synchronized (events) { if (events.size() >= max) { try { console(" the queue is full."); events.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } console(" the new event is submitted."); events.addLast(event); events.notify(); } } /* 消费者调用这个方法,当队列空了的时候,就会调用events.wait(),生产者就会释放掉 monitor同时被添加到该monitor的wait set中。否则我就消费一个Event,并通知 monitor的wait set中所有wait的线程,让它们进行生产。 */ public Event take(){ synchronized (events) { if (events.isEmpty()) { try{ console(" the queue is empty."); events.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Event event = events.removeFirst(); this.events.notify(); console(" the event "+event+" is handled."); return event; } } private void console(String msg) { System.out.printf("%s:%s\n",Thread.currentThread().getName(),msg); } } class EventClient{ public static void test() { final EventQueue eventQueue = new EventQueue(); //生产者 new Thread(()->{ while (true) { eventQueue.offer(new EventQueue.Event()); } },"Producer").start(); //消费者 new Thread(()->{ while (true) { eventQueue.take(); try{ TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } },"Consumer").start(); } } /* public final void wait() throws InterruptedException public final void wait(long timeout) throws InterruptedException public final void wait(long timeout, int nanos) throws InterruptedException 1.Object的wait(long timeout)方法会导致当前线程进入阻塞,直到有其他线程调用了Object的 notify或者notifyAll方法才能将其唤醒,或者阻塞时间到达了timeout时间而自动唤醒。 2.wait方法必须拥有该对象的monitor,也就是wait方法必须在同步方法中使用。 3.当前线程执行了该对想的wait方法后,就会放弃该monitor的所有权,并进入该对象关联的wait set中。 public final native void notify(); 1.唤醒单个正在执行该对象wait方法的线程。 2.被唤醒的线程需要重新获取该对想所关联monitor的lock才能继续执行。 ——注意!!!注意!!!注意!!!注意!!!注意!!! 这个地方说的是继续执行,我猜是从event.wait()方法后继续执行!!! 我需要一个实验!!! ——和我的猜想一模一样,这就像是Unity中的协程。 */ class Test1{ public static Object MUTEX = new Object(); public static void run1(){ synchronized (MUTEX){ try { System.out.println(1); System.out.println(2); System.out.println(3); System.out.println(4); System.out.println(5); System.out.println(6); System.out.println(7); System.out.println(8); MUTEX.wait(); System.out.println(9); System.out.println(10); System.out.println(11); System.out.println(12); System.out.println(13); System.out.println(14); System.out.println(15); System.out.println(16); }catch (Exception e){ } } } public static void run2(){ synchronized (MUTEX) { MUTEX.notify(); } } public static void test(){ new Thread(()->{ run1(); }).start(); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->{ run2(); }).start(); } } /* 5.2.3 关于wait和notify的注意事项 1.wait是可中断方法 2.线程执行了某个对象的wait方法后,会加入到与之对应的wait set中(每个对象的monitor 都有一个与之关联的wait set) 3.当线程进入wait set之后,notify方法可以将其唤醒。 4.必须在同步方法中使用wait和notify,否则会抛出IllefalMonitorStateException 5.同步代码的monitor不许与wait notify方法的对象一致。 5.2.4 wait和sleep 1.wait和sleep方法都可以使线程进入阻塞状态 2.wait和sleep方法均是可中断方法。 3.wait是Object的方法,而sleep是Thread特有的方法 4.wait方法的执行必须在同步方法中进行,而sleep不会 5.在同步方法中执行sleep时,不会释放monitor锁,而wait则会释放monitor锁 6.sleep方法短暂休眠后会主动退出阻塞状态,而wait如果没有指定时间则不会退出 */ /* 5.3 多线程间通信 数据不一致情况,最简单的思考方式:两个线程同时执行offer时,在wait方法上阻塞了,由于 wait方法阻塞被唤醒时,是从该位置继续运行。所以当一个线程被消费者唤醒时,其继续进行生 产,其上的notify方法没有唤醒消费者,却用来唤醒了另一个阻塞的线程,而这个线程从wait 处继续运行,导致其又生产了一个事件。 改进方法极其简单,就是把对数列长度的判断放在一个循环中,同时将notify方法改为notifyall 这样即使另一个生产这被唤醒了,它从wait处继续运行,紧接着又进行长度判断,如果满足条件 则跳出循环进行生产,否则则继续调用wait方法。 */ class EventQueue2{ private final int max; static class Event{ } private final LinkedList<Event> events = new LinkedList<>(); private final static int DEFAULT_MAX_EVENT = 10; public EventQueue2(){ this(DEFAULT_MAX_EVENT); } public EventQueue2(int max){ this.max=max; } public void offer(Event event) { synchronized (events) { while(events.size()>=max){ try { console(" the queue is full."); events.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } console(" the new event is submitted."); events.addLast(event); events.notify(); } } public Event take(){ synchronized (events) { while (events.isEmpty()) { try{ console(" the queue is empty."); events.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Event event = events.removeFirst(); this.events.notify(); console(" the event "+event+" is handled."); return event; } } private void console(String msg) { System.out.printf("%s:%s\n",Thread.currentThread().getName(),msg); } } /* 5.3.2 线程休息室wait set 线程调用了某个对象的wait方法之后,都会加入该对象monitor关联的wait set中,并且释放monitor 的所有权。 */ /* 5.4自定义显式锁BooleanLock */ /* Lock设计说明: 1.lock()方法永远阻塞,除非获得了锁,但是可被中断 2.lock(long mills)方法除了可以被中断以外,还增加了对应的超时功能 3.unlock()可以进行锁的释放 4.getBlockedThreads()用于获取当前有哪些线程被阻塞 */ interface Lock{ void lock() throws InterruptedException; void lock(long mills) throws InterruptedException, TimeoutException; void unlock(); List<Thread> getBlockedThreads(); } /* BooleanLock类: */ class BooleanLock implements Lock{ private Thread currentThread; private boolean locked = false; private final List<Thread> blockedList = new ArrayList<>(); @Override public void lock() throws InterruptedException { synchronized (this){ while(locked){ if(!blockedList.contains(Thread.currentThread())){ blockedList.add(Thread.currentThread()); } this.wait(); } blockedList.remove(Thread.currentThread()); this.locked=true; this.currentThread=Thread.currentThread(); //这个地方记录一个当前的线程,有什么用呢? } } @Override public void lock(long mills) throws InterruptedException, TimeoutException { synchronized (this){ if(mills<=0){ //更好的方案是抛出一个错误 this.lock(); }else{ /* 其实这个地方有两个写法,但是书中这种明显好很多。还有一种写法就是,给个开始 时间,每次判断时,进行一次计算。 */ long remainingMills = mills; long endMills = System.currentTimeMillis()+remainingMills; while (locked){ if (remainingMills <= 0) { throw new TimeoutException("can not get the lock during " + mills + " ms."); } if(!blockedList.contains(Thread.currentThread())){ blockedList.add(Thread.currentThread()); } this.wait(remainingMills); remainingMills=endMills-System.currentTimeMillis(); } } } } @Override public void unlock() { synchronized (this){ /* 记录currentThread的用意: lock与unlock是成对出现的。当调用了lock时,如果不记录一下是 谁调用的,就有可能被其他线程恶意调用,导致这个当前加锁的线程 的设置的locked flag被莫名其妙的复原了,这也会导致其它线程 进入这块保护的区域,导致这个锁失去了意义。 */ if(currentThread==Thread.currentThread()){ this.locked=false; Optional.of(currentThread.getName()+" release the lock."); this.notifyAll(); } } } @Override public List<Thread> getBlockedThreads() { return blockedList; } } /* 使用BooleanLock */ class BooleanTest{ private final Lock lock = new BooleanLock(); public void syncMethod(){ try{ lock.lock(); int randomInt = ThreadLocalRandom.current().nextInt(10); System.out.println(Thread.currentThread()+" get the lock."); TimeUnit.SECONDS.sleep(randomInt); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } public void syncMethodTimeoutable(){ try{ lock.lock(1000); System.out.println(Thread.currentThread()+" get the lock."); int randomInt = ThreadLocalRandom.current().nextInt(10); TimeUnit.SECONDS.sleep(randomInt); } catch (InterruptedException e) { e.printStackTrace(); } catch (TimeoutException e) { System.out.println("Time Out!!!"); e.printStackTrace(); } finally { lock.unlock(); } } public static void test(){ /* (1)多个线程通过lock()方法争抢锁 BooleanTest t = new BooleanTest(); IntStream.range(0,10) .mapToObj(i->new Thread(t::syncMethod)) .forEach(Thread::start); */ /* (2)可中断被阻塞的线程 BooleanTest t = new BooleanTest(); try { Thread t2 = new Thread(t::syncMethod,"T2"); t2.start(); TimeUnit.MILLISECONDS.sleep(10); t2.interrupt(); } catch (InterruptedException e) { e.printStackTrace(); } */ /* (3)阻塞的线程可超时 */ BooleanTest blt = new BooleanTest(); new Thread(blt::syncMethod,"T1").start(); try { TimeUnit.MILLISECONDS.sleep(2); Thread t2 = new Thread(blt::syncMethodTimeoutable,"T2"); t2.start(); TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } }
——《Java高并发编程详解》笔记