版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u012482178/article/details/85233902
使用方式:
// Usage 1: fire-and-forget execution of a Runnable on the shared background executor.
PAsyncTask.getInstance().execute(new Runnable() {
@Override
public void run() {
// Do the time-consuming work here.
}
});
// Usage 2: submit() returns a Future, which allows the task to be cancelled later.
private Future<?> readerThread;
readerThread = PAsyncTask.getInstance().submit(new Runnable() {
@Override
public void run() {
// Do the time-consuming work here.
}
});
// Request cancellation; "true" asks for the worker thread to be interrupted if the task is already running.
readerThread.cancel(true);
// Usage 3: AsyncTask-style task; the callbacks are delivered through a main-looper Handler.
PAsyncTask.getInstance().start(activity,new AbsTask<List<GroupEntity> >() {
@Override
protected List<GroupEntity> doBackground() throws Throwable {
// NOTE(review): body omitted in this snippet; a real implementation must return the result list.
}
@Override
protected void onSuccess(Object result) {
// Back on the main thread. The activity passed to start() guards against memory leaks
// (callbacks are skipped once it is finishing or destroyed); omit it if that is not a concern.
List<GroupEntity> res = (List<GroupEntity>) result;
}
@Override
protected void onError(Throwable ex, boolean isCallbackError) {
// Error callback.
}
});
关键点在这:自定义线程池的几个关键参数大小。
// Sizing parameters of the custom thread pool.
// NOTE: despite its name, CPU_COUNT is twice the number of available processors.
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors() * 2;
// Core pool size derived from CPU_COUNT.
private static final int CORE_POLL_SIZE = CPU_COUNT + 1;
// Maximum pool size derived from CPU_COUNT.
private static final int MAXIMUM_POLL_SIZE = CPU_COUNT * 2 + 1;
private static final int WORK_QUEUE_SIZE = 160;// TODO AsyncTask uses 128; settle on a compromise value later
// Keep-alive time for idle threads (the executor below passes it with TimeUnit.SECONDS).
private static final int KEEP_ALIVE = 1;
后面是各个类的具体实现。PALog只是一个日志打印工具类,大家可以随意替换成自己的日志工具类。
package com.paic.lib.base.util.thread;
import java.util.concurrent.Executor;
/**
 * Base class for asynchronous tasks.
 * <p>
 * Subclasses implement {@link #doBackground()} together with the
 * {@link #onSuccess(Object)} and {@link #onError(Throwable, boolean)} callbacks;
 * every other lifecycle hook is an optional no-op. When the task is wrapped by a
 * {@code TaskProxy}, the proxy registers itself through {@code setTaskProxy} and
 * pushes state changes into this task through {@code setState}.
 *
 * @param <ResultType> type produced by {@link #doBackground()}
 * @author Adapted and optimized from the xUtils thread-pool utilities
 */
public abstract class AbsTask<ResultType> implements Callback.Cancelable {

    /** Proxy driving this task, or {@code null} when the task is not wrapped. */
    private TaskProxy taskProxy = null;
    /** Optional external handler that is cancelled together with this task. */
    private final Callback.Cancelable cancelHandler;
    private volatile boolean isCancelled = false;
    private volatile State state = State.IDLE;
    private ResultType result;

    /** Creates a task without an external cancel handler. */
    public AbsTask() {
        this(null);
    }

    /**
     * Creates a task linked to an external cancel handler.
     *
     * @param cancelHandler handler cancelled along with this task; may be {@code null}
     */
    public AbsTask(Callback.Cancelable cancelHandler) {
        this.cancelHandler = cancelHandler;
    }

    /**
     * Performs the actual work.
     *
     * @return the task result
     * @throws Throwable any failure raised by the work
     */
    protected abstract ResultType doBackground() throws Throwable;

    /**
     * Called when the work completed successfully.
     *
     * @param result the value produced by {@link #doBackground()}
     */
    protected abstract void onSuccess(Object result);

    /**
     * Called when the work (or one of the callbacks) failed.
     *
     * @param ex              the failure
     * @param isCallbackError {@code true} when the failure was raised by a callback
     */
    protected abstract void onError(Throwable ex, boolean isCallbackError);

    /** Hook invoked when the task has been queued. */
    protected void onWaiting() {
    }

    /** Hook invoked right before the work starts. */
    protected void onStart() {
    }

    /**
     * Hook invoked for progress updates published through {@link #update(int, Object...)}.
     *
     * @param flag caller-defined update code
     * @param args caller-defined payload
     */
    protected void onUpdate(int flag, Object... args) {
    }

    /**
     * Hook invoked when the task was cancelled.
     *
     * @param cex the cancellation signal
     */
    protected void onCancelled(Callback.CancelledException cex) {
    }

    /** Hook invoked after the task ended. */
    protected void onFinished() {
    }

