ThreadPoolTaskExecutor和ThreadPoolExecutor

ThreadPoolTaskExecutor是spring core包中的,而ThreadPoolExecutor是JDK中的JUC。ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理。

ThreadPoolTaskExecutor的配置类 ThreadPoolTaskConfig.java

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync // 开启异步功能
public class ThreadPoolTaskConfig {

    @Bean("threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {

        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();

        // 线程池的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活
        threadPoolTaskExecutor.setCorePoolSize(8);

        // 如果设置setAllowCoreThreadTimeOut=true(默认false)时,核心线程会超时关闭
        // threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);

        // 阻塞队列 当核心线程数达到最大时,新的任务会放在队列中排队等待执行
        threadPoolTaskExecutor.setQueueCapacity(124);

        // 最大线程池数量,当线程数>=corePoolSize,且任务队列已满时,线程池会创建新的线程来处理任务
        // 任务队列已满时,且当线程数=maxPoolSize,线程池会拒绝处理任务而抛出异常
        threadPoolTaskExecutor.setMaxPoolSize(64);

        // 当线程空闲时间达到KeepAliveSeconds时,线程会推出,知道线程数量=CorePoolSize
        // 允许线程空闲时间为60s,当MaxPoolSize的线程在空闲时间到达的时候销毁
        // 如果AllowCoreThreadTimeOut=true,则会直到线程数量=0
        threadPoolTaskExecutor.setKeepAliveSeconds(30);

        // Spring提供的 ThreadPoolTaskExecutor线程池,是有setThreadNamePrefix方法的
        // JDK提供的ThreadPoolExecutor线程池没有setThreadNamePrefix方法
        threadPoolTaskExecutor.setThreadNamePrefix("XXXX自定义线程池");

        // 拒绝策略
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CallerRunsPolicy():交由调用方线程运行,比如 main 线程;如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行
        // AbortPolicy():该策略是线程池的默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
        // DiscardPolicy():如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
        // DiscardOldestPolicy():丢弃队列中最老的任务,队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

        threadPoolTaskExecutor.initialize();

        return threadPoolTaskExecutor;
    }

}

查看 initialize()方法,它调用的是ExecutorConfigurationSupport 的initialize()方法

    /**
	 * Set up the ExecutorService.
	 */
	public void initialize() {
		if (logger.isDebugEnabled()) {
			logger.debug("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
		}
		if (!this.threadNamePrefixSet && this.beanName != null) {
			setThreadNamePrefix(this.beanName + "-");
		}
		this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
	}

initialize()方法调用了本类的initialzeExcutor()方法,但是这个类是一个抽象类,initialzeExcutor()方法也是一个抽象方法

    /**
	 * Create the target {@link java.util.concurrent.ExecutorService} instance.
	 * Called by {@code afterPropertiesSet}.
	 * @param threadFactory the ThreadFactory to use
	 * @param rejectedExecutionHandler the RejectedExecutionHandler to use
	 * @return a new ExecutorService instance
	 * @see #afterPropertiesSet()
	 */
	protected abstract ExecutorService initializeExecutor(
			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler);

ctrl+alt+b 查看它的实现类,它有四个实现类,其中一个就是ThreadPoolTaskExecutor,查看它的initialzeExcutor()方法,它其实内部使用了ThreadPoolExecutor。

	/**
	 * Note: This method exposes an {@link ExecutorService} to its base class
	 * but stores the actual {@link ThreadPoolExecutor} handle internally.
	 * Do not override this method for replacing the executor, rather just for
	 * decorating its {@code ExecutorService} handle or storing custom state.
	 */
	@Override
	protected ExecutorService initializeExecutor(
			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

		BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

		ThreadPoolExecutor executor;
		if (this.taskDecorator != null) {
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler) {
				@Override
				public void execute(Runnable command) {
					Runnable decorated = taskDecorator.decorate(command);
					if (decorated != command) {
						decoratedTaskMap.put(decorated, command);
					}
					super.execute(decorated);
				}
			};
		}
		else {
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler);

		}

		if (this.allowCoreThreadTimeOut) {
			executor.allowCoreThreadTimeOut(true);
		}

		this.threadPoolExecutor = executor;
		return executor;
	}

所以说:ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理

猜你喜欢

转载自blog.csdn.net/ElendaLee/article/details/127426306
今日推荐