Observable:
public interface Observable { enum Cycle{ STARTED, RUNNING, DONE, ERROR } Cycle getCycle(); void start(); void interrupt(); }
TaskLifeCycle:
public interface TaskLifeCycle<T> { void onStart(Thread thread); void onRunning(Thread thread); void onFinish(Thread thread,T result); void onError(Thread thread,Exception e); } class EmptyLifeCycle<T> implements TaskLifeCycle<T>{ @Override public void onStart(Thread thread) { } @Override public void onRunning(Thread thread) { } @Override public void onFinish(Thread thread, T result) { } @Override public void onError(Thread thread, Exception e) { } }
Task:
public interface Task <T>{ T call(); }
ObservableThread:
/* 突然发现Observable中的start方法有点可怕的啊,这种写法,我感觉我是还每吃透 */ public class ObservableThread<T> extends Thread implements Observable{ private final TaskLifeCycle<T> lifeCycle; private final Task<T> task; private Cycle cycle; public ObservableThread(Task<T> task){ this(new TaskLifeCycle.EmptyLifeCycle<>(),task); } public ObservableThread(TaskLifeCycle<T> lifeCycle,Task<T> task){ super(); if(task==null) throw new IllegalArgumentException("The task is required."); this.lifeCycle=lifeCycle; this.task=task; } public final void run(){ this.update(Cycle.STARTED,null,null); try{ this.update(Cycle.RUNNING,null,null); T result = this.task.call(); this.update(Cycle.DONE,result,null); } catch (Exception e){ this.update(Cycle.ERROR,null,e); } } private void update(Cycle cycle, T result, Exception e){ this.cycle = cycle; if (lifeCycle == null) { return; } try { switch (cycle) { case STARTED: this.lifeCycle.onStart(currentThread()); break; case RUNNING: this.lifeCycle.onRunning(currentThread()); break; case DONE: this.lifeCycle.onFinish(currentThread(), result); break; case ERROR: this.lifeCycle.onError(currentThread(), e); break; } } catch (Exception ex) { throw ex; } } public Cycle getCycle(){ return this.cycle; } }
测试代码:
/* 突然发现Observable中的start方法有点可怕的啊,这种写法,我感觉我是还每吃透 */ public class ObservableThread<T> extends Thread implements Observable{ private final TaskLifeCycle<T> lifeCycle; private final Task<T> task; private Cycle cycle; public ObservableThread(Task<T> task){ this(new TaskLifeCycle.EmptyLifeCycle<>(),task); } public ObservableThread(TaskLifeCycle<T> lifeCycle,Task<T> task){ super(); if(task==null) throw new IllegalArgumentException("The task is required."); this.lifeCycle=lifeCycle; this.task=task; } public final void run(){ this.update(Cycle.STARTED,null,null); try{ this.update(Cycle.RUNNING,null,null); T result = this.task.call(); this.update(Cycle.DONE,result,null); } catch (Exception e){ this.update(Cycle.ERROR,null,e); } } private void update(Cycle cycle, T result, Exception e){ this.cycle = cycle; if (lifeCycle == null) { return; } try { switch (cycle) { case STARTED: this.lifeCycle.onStart(currentThread()); break; case RUNNING: this.lifeCycle.onRunning(currentThread()); break; case DONE: this.lifeCycle.onFinish(currentThread(), result); break; case ERROR: this.lifeCycle.onError(currentThread(), e); break; } } catch (Exception ex) { throw ex; } } public Cycle getCycle(){ return this.cycle; } }
《Java高并发编程详解》笔记