    /**
     * @return the scheduling priority, or {@code null} (the default) for none
     */
    public Priority getPriority() {
        return null;
    }

    /**
     * @return the executor to run on, or {@code null} (the default) for none
     */
    public Executor getExecutor() {
        return null;
    }

    /**
     * Publishes a progress update through the proxy; does nothing when the task
     * is not wrapped by a proxy.
     *
     * @param flag caller-defined update code
     * @param args caller-defined payload
     */
    protected final void update(int flag, Object... args) {
        if (taskProxy != null) {
            taskProxy.onUpdate(flag, args);
        }
    }

    /** Hook for subclasses to abort in-flight work when {@link #cancel()} is called. */
    protected void cancelWorks() {
    }

    // NOTE(review): private and always false, so the STARTED branch in cancel()
    // can never fire as written. Kept unchanged to preserve behaviour.
    private boolean isCancelFast() {
        return false;
    }

    /**
     * Cancels the task once: marks it cancelled, runs {@link #cancelWorks()},
     * cancels the linked handler, and, if the task is still waiting, reports
     * the cancellation immediately through the proxy (or through this object
     * when it is itself a proxy).
     */
    @Override
    public final synchronized void cancel() {
        if (this.isCancelled) {
            return;
        }
        this.isCancelled = true;
        cancelWorks();
        if (cancelHandler != null && !cancelHandler.isCancelled()) {
            cancelHandler.cancel();
        }
        // Enum constants are referenced through the type, not through the
        // instance field, so the comparison cannot be misread as instance state.
        if (this.state == State.WAITING
                || (this.state == State.STARTED && isCancelFast())) {
            if (taskProxy != null) {
                taskProxy.onCancelled(new Callback.CancelledException("cancelled by user"));
                taskProxy.onFinished();
            } else if (this instanceof TaskProxy) {
                this.onCancelled(new Callback.CancelledException("cancelled by user"));
                this.onFinished();
            }
        }
    }

    /**
     * @return {@code true} if this task was cancelled, its state is CANCELLED,
     *         or the linked cancel handler reports cancellation
     */
    @Override
    public final boolean isCancelled() {
        return isCancelled || state == State.CANCELLED
                || (cancelHandler != null && cancelHandler.isCancelled());
    }

    /**
     * @return {@code true} once the state is past STARTED (SUCCESS, CANCELLED or ERROR)
     */
    public final boolean isFinished() {
        return this.state.value() > State.STARTED.value();
    }

    /** @return the current lifecycle state */
    public final State getState() {
        return state;
    }

    /** @return the stored result; {@code null} until one has been set */
    public final ResultType getResult() {
        return result;
    }

    void setState(State state) {
        this.state = state;
    }

    final void setTaskProxy(TaskProxy taskProxy) {
        this.taskProxy = taskProxy;
    }

    final void setResult(ResultType result) {
        this.result = result;
    }

    /** Lifecycle states; the numeric value increases as the task progresses. */
    public enum State {
        IDLE(0), WAITING(1), STARTED(2), SUCCESS(3), CANCELLED(4), ERROR(5);

        private final int value;

        State(int value) {
            this.value = value;
        }

        public int value() {
            return value;
        }
    }
}
package com.paic.lib.base.util.thread;
/**
 * Common callback contracts used by the task framework.
 *
 * @author Adapted and optimized from the xUtils thread-pool utilities
 */
public interface Callback {

    /** An operation that can be cancelled and queried for its cancellation status. */
    interface Cancelable {

        /** @return {@code true} if the operation has been cancelled */
        boolean isCancelled();

        /** Requests cancellation of the operation. */
        void cancel();
    }

    /** Unchecked exception used to signal that a task was cancelled. */
    class CancelledException extends RuntimeException {

        /**
         * @param message human-readable reason for the cancellation
         */
        public CancelledException(String message) {
            super(message);
        }
    }

    /**
     * Callbacks for a batch of items processed together.
     *
     * @param <ItemType> type of each item in the batch
     */
    interface GroupCallback<ItemType> extends Callback {

        /** Reports that {@code item} completed successfully. */
        void onSuccess(ItemType item);

        /** Reports that {@code item} failed with {@code ex}. */
        void onError(ItemType item, Throwable ex, boolean isOnCallback);

        /** Reports that {@code item} was cancelled. */
        void onCancelled(ItemType item, CancelledException cex);

        /** Reports that {@code item} ended, whatever the outcome. */
        void onFinished(ItemType item);

