Java 多线程任务分发实现
1.本文将实现多线程分任务下载图片的功能
2.首先需要三个类,分别是 任务分发器、任务类和执行类
1. 执行线程类
import java.util.List;
/**
* 自定义的工作线程,持有分派给它执行的任务列表
*/
public class CaptureWorkThread extends Thread {
// 本线程待执行的任务列表,你也可以指为任务索引的起始值
private List<DownLoadCaptureThread> taskList;@SuppressWarnings("unused")
private int threadId;/**
* 构造工作线程,为其指派任务列表,及命名线程 ID
*
* @param taskList
* 欲执行的任务列表
* @param threadId
* 线程 ID
*/
@SuppressWarnings("unchecked")
public CaptureWorkThread(List taskList, int threadId) {
this.taskList = taskList;
this.threadId = threadId;
}/**
* 执行被指派的所有任务
*/
public void run() {
for (DownLoadCaptureThread task : taskList) {
task.execute();
}
}
}
2. 创建任务类
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;
import java.security.cert.X509Certificate;
import java.util.concurrent.ExecutorService;import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.sensetime.fis.senseguard.configuration.SenseFaceConfiguration;
import com.sensetime.fis.senseguard.vo.ImdaExportVo;public class DownLoadCaptureThread {
private static final Logger LOGGER = LoggerFactory.getLogger(DownLoadCaptureThread.class);
// private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10);
private static final ExecutorService EXPORT_THREAD = SenseFaceConfiguration.TARGET_LIBRARY_EXPORT_THREADPOOL;
// 开始位置
private String urlPath;
// 结束位置
private String filePath;
// 线程ID
private ImdaExportVo vo;
// excel 路径
private String captureImageRelativeUri;public DownLoadCaptureThread(String urlPath, String filePath, ImdaExportVo vo, String captureImageRelativeUri) {
super();
this.urlPath = urlPath;
this.filePath = filePath;
this.vo = vo;
this.captureImageRelativeUri = captureImageRelativeUri;
}public void execute() {
synchronized (this) {
try {
//https 免验证
trustAllHosts();
// 构造URL
URL url = new URL(urlPath);
// 打开连接
URLConnection conn = url.openConnection();
// 通过请求地址判断请求类型(http或者是https)
if (url.getProtocol().toLowerCase().equals("https")) {
HttpsURLConnection https = (HttpsURLConnection) url.openConnection();
https.setHostnameVerifier(DO_NOT_VERIFY);
conn = https;
}
// 设置请求超时为5s
conn.setConnectTimeout(3 * 100);if (conn.getContentLength() == -1) {
// 输入流
InputStream is = conn.getInputStream();
// 1K的数据缓冲
byte[] bs = new byte[1024];
// 读取到的数据长度
int len;
// 输出的文件流
File sf = new File(filePath);
if (!sf.exists()) {
sf.createNewFile();
}
OutputStream os = new FileOutputStream(sf.getPath());
// 开始读取
while ((len = is.read(bs)) != -1) {
os.write(bs, 0, len);
}
LOGGER.info("captureImageRelativeUri:"+captureImageRelativeUri);
ImageCompressThread imageCompressThread = new ImageCompressThread(filePath, filePath, 1000, 800, null);
EXPORT_THREAD.submit(imageCompressThread);
// 完毕,关闭所有链接
os.close();
is.close();
}
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
}private static void trustAllHosts() {
// Create a trust manager that does not validate certificate chains
TrustManager[] trustAllCerts = new TrustManager[] { new X509TrustManager() {
public java.security.cert.X509Certificate[] getAcceptedIssuers() {
return new java.security.cert.X509Certificate[] {};
}public void checkClientTrusted(X509Certificate[] chain, String authType) {
}public void checkServerTrusted(X509Certificate[] chain, String authType) {
}
} };
// Install the all-trusting trust manager
try {
SSLContext sc = SSLContext.getInstance("TLS");
sc.init(null, trustAllCerts, new java.security.SecureRandom());
HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
} catch (Exception e) {
e.printStackTrace();
}
}private final static HostnameVerifier DO_NOT_VERIFY = new HostnameVerifier() {
public boolean verify(String hostname, SSLSession session) {
return true;
}
};}
3.任务分发器
import java.util.ArrayList;
import java.util.List;/**
* 指派任务列表给线程的分发器
*/
public class TaskDistributor {/**
* 测试方法
*
* @param args
*/
@SuppressWarnings("unchecked")
public static void main(String[] args) {
// 初始化要执行的任务列表
List taskList = new ArrayList();
for (int i = 0; i < 100; i++) {
taskList.add(new DownLoadCaptureThread("","", null,null));
}
// 设定要启动的工作线程数为 4 个
int threadCount = 4;
List[] taskListPerThread = distributeTasks(taskList, threadCount);
System.out.println("实际要启动的工作线程数:" + taskListPerThread.length);
for (int i = 0; i < taskListPerThread.length; i++) {
Thread workThread = new WorkThread(taskListPerThread[i], i);
workThread.start();
}
}/**
* 把 List 中的任务分配给每个线程,先平均分配,剩于的依次附加给前面的线程 返回的数组有多少个元素 (List) 就表明将启动多少个工作线程
*
* @param taskList
* 待分派的任务列表
* @param threadCount
* 线程数
* @return 列表的数组,每个元素中存有该线程要执行的任务列表
*/
@SuppressWarnings("unchecked")
public static List[] distributeTasks(List taskList, int threadCount) {
// 每个线程至少要执行的任务数,假如不为零则表示每个线程都会分配到任务
int minTaskCount = taskList.size() / threadCount;
// 平均分配后还剩下的任务数,不为零则还有任务依个附加到前面的线程中
int remainTaskCount = taskList.size() % threadCount;
// 实际要启动的线程数,如果工作线程比任务还多
// 自然只需要启动与任务相同个数的工作线程,一对一的执行
// 毕竟不打算实现了线程池,所以用不着预先初始化好休眠的线程
int actualThreadCount = minTaskCount > 0 ? threadCount : remainTaskCount;
// 要启动的线程数组,以及每个线程要执行的任务列表
List[] taskListPerThread = new List[actualThreadCount];
int taskIndex = 0;
// 平均分配后多余任务,每附加给一个线程后的剩余数,重新声明与 remainTaskCount
// 相同的变量,不然会在执行中改变 remainTaskCount 原有值,产生麻烦
int remainIndces = remainTaskCount;
for (int i = 0; i < taskListPerThread.length; i++) {
taskListPerThread[i] = new ArrayList();
// 如果大于零,线程要分配到基本的任务
if (minTaskCount > 0) {
for (int j = taskIndex; j < minTaskCount + taskIndex; j++) {
taskListPerThread[i].add(taskList.get(j));
}
taskIndex += minTaskCount;
}
// 假如还有剩下的,则补一个到这个线程中
if (remainIndces > 0) {
taskListPerThread[i].add(taskList.get(taskIndex++));
remainIndces--;
}
}
return taskListPerThread;
}
}
4. 执行task
// 任务分发 capture
List[] captureTaskListPerThread = TaskDistributor.distributeTasks(captureThreadList, EXPORT_THREAD_COUNT);for (int i = 0; i < captureTaskListPerThread.length; i++) {
Thread workThread = new CaptureWorkThread(captureTaskListPerThread[i], i);
workThread.start();
}
这样一个完整的任务分发就实现了。