原创转载请注明出处:http://agilestyle.iteye.com/admin/blogs/2343551
Future
Callable
接口Callable与线程功能密不可分,但和Runnable的主要区别为:
Callable接口的call()方法可以有返回值,而Runnable接口的run()方法没有返回值
Callable接口的call()方法可以声明抛出异常,而Runnable接口的run()方法不可以声明抛出异常。
执行完Callable接口中的任务后,返回值是通过Future接口进行获得的。
ExecutorService
get()结合submit(Callable<T> task)
MyCallable.java
package org.fool.java.concurrent.futurecallable; import java.util.concurrent.Callable; public class MyCallable implements Callable<String> { private String data; public MyCallable(String data) { this.data = data; } @Override public String call() throws Exception { Thread.sleep(5000); return data; } }
FutureCallableTest.java
package org.fool.java.concurrent.futurecallable;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Demo: submits a {@link MyCallable} and prints timestamps around
 * {@link Future#get()} to show that {@code get()} blocks until the result is ready.
 */
public class FutureCallableTest {

    /**
     * Submits one task, waits for its result, and prints "begin", the result, and "end".
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ExecutorService executor =
                new ThreadPoolExecutor(2, 3, 5L, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
        try {
            MyCallable callable = new MyCallable("my data");
            Future<String> future = executor.submit(callable);

            System.out.println("begin " + System.currentTimeMillis());
            System.out.println(future.get());
            System.out.println("end " + System.currentTimeMillis());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            // Without this the pool's worker threads keep the JVM alive after main returns.
            executor.shutdown();
        }
    }
}
Run
Note:
get()方法是阻塞的
get()结合submit(Runnable task)
FutureCallableTest2.java
package org.fool.java.concurrent.futurecallable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class FutureCallableTest2 { public static void main(String[] args) { try { ExecutorService executorService = Executors.newCachedThreadPool(); Runnable task = new Runnable() { @Override public void run() { System.out.println("invoked..."); } }; Future future = executorService.submit(task); System.out.println(future.get() + " " + future.isDone()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
Run
get()结合submit(Runnable task, T result)
User.java
package org.fool.java.concurrent.futurecallable; public class User { private String username; private String password; public User() { } public User(String username, String password) { this.username = username; this.password = password; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } }
MyThread.java
package org.fool.java.concurrent.futurecallable;

/**
 * A {@link Runnable} that fills in the {@link User} it was given:
 * username becomes "hello" and password becomes "world".
 */
public class MyThread implements Runnable {

    /** The user object that {@link #run()} mutates. */
    private final User target;

    /**
     * @param user the user whose fields will be overwritten when this task runs
     */
    public MyThread(User user) {
        this.target = user;
    }

    /** Writes the fixed username and password into the wrapped user. */
    @Override
    public void run() {
        final User subject = target;
        subject.setUsername("hello");
        subject.setPassword("world");
    }
}
FutureCallableTest3.java
package org.fool.java.concurrent.futurecallable;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Demo of {@code submit(Runnable task, T result)}: the {@link User} passed as the
 * result argument is what {@link Future#get()} returns after the task has run.
 */
public class FutureCallableTest3 {

    /**
     * Submits a {@link MyThread} together with the {@link User} it mutates, waits for
     * completion, and prints the user's fields between "begin" and "end" timestamps.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ExecutorService executor =
                new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
        try {
            User user = new User();
            MyThread thread = new MyThread(user);

            Future<User> future = executor.submit(thread, user);
            System.out.println("begin " + System.currentTimeMillis());
            User result = future.get();
            System.out.println(result.getUsername() + ":" + result.getPassword());
            System.out.println("end " + System.currentTimeMillis());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            // Without this the pool's worker threads keep the JVM alive after main returns.
            executor.shutdown();
        }
    }
}
Run
cancel(boolean mayInterruptIfRunning)和isCancelled()的使用
cancel(boolean mayInterruptIfRunning)的作用是尝试取消任务的执行。如果任务已经完成、已经被取消,或由于其他原因无法取消,则这次尝试会失败并返回false;参数mayInterruptIfRunning为true时,会中断正在执行该任务的线程。
isCancelled()的作用是判断任务是否在正常完成之前被取消,是则返回true。
FutureCallableTest4.java
package org.fool.java.concurrent.futurecallable; import java.util.concurrent.*; public class FutureCallableTest4 { public static void main(String[] args) throws ExecutionException, InterruptedException { MyCallable callable = new MyCallable("my data"); ExecutorService executor = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); Future<String> future = executor.submit(callable); Thread.sleep(1000); System.out.println(future.cancel(true) + " " + future.isCancelled()); } public static class MyCallable implements Callable<String> { private String data; public MyCallable(String data) { this.data = data; } @Override public String call() throws Exception { while(true) { if(Thread.currentThread().isInterrupted()) { System.out.println("Thread interrupted..."); break; } System.out.println("Invoking..."); } return this.data; } } }
Run
get(long timeout, TimeUnit unit)的使用
get(long timeout, TimeUnit unit)的作用是在指定最大时间内等待获得返回值
FutureCallableTest5.java
package org.fool.java.concurrent.futurecallable; import java.util.concurrent.*; public class FutureCallableTest5 { public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { MyCallable callable = new MyCallable(); ExecutorService executor = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); System.out.println("begin " + System.currentTimeMillis()); Future<String> future = executor.submit(callable); System.out.println("return value: " + future.get(5, TimeUnit.SECONDS)); System.out.println("end " + System.currentTimeMillis()); } public static class MyCallable implements Callable<String> { @Override public String call() throws Exception { Thread.sleep(10000); System.out.println("Invoked after sleep 10 seconds"); return "data"; } } }
Run
自定义拒绝策略RejectedExecutionHandler接口的使用
RejectedExecutionHandler的主要作用是当线程池无法接收新任务时(线程池已关闭,或线程与队列容量均已饱和),对被拒绝的任务实现一些额外的处理。
MyRejectedExecutionHandler.java
package org.fool.java.concurrent.futurecallable; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class MyRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + " rejected..."); } }
FutureCallableTest6.java
package org.fool.java.concurrent.futurecallable;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Demo of a custom rejection handler: three tasks are accepted, the pool is shut
 * down, and a fourth submission is routed to {@link MyRejectedExecutionHandler}.
 */
public class FutureCallableTest6 {

    /**
     * Submits "Thread A" through "Thread C", shuts the pool down, then submits "Thread D".
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
        pool.setRejectedExecutionHandler(new MyRejectedExecutionHandler());

        final String[] acceptedNames = {"Thread A", "Thread B", "Thread C"};
        for (String name : acceptedNames) {
            pool.submit(new MyThread(name));
        }

        pool.shutdown();

        // Submitted after shutdown, so this one goes to the rejection handler.
        pool.submit(new MyThread("Thread D"));
    }

    /**
     * Task that prints its name followed by " invoked...".
     */
    public static class MyThread implements Runnable {

        private String label;

        /**
         * @param threadName the name printed when the task runs
         */
        public MyThread(String threadName) {
            this.label = threadName;
        }

        /** Prints the task's name and the " invoked..." suffix. */
        @Override
        public void run() {
            System.out.println(label + " invoked...");
        }
    }
}
Run