        /** Reports that every item of the batch has ended. */
        void onAllFinished();
    }
}
package com.paic.lib.base.util.thread;
import android.app.Activity;
import android.os.Looper;
import android.support.annotation.NonNull;
import com.paic.lib.base.log.PALog;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
/**
* PAAsyncTask
* TODO 主要用到的是execute()方法,也提供submit()方法,类似AsyncTask的start()方法等。
*
* @author 优化取自xUitls的线程池工具类
*/
public class PAsyncTask implements TaskController {
private PAsyncTask() {
}
// 可见性,不具有原子性
private static volatile TaskController instance;
public static TaskController getInstance() {
if (instance == null) {
synchronized (TaskController.class) {
if (instance == null) {
instance = new PAAsyncTask();
}
}
}
return instance;
}
@Override
public void autoPost(Runnable runnable) {
if (runnable == null) {
return;
}
if (Thread.currentThread() == Looper.getMainLooper().getThread()) {
runnable.run();
} else {
TaskProxy.sHandler.post(runnable);
}
}
@Override
public void post(Runnable runnable) {
if (runnable == null) {
return;
}
TaskProxy.sHandler.post(runnable);
}
@Override
public void postDelayed(Runnable runnable, long delayMillis) {
if (runnable == null) {
return;
}
TaskProxy.sHandler.postDelayed(runnable, delayMillis);
}
@Override
public void execute(Runnable runnable) {
TaskProxy.sDefaultExecutor.execute(runnable);
}
@Override
public void executeNewCached(Runnable runnable) {
Executors.newCachedThreadPool().execute(runnable);
}
@Override
public Future<?> submit(Runnable runnable) {
return TaskProxy.sDefaultExecutor.submit(runnable);
}
@Override
public void removeCallbacks(Runnable runnable) {
TaskProxy.sHandler.removeCallbacks(runnable);
}
@Override
public void removeCallbacks(Runnable... runnable) {
if (runnable != null && runnable.length > 0) {
for (Runnable r : runnable) {
removeCallbacks(r);
}
}
}
@Override
public <T> AbsTask<T> start(AbsTask<T> task) {
TaskProxy<T> proxy;
if (task instanceof TaskProxy) {
proxy = (TaskProxy<T>) task;
} else {
proxy = new TaskProxy<>(task);
}
try {
proxy.doBackground();
} catch (Throwable ex) {
PALog.e("PAAsyncTask", ex.getMessage());
}
return proxy;
}
@Override
public <T> AbsTask<T> start(@NonNull Activity activity, AbsTask<T> task) {
TaskProxy<T> proxy;
if (task instanceof TaskProxy) {
proxy = (TaskProxy<T>) task;
} else {
proxy = new TaskProxy<>(activity, task);
}
try {
proxy.doBackground();
} catch (Throwable ex) {
PALog.e("PAAsyncTask", ex.getMessage());
}
return proxy;
}
@Override
public <T> T startSync(AbsTask<T> task) throws Throwable {
T result = null;
try {
task.onWaiting();
task.onStart();
result = task.doBackground();
task.onSuccess(result);
} catch (Callback.CancelledException cex) {
task.onCancelled(cex);
} catch (Throwable ex) {
task.onError(ex, false);
} finally {
task.onFinished();
}
return result;
}
@Override
public <T extends AbsTask<?>> Callback.Cancelable startTasks(final Callback.GroupCallback<T> groupCallback, final T... tasks) {
if (tasks == null) {
throw new IllegalArgumentException("task must not be null");
}
final Runnable callIfOnAllFinished = new Runnable() {
private final int total = tasks.length;
private final AtomicInteger count = new AtomicInteger(0);
@Override
public void run() {
if (count.incrementAndGet() == total) {
if (groupCallback != null) {
groupCallback.onAllFinished();
}
}
}
};
for (final T task : tasks) {
start(new TaskProxy<T>(task) {
@Override
protected void onSuccess(Object result) {
super.onSuccess(result);
post(new Runnable() {
@Override
public void run() {
if (groupCallback != null) {
groupCallback.onSuccess(task);
}
}
});
}
@Override
protected void onCancelled(final Callback.CancelledException cex) {
super.onCancelled(cex);
post(new Runnable() {
@Override
public void run() {
if (groupCallback != null) {
groupCallback.onCancelled(task, cex);
}
}
});
}
@Override
protected void onError(final Throwable ex, final boolean isCallbackError) {
super.onError(ex, isCallbackError);
post(new Runnable() {
@Override
public void run() {
if (groupCallback != null) {
groupCallback.onError(task, ex, isCallbackError);
}
}
});
}
@Override
protected void onFinished() {
super.onFinished();
post(new Runnable() {
@Override
public void run() {
if (groupCallback != null) {
groupCallback.onFinished(task);
}
callIfOnAllFinished.run();
}
});
}
});
}
return new Callback.Cancelable() {
@Override
public void cancel() {
for (T task : tasks) {
task.cancel();
}
}
@Override
public boolean isCancelled() {
boolean isCancelled = true;
for (T task : tasks) {
if (!task.isCancelled()) {
isCancelled = false;
}
}
return isCancelled;
}
};
}
@Override
public ExecutorService udfThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, String threadName, boolean threadDaemon) {
return UdfThreadPoolUtils.udfThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, threadName, threadDaemon);
}
@Override
public ExecutorService udfNewSingleThreadExecutor(String threadName, boolean threadDaemon) {
return UdfThreadPoolUtils.udfNewSingleThreadExecutor(threadName, threadDaemon);
}
@Override
public ScheduledExecutorService udfNewSingleThreadScheduledExecutor(String threadName, boolean threadDaemon) {
return UdfThreadPoolUtils.udfNewSingleThreadScheduledExecutor(threadName, threadDaemon);
}
}
package com.paic.lib.base.util.thread;
/**
 * Task priority levels.
 * <p>
 * Declaration order is significant: {@code PriorityExecutor} orders queued
 * tasks by ordinal, so constants declared earlier sort ahead of later ones.
 *
 * @author Adapted and optimized from the xUtils thread-pool utilities
 */
public enum Priority {
    UI_TOP,
    UI_NORMAL,
    UI_LOW,
    DEFAULT,
    BG_TOP,
    BG_NORMAL,
    BG_LOW
}
package com.paic.lib.base.util.thread;
import android.support.annotation.NonNull;
import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Executor backed by a {@link ThreadPoolExecutor} whose work queue orders
 * {@code PriorityRunnable}s by priority and then by submission sequence.
 *
 * @author Adapted and optimized from the xUtils thread-pool utilities
 */
public class PriorityExecutor implements Executor {

    // NOTE: despite its name, CPU_COUNT is twice the number of available processors.
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors() * 2;
    private static final int CORE_POLL_SIZE = CPU_COUNT + 1;
    private static final int MAXIMUM_POLL_SIZE = CPU_COUNT * 2 + 1;
    // TODO AsyncTask uses 128; settle on a compromise value later.
    // NOTE(review): PriorityBlockingQueue is unbounded, so this is only its initial capacity.
    private static final int WORK_QUEUE_SIZE = 160;
    /** Idle-thread keep-alive, in seconds. */
    private static final int KEEP_ALIVE = 1;
    /** Source of the submission sequence numbers stamped on PriorityRunnables. */
    private static final AtomicInteger SEQ_SEED = new AtomicInteger(0);

    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        @Override
        public Thread newThread(@NonNull Runnable r) {
            return new Thread(r, "xTID#" + mCount.getAndIncrement());
        }
    };

    /** Same priority: earlier submissions first. */
    private static final Comparator<Runnable> FIFO_CMP = new SequenceComparator(true);
    /** Same priority: later submissions first. */
    private static final Comparator<Runnable> FILO_CMP = new SequenceComparator(false);

    private final ThreadPoolExecutor mThreadPoolExecutor;

    /**
     * Creates an executor with the default core pool size.
     *
     * @param fifo {@code true} for first-in-first-out within a priority, {@code false} for last-in-first-out
     */
    public PriorityExecutor(boolean fifo) {
        this(CORE_POLL_SIZE, fifo);
    }

    /**
     * Creates an executor with the given core pool size.
     *
     * @param poolSize core pool size
     * @param fifo     {@code true} for first-in-first-out within a priority, {@code false} for last-in-first-out
     */
    public PriorityExecutor(int poolSize, boolean fifo) {
        BlockingQueue<Runnable> workQueue =
                new PriorityBlockingQueue<>(WORK_QUEUE_SIZE, fifo ? FIFO_CMP : FILO_CMP);
        mThreadPoolExecutor = new ThreadPoolExecutor(
                poolSize,
                // ThreadPoolExecutor rejects maximumPoolSize < corePoolSize, so a
                // caller-supplied poolSize above the default maximum must raise it.
                Math.max(poolSize, MAXIMUM_POLL_SIZE),
                KEEP_ALIVE,
                TimeUnit.SECONDS,
                workQueue,
                sThreadFactory,
                new ThreadPoolExecutor.DiscardOldestPolicy()); // not AbortPolicy, to avoid crashing
        // Logging of the pool parameters was deliberately disabled on 2018/11/21:
        // with the save-log-to-file feature enabled it caused an infinite loop.
    }

    /** @return the current core pool size */
    public int getPoolSize() {
        return mThreadPoolExecutor.getCorePoolSize();
    }

    /**
     * Changes the core pool size; non-positive values are ignored.
     *
     * @param poolSize new core pool size
     */
    public void setPoolSize(int poolSize) {
        if (poolSize <= 0) {
            return;
        }
        // Raise the maximum first: a core size above the maximum is invalid and
        // newer JDKs reject it with IllegalArgumentException.
        if (poolSize > mThreadPoolExecutor.getMaximumPoolSize()) {
            mThreadPoolExecutor.setMaximumPoolSize(poolSize);
        }
        mThreadPoolExecutor.setCorePoolSize(poolSize);
    }

    /** @return the underlying thread pool */
    public ThreadPoolExecutor getThreadPoolExecutor() {
        return mThreadPoolExecutor;
    }

    /** @return {@code true} when the number of active threads has reached the core pool size */
    public boolean isBusy() {
        return mThreadPoolExecutor.getActiveCount() >= mThreadPoolExecutor.getCorePoolSize();
    }

    /**
     * Queues {@code runnable}; a {@code PriorityRunnable} is stamped with the
     * next sequence number first so the queue can order it.
     */
    @Override
    public void execute(@NonNull Runnable runnable) {
        stampSequence(runnable);
        mThreadPoolExecutor.execute(runnable);
    }

    // ----------------------------------- extension methods -----------------------------------

    /**
     * Like {@link #execute(Runnable)} but returns a Future, which allows
     * {@code future.cancel(true)}, {@code future.isCancelled()},
     * {@code future.isDone()} and easier exception handling.
     * <p>
     * NOTE(review): the pool wraps submitted tasks in its own Future task, so
     * the queue comparator no longer sees a {@code PriorityRunnable} and
     * priority ordering presumably does not apply on this path. Confirm if needed.
     *
     * @param runnable runnable
     * @return Future
     */
    public Future<?> submit(@NonNull Runnable runnable) {
        stampSequence(runnable);
        return mThreadPoolExecutor.submit(runnable);
    }

    /** Assigns the next submission sequence number to a PriorityRunnable; other runnables are untouched. */
    private static void stampSequence(Runnable runnable) {
        if (runnable instanceof PriorityRunnable) {
            ((PriorityRunnable) runnable).SEQ = SEQ_SEED.getAndIncrement();
        }
    }

    /**
     * Orders PriorityRunnables by priority, then by submission sequence
     * (ascending for FIFO, descending for FILO). Any pair involving a
     * different Runnable type compares as equal.
     */
    private static final class SequenceComparator implements Comparator<Runnable> {
        private final boolean fifo;

        SequenceComparator(boolean fifo) {
            this.fifo = fifo;
        }

        @Override
        public int compare(Runnable lhs, Runnable rhs) {
            if (!(lhs instanceof PriorityRunnable) || !(rhs instanceof PriorityRunnable)) {
                return 0;
            }
            PriorityRunnable lpr = (PriorityRunnable) lhs;
            PriorityRunnable rpr = (PriorityRunnable) rhs;
            int byPriority = lpr.priority.compareTo(rpr.priority);
            if (byPriority != 0) {
                return byPriority;
            }
            return fifo ? compareSeq(lpr.SEQ, rpr.SEQ) : compareSeq(rpr.SEQ, lpr.SEQ);
        }

        /**
         * Three-way comparison of two sequence numbers. Replaces the original
         * {@code (int) (a - b)}, whose narrowing cast can flip the sign.
         */
        private static int compareSeq(long a, long b) {
            if (a < b) {
                return -1;
            }
            return a == b ? 0 : 1;
        }
    }
}
package com.paic.lib.base.util.thread;
/**
 * A {@link Runnable} decorated with a priority and a submission sequence number.
 *
 * @author Adapted and optimized from the xUtils thread-pool utilities
 */
public class PriorityRunnable implements Runnable {

    /** Priority of the wrapped work; never {@code null}. */
    public final Priority priority;
    /** The work to run. */
    private final Runnable runnable;
    /** Submission sequence number, assigned by {@code PriorityExecutor} when the task is queued. */
    long SEQ;

    /**
     * @param priority priority of the work; {@code null} falls back to {@code Priority.DEFAULT}
     * @param runnable the work to run
     */
    public PriorityRunnable(Priority priority, Runnable runnable) {
        if (priority == null) {
            this.priority = Priority.DEFAULT;
        } else {
            this.priority = priority;
        }
        this.runnable = runnable;
    }

    /** Runs the wrapped runnable on the calling thread. */
    @Override
    public void run() {
        runnable.run();
    }
}
package com.paic.lib.base.util.thread;
import android.app.Activity;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
/**
 * Task management interface.
 *
 * @author Adapted and optimized from the xUtils thread-pool utilities
 */
public interface TaskController {

    /**
     * Runs the runnable on the UI thread; if the caller is already on the
     * UI thread, it is executed directly.
     *
     * @param runnable runnable
     */
    void autoPost(Runnable runnable);

    /**
     * Runs the runnable on the UI thread by posting it to the message queue.
     *
     * @param runnable runnable
     */
    void post(Runnable runnable);

    /**
     * Runs the runnable on the UI thread after a delay.
     *
     * @param runnable    runnable
     * @param delayMillis delay in milliseconds
     */
    void postDelayed(Runnable runnable, long delayMillis);

    /**
     * Runs the runnable on a background thread.
     *
     * @param runnable runnable
     */
    void execute(Runnable runnable);

    /**
     * Runs the runnable on a "cached thread pool". Abandoned: once this method
     * had spawned a little over 400 threads the app crashed with an OOM.
     *
     * @param runnable runnable
     */
    void executeNewCached(Runnable runnable);

    /**
     * Runs the runnable on a background thread and returns a handle to it.
     *
     * @param runnable runnable
     * @return a Future for the submitted work
     */
    Future<?> submit(Runnable runnable);

    /**
     * Removes a task that was submitted via post or postDelayed and has not run yet.
     *
     * @param runnable runnable
     */
    void removeCallbacks(Runnable runnable);

    /**
     * Removes several tasks that were submitted via post or postDelayed and have not run yet.
     *
     * @param runnable runnables to remove
     */
    void removeCallbacks(Runnable... runnable);

    /**
     * Starts an asynchronous task (memory leaks are not considered).
     *
     * @param task task
     * @param <T>  result type of the task
     * @return AbsTask&lt;T&gt;
     */
    <T> AbsTask<T> start(AbsTask<T> task);

    /**
     * Starts an asynchronous task (memory leaks are considered).
     *
     * @param activity activity the task is tied to
     * @param task     task
     * @param <T>      result type of the task
     * @return AbsTask&lt;T&gt;
     */
    <T> AbsTask<T> start(Activity activity, AbsTask<T> task);

    /**
     * Starts a synchronous task.
     *
     * @param task task
     * @param <T>  result type of the task
     * @return T
     * @throws Throwable declared for failures raised while running the task
     */
    <T> T startSync(AbsTask<T> task) throws Throwable;

    /**
     * Runs a batch of asynchronous tasks.
     *
     * @param groupCallback groupCallback
     * @param tasks         tasks
     * @param <T>           task type
     * @return a handle for cancelling the batch
     */
    <T extends AbsTask<?>> Callback.Cancelable startTasks(Callback.GroupCallback<T> groupCallback, T... tasks);

    /**
     * User-definable thread pool.
     *
     * @param corePoolSize    core pool size
     * @param maximumPoolSize maximum number of threads in the pool
     * @param keepAliveTime   keep-alive time
     * @param threadName      thread name
     * @param threadDaemon    whether the threads are daemon threads
     * @return ExecutorService
     */
    ExecutorService udfThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, String threadName, boolean threadDaemon);

    /**
     * User-definable single-thread pool.
     *
     * @param threadName   thread name
     * @param threadDaemon whether the thread is a daemon thread
     * @return ExecutorService
     */
    ExecutorService udfNewSingleThreadExecutor(String threadName, boolean threadDaemon);

    /**
     * User-definable single-thread scheduled pool.
     * Purpose: a single thread used to schedule tasks for future execution.
     *
     * @param threadName   thread name
     * @param threadDaemon whether the thread is a daemon thread
     * @return ScheduledExecutorService
     */
    ScheduledExecutorService udfNewSingleThreadScheduledExecutor(String threadName, boolean threadDaemon);
}
package com.paic.lib.base.util.thread;
import android.app.Activity;
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import java.lang.ref.WeakReference;
import java.util.concurrent.Executor;
/**
* 异步任务的代理类
*
* @author 优化取自xUitls的线程池工具类
*/
public class TaskProxy<ResultType> extends AbsTask<ResultType> {
static final InternalHandler sHandler = new InternalHandler();
static final PriorityExecutor sDefaultExecutor = new PriorityExecutor(true);
private final WeakReference<Activity> weakActivity;
private final AbsTask<ResultType> task;
private final Executor executor;
private volatile boolean callOnCanceled = false;
private volatile boolean callOnFinished = false;
/**
* TODO 不考虑内存泄漏
*
* @param task task
*/
TaskProxy(AbsTask task) {
super(task);
this.task = task;
this.task.setTaskProxy(this);
this.setTaskProxy(null);
Executor taskExecutor = task.getExecutor();
if (taskExecutor == null) {
taskExecutor = sDefaultExecutor;
}
this.executor = taskExecutor;
this.weakActivity = null;
}
/**
* TODO 考虑内存泄漏
*
* @param activity activity
* @param task task
*/
TaskProxy(Activity activity, AbsTask task) {
super(task);
this.task = task;
this.task.setTaskProxy(this);
this.setTaskProxy(null);
Executor taskExecutor = task.getExecutor();
if (taskExecutor == null) {
taskExecutor = sDefaultExecutor;
}
this.executor = taskExecutor;
this.weakActivity = new WeakReference<>(activity);
}
@Override
protected final ResultType doBackground() throws Throwable {
this.onWaiting();
PriorityRunnable runnable = new PriorityRunnable(task.getPriority(),
new Runnable() {
@Override
public void run() {
try {
//等待过程中取消
if (callOnCanceled || TaskProxy.this.isCancelled()) {
throw new Callback.CancelledException("");
}
//start running
TaskProxy.this.onStart();
//开始时取消
if (TaskProxy.this.isCancelled()) {
throw new Callback.CancelledException("");
}
//执行task,得到结果
task.setResult(task.doBackground());
TaskProxy.this.setResult(task.getResult());
//未在doBackground过程中取消成功
if (TaskProxy.this.isCancelled()) {
throw new Callback.CancelledException("");
}
if (activityIsNoFinishing()) {// 避免内存泄漏
//执行成功
TaskProxy.this.onSuccess(task.getResult());
}
} catch (Callback.CancelledException cex) {
if (activityIsNoFinishing()) {// 避免内存泄漏
TaskProxy.this.onCancelled(cex);
}
} catch (Throwable ex) {
if (activityIsNoFinishing()) {// 避免内存泄漏
TaskProxy.this.onError(ex, false);
}
} finally {
if (activityIsNoFinishing()) {// 避免内存泄漏
TaskProxy.this.onFinished();
}
}
}
});
this.executor.execute(runnable);
return null;
}
/**
* 判断Activity是否finished,避免内存泄漏
*
* @return 未finished
*/
private boolean activityIsNoFinishing() {
return weakActivity == null || (weakActivity.get() != null && !weakActivity.get().isFinishing() && !weakActivity.get().isDestroyed());
}
@Override
protected void onWaiting() {
this.setState(State.WAITING);
sHandler.obtainMessage(MSG_WHAT_ON_WAITING, this)
.sendToTarget();
}
@Override
protected void onStart() {
this.setState(State.STARTED);
sHandler.obtainMessage(MSG_WHAT_ON_START, this)
.sendToTarget();
}
@Override
protected void onSuccess(Object result) {
this.setState(State.SUCCESS);
sHandler.obtainMessage(MSG_WHAT_ON_SUCCESS, this)
.sendToTarget();
}
@Override
protected void onError(Throwable ex, boolean isCallbackError) {
this.setState(State.CANCELLED);
sHandler.obtainMessage(MSG_WHAT_ON_ERROR, new ArgsObj(this, ex))
.sendToTarget();
}
@Override
protected void onUpdate(int flag, Object... args) {
sHandler.obtainMessage(MSG_WHAT_ON_UPDATE, flag, flag, new ArgsObj(this, args))
.sendToTarget();
}
@Override
protected void onCancelled(Callback.CancelledException cex) {
this.setState(State.CANCELLED);
sHandler.obtainMessage(MSG_WHAT_ON_CANCEL, new ArgsObj(this, cex))
.sendToTarget();
}
@Override
protected void onFinished() {
sHandler.obtainMessage(MSG_WHAT_ON_FINISHED, this)
.sendToTarget();
}
@Override
void setState(State state) {
super.setState(state);
this.task.setState(state);
}
@Override
public Priority getPriority() {
return task.getPriority();
}
@Override
public Executor getExecutor() {
return executor;
}
private static class ArgsObj {
final TaskProxy taskProxy;
final Object[] args;
public ArgsObj(TaskProxy taskProxy, Object... args) {
this.taskProxy = taskProxy;
this.args = args;
}
}
private final static int MSG_WHAT_BASE = 1000000000;
private final static int MSG_WHAT_ON_WAITING = MSG_WHAT_BASE + 1;
private final static int MSG_WHAT_ON_START = MSG_WHAT_BASE + 2;
private final static int MSG_WHAT_ON_SUCCESS = MSG_WHAT_BASE + 3;
private final static int MSG_WHAT_ON_ERROR = MSG_WHAT_BASE + 4;
private final static int MSG_WHAT_ON_UPDATE = MSG_WHAT_BASE + 5;
private final static int MSG_WHAT_ON_CANCEL = MSG_WHAT_BASE + 6;
private final static int MSG_WHAT_ON_FINISHED = MSG_WHAT_BASE + 7;
final static class InternalHandler extends Handler {
private InternalHandler() {
super(Looper.getMainLooper());
}
@Override
public void handleMessage(Message msg) {
if (msg.obj == null) {
throw new IllegalArgumentException("msg must not be null");
}
TaskProxy taskProxy = null;
Object[] args = null;
if (msg.obj instanceof TaskProxy) {
taskProxy = (TaskProxy) msg.obj;
} else if (msg.obj instanceof ArgsObj) {
ArgsObj argsObj = (ArgsObj) msg.obj;
taskProxy = argsObj.taskProxy;
args = argsObj.args;
}
if (taskProxy == null) {
throw new RuntimeException("msg.obj not instanceof TaskProxy");
}
try {
switch (msg.what) {
case MSG_WHAT_ON_WAITING:
taskProxy.task.onWaiting();
break;
case MSG_WHAT_ON_START:
taskProxy.task.onStart();
break;
case MSG_WHAT_ON_SUCCESS:
taskProxy.task.onSuccess(taskProxy.getResult());
break;
case MSG_WHAT_ON_ERROR:
assert args != null;
Throwable throwable = (Throwable) args[0];
taskProxy.task.onError(throwable, false);
break;
case MSG_WHAT_ON_UPDATE:
taskProxy.task.onUpdate(msg.arg1, args);
break;
case MSG_WHAT_ON_CANCEL:
if (taskProxy.callOnCanceled) {
return;
}
taskProxy.callOnCanceled = true;
assert args != null;
taskProxy.task.onCancelled((com.paic.lib.base.util.thread.Callback.CancelledException) args[0]);
break;
case MSG_WHAT_ON_FINISHED:
if (taskProxy.callOnFinished) {
return;
}
taskProxy.callOnFinished = true;
taskProxy.task.onFinished();
break;
default:
break;
}
} catch (Throwable ex) {
taskProxy.setState(State.ERROR);
if (msg.what != MSG_WHAT_ON_ERROR) {
taskProxy.task.onError(ex, true);
}
}
super.handleMessage(msg);
}
}
}
package com.paic.lib.base.util.thread;
import android.support.annotation.NonNull;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * User-definable thread pools.
 * Created by huangrenqian833 on 2018/11/13.
 */
public class UdfThreadPoolUtils {

    /**
     * Thread factory used by {@link #udfThreadPoolExecutor}: names each thread
     * {@code PREFIX + threadName + "#" + n}, where {@code n} comes from a
     * counter shared by all instances of this factory.
     */
    private static class EventThreadFactory implements ThreadFactory {
        private static final String PREFIX = "pa_udf_thread_pool_";
        private static final AtomicInteger count = new AtomicInteger();
        /** Thread name supplied by the caller. */
        private final String threadName;
        /** Whether created threads are daemon threads. */
        private final boolean threadDaemon;

        public EventThreadFactory(String threadName, boolean threadDaemon) {
            this.threadName = threadName;
            this.threadDaemon = threadDaemon;
        }

        @Override
        public Thread newThread(@NonNull Runnable r) {
            String name = PREFIX + threadName + "#" + count.getAndIncrement();
            Thread created = new Thread(r, name);
            created.setDaemon(threadDaemon);
            return created;
        }
    }

    /**
     * User-definable thread pool.
     * <p>
     * NOTE(review): the work queue is an unbounded LinkedBlockingDeque, so
     * maximumPoolSize and the discard-oldest policy presumably only matter in
     * corner cases. Confirm against the ThreadPoolExecutor documentation.
     *
     * @param corePoolSize    core pool size
     * @param maximumPoolSize maximum number of threads in the pool
     * @param keepAliveTime   keep-alive time, in seconds
     * @param threadName      thread name
     * @param threadDaemon    whether the threads are daemon threads
     * @return ExecutorService
     */
    public static ExecutorService udfThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, String threadName, boolean threadDaemon) {
        ThreadFactory factory = new EventThreadFactory(threadName, threadDaemon);
        // Discard the task at the head of the queue, then retry the new one.
        ThreadPoolExecutor.DiscardOldestPolicy onReject = new ThreadPoolExecutor.DiscardOldestPolicy();
        return new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(),
                factory,
                onReject);
    }

    /**
     * User-definable single-thread pool.
     * Purpose: a pool with exactly one thread, so submitted tasks run sequentially.
     *
     * @param threadName   thread name
     * @param threadDaemon whether the thread is a daemon thread
     * @return ExecutorService
     */
    public static ExecutorService udfNewSingleThreadExecutor(final String threadName, final boolean threadDaemon) {
        return Executors.newSingleThreadExecutor(namedFactory(threadName, threadDaemon));
    }

    /**
     * User-definable single-thread scheduled pool.
     * Purpose: a single thread used to schedule tasks for future execution.
     *
     * @param threadName   thread name
     * @param threadDaemon whether the thread is a daemon thread
     * @return ScheduledExecutorService
     */
    public static ScheduledExecutorService udfNewSingleThreadScheduledExecutor(final String threadName, final boolean threadDaemon) {
        return Executors.newSingleThreadScheduledExecutor(namedFactory(threadName, threadDaemon));
    }

    /** Builds a factory whose threads are named exactly {@code threadName} with the given daemon flag. */
    private static ThreadFactory namedFactory(final String threadName, final boolean threadDaemon) {
        return new ThreadFactory() {
            @Override
            public Thread newThread(Runnable runnable) {
                Thread created = new Thread(runnable, threadName);
                created.setDaemon(threadDaemon);
                return created;
            }
        };
    }
}