代码来自书籍《java多线程编程实战指南》
代码1:多线程下载不同网络图片
(容易出现格式问题)
public class FileDownloaderApp {
public static void main(String[] args) throws InterruptedException {
String[]pics =new String[]{
"https://img.redocn.com/sheji/20200219/jinzhikanfagongyihaibao_10824681.jpg.400.jpg",
"https://img.redocn.com/sheji/20180928/qinglvsehuaduopingpubeijing_9694163.jpg.400.jpg"};
Thread downloaderThread = null;
for (String url : pics) {
// 创建文件下载器线程
downloaderThread = new Thread(new FileDownloader(url));
// 启动文件下载器线程
downloaderThread.start();
}
}
// 文件下载器
static class FileDownloader implements Runnable {
private final String fileURL;
public FileDownloader(String fileURL) {
this.fileURL = fileURL;
}
volatile AtomicInteger N = new AtomicInteger(0);
@Override
public void run() {
Debug.info("Downloading from " + fileURL);
String fileBaseName = fileURL.substring(fileURL.lastIndexOf('/') + 1);
try {
URL url = new URL(fileURL);
// System.getProperty("java.io.tmpdir") +"viscent\\";
while (N.equals(0))
{
N.addAndGet(1);
}
String localFileName ="C:/Users/14172/Pictures/" + fileBaseName;
File file = new File(localFileName);
if(!file.exists())
{
file.createNewFile();
}
// System.getProperty("java.io.tmpdir") 是获取操作系统的缓存临时目录
System.out.println("Saving to: " + localFileName);
downloadFile(url, new FileOutputStream(
localFileName), 1024);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Done downloading from " + fileURL);
}
// 从指定的URL下载文件,并将其保存到指定的输出流中
private void downloadFile(URL url, OutputStream outputStream, int bufSize)
throws MalformedURLException, IOException {
// 建立HTTP连接
final HttpURLConnection httpConn = (HttpURLConnection) url
.openConnection();
httpConn.setRequestMethod("GET");
ReadableByteChannel inChannel = null;
WritableByteChannel outChannel = null;
try {
// 获取HTTP响应码
int responseCode = httpConn.getResponseCode();
// HTTP响应非正常:响应码不为2开头
if (2 != responseCode / 100) {
throw new IOException("Error: HTTP " + responseCode);
}
if (0 == httpConn.getContentLength()) {
Debug.info("Nothing to be downloaded " + fileURL);
return;
}
inChannel = Channels
.newChannel(new BufferedInputStream(httpConn.getInputStream()));
outChannel = Channels
.newChannel(new BufferedOutputStream(outputStream));
ByteBuffer buf = ByteBuffer.allocate(bufSize);
while (-1 != inChannel.read(buf)) {
buf.flip();
outChannel.write(buf);
buf.clear();
}
} finally {
// 关闭指定的Channel以及HttpURLConnection
Tools.silentClose(inChannel, outChannel);
httpConn.disconnect();
}
}// downloadFile结束
}// FileDownloader结束
}
其中tools关闭文件的方法:
public final class Tools {
private static final Random rnd = new Random();
private static final Logger LOGGER = Logger.getAnonymousLogger();
public static void startAndWaitTerminated(Thread... threads)
throws InterruptedException {
if (null == threads) {
throw new IllegalArgumentException("threads is null!");
}
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
}
public static void startAndWaitTerminated(Iterable<Thread> threads)
throws InterruptedException {
if (null == threads) {
throw new IllegalArgumentException("threads is null!");
}
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
}
public static void randomPause(int maxPauseTime) {
int sleepTime = rnd.nextInt(maxPauseTime);
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void randomPause(int maxPauseTime, int minPauseTime) {
int sleepTime = maxPauseTime == minPauseTime ? minPauseTime : rnd
.nextInt(maxPauseTime - minPauseTime) + minPauseTime;
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void silentClose(Closeable... closeable) {
if (null == closeable) {
return;
}
for (Closeable c : closeable) {
if (null == c) {
continue;
}
try {
c.close();
} catch (Exception ignored) {
}
}
}
public static void split(String str, String[] result, char delimeter) {
int partsCount = result.length;
int posOfDelimeter;
int fromIndex = 0;
String recordField;
int i = 0;
while (i < partsCount) {
posOfDelimeter = str.indexOf(delimeter, fromIndex);
if (-1 == posOfDelimeter) {
recordField = str.substring(fromIndex);
result[i] = recordField;
break;
}
recordField = str.substring(fromIndex, posOfDelimeter);
result[i] = recordField;
i++;
fromIndex = posOfDelimeter + 1;
}
}
public static void log(String message) {
LOGGER.log(Level.INFO, message);
}
public static String md5sum(final InputStream in) throws NoSuchAlgorithmException, IOException {
MessageDigest md = MessageDigest.getInstance("MD5");
byte[] buf = new byte[1024];
try (DigestInputStream dis = new DigestInputStream(in, md)) {
while (-1 != dis.read(buf)){
}
;
}
byte[] digest = md.digest();
BigInteger bigInt = new BigInteger(1, digest);
String checkSum = bigInt.toString(16);
while (checkSum.length() < 32) {
checkSum = "0" + checkSum;
}
return checkSum;
}
public static String md5sum(final File file) throws NoSuchAlgorithmException, IOException {
return md5sum(new BufferedInputStream(new FileInputStream(file)));
}
public static void delayedAction(String prompt, Runnable action, int delay/* seconds */) {
Debug.info("%s in %d seconds.", prompt, delay);
try {
Thread.sleep(delay * 1000);
} catch (InterruptedException ignored) {
}
action.run();
}
public static Object newInstanceOf(String className) throws InstantiationException,
IllegalAccessException, ClassNotFoundException {
return Class.forName(className).newInstance();
}
}
Debug下面的info方法:
public class Debug {
private static ThreadLocal<SimpleDateFormat> sdfWrapper = new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
}
};
enum Label {
INFO("INFO"),
ERR("ERROR");
String name;
Label(String name) {
this.name = name;
}
public String getName() {
return name;
}
}
// public static void info(String message) {
// printf(Label.INFO, "%s", message);
// }
public static void info(String format, Object... args) {
printf(Label.INFO, format, args);
}
public static void info(boolean message) {
info("%s", message);
}
public static void info(int message) {
info("%d", message);
}
public static void error(String message, Object... args) {
printf(Label.ERR, message, args);
}
public static void printf(Label label, String format, Object... args) {
SimpleDateFormat sdf = sdfWrapper.get();
@SuppressWarnings("resource")
final PrintStream ps = label == Label.INFO ? System.out : System.err;
ps.printf('[' + sdf.format(new Date()) + "][" + label.getName()
+ "]["
+ Thread.currentThread().getName() + "]:" + format + " %n", args);
}
}
执行程序后输出:
同时查看自己电脑上的文件夹,可以看到刚刚多出来的图片。
代码2:基于数据分割的大文件下载器
在日常生活中,我们下载大文件的时候往往是使用专门的下载软件而不是直接使用浏览器。这些下载软件下载大文件时比较快的一个重要原因就是它们使用多线程技术。例如,一个大小为600MB的文件在网络带宽为100Mbps的情况下,使用单个线程下载该文件至少需要耗时48(=600/(100/8))秒。如果我们采用3个线程来下载该文件,其中每个线程分别下载该文件的一个部分,那么下载这个文件所需的时间基本上可以减少为16(=600/3/(100/8))秒,比起单线程下载节省了2/3的时间。按照这个思路实现的一个基于多线程的大文件下载器
首先,我们先获取待下载资源的大小,这个大小相当于文件下载器的输入数据的原始规模(总规模)。接着,我们根据设定的下载线程数(workerThreadsCount)来决定子任务的总个数,并由此确定每个子任务负责下载的数据段的范围(起始字节到结束字节,lowerBound~upperBound)。然后我们分别创建相应的下载子任务(DownloadTask类实例)并为每个下载任务创建相应的下载线程。这些线程启动后就会并发地下载大文件中的相应部分。
作为包装的存储对象类:
public class Storage implements Closeable, AutoCloseable {
private final RandomAccessFile storeFile;
private final FileChannel storeChannel;
protected final AtomicLong totalWrites = new AtomicLong(0);
public Storage(long fileSize, String fileShortName) throws IOException {
String fullFileName = System.getProperty("java.io.tmpdir") + "/"
+ fileShortName;
String localFileName;
localFileName = createStoreFile(fileSize, fullFileName);
storeFile = new RandomAccessFile(localFileName, "rw");
storeChannel = storeFile.getChannel();
}
/**
* 将data中指定的数据写入文件
*
* @param offset
* 写入数据在整个文件中的起始偏移位置
* @param byteBuf
* byteBuf必须在该方法调用前执行byteBuf.flip()
* @throws IOException
* @return 写入文件的数据长度
*/
public int store(long offset, ByteBuffer byteBuf)
throws IOException {
int length;
storeChannel.write(byteBuf, offset);
length = byteBuf.limit();
totalWrites.addAndGet(length);
return length;
}
public long getTotalWrites() {
return totalWrites.get();
}
private String createStoreFile(final long fileSize, String fullFileName)
throws IOException {
File file = new File(fullFileName);
Debug.info("create local file:%s", fullFileName);
RandomAccessFile raf;
raf = new RandomAccessFile(file, "rw");
try {
raf.setLength(fileSize);
} finally {
Tools.silentClose(raf);
}
return fullFileName;
}
@Override
public synchronized void close() throws IOException {
if (storeChannel.isOpen()) {
Tools.silentClose(storeChannel, storeFile);
}
}
}
主文件下载类:
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 大文件下载器
*
* @author Viscent Huang
*/
public class BigFileDownloader {
protected final URL requestURL;
protected final long fileSize;
/**
* 负责已下载数据的存储
*/
protected final Storage storage;
protected final AtomicBoolean taskCanceled = new AtomicBoolean(false);
public BigFileDownloader(String strURL) throws Exception {
requestURL = new URL(strURL);
// 获取待下载资源的大小(单位:字节)
fileSize = retieveFileSize(requestURL);
Debug.info("file total size:%s", fileSize);
String fileName = strURL.substring(strURL.lastIndexOf('/') + 1);
// 创建负责存储已下载数据的对象
storage = new Storage(fileSize, fileName);
}
/**
* 下载指定的文件
*
* @param taskCount
* 任务个数
* @param reportInterval
* 下载进度报告周期
* @throws Exception
*/
public void download(int taskCount, long reportInterval)
throws Exception {
long chunkSizePerThread = fileSize / taskCount;
// 下载数据段的起始字节
long lowerBound = 0;
// 下载数据段的结束字节
long upperBound = 0;
DownloadTask dt;
for (int i = taskCount - 1; i >= 0; i--) {
lowerBound = i * chunkSizePerThread;
if (i == taskCount - 1) {
upperBound = fileSize;
} else {
upperBound = lowerBound + chunkSizePerThread - 1;
}
// 创建下载任务
dt = new DownloadTask(lowerBound, upperBound, requestURL, storage,
taskCanceled);
dispatchWork(dt, i);
}
// 定时报告下载进度
reportProgress(reportInterval);
// 清理程序占用的资源
doCleanup();
}
protected void doCleanup() {
Tools.silentClose(storage);
}
protected void cancelDownload() {
if (taskCanceled.compareAndSet(false, true)) {
doCleanup();
}
}
protected void dispatchWork(final DownloadTask dt, int workerIndex) {
// 创建下载线程
Thread workerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
dt.run();
} catch (Exception e) {
e.printStackTrace();
// 取消整个文件的下载
cancelDownload();
}
}
});
workerThread.setName("downloader-" + workerIndex);
workerThread.start();
}
// 根据指定的URL获取相应文件的大小
private static long retieveFileSize(URL requestURL) throws Exception {
long size = -1;
HttpURLConnection conn = null;
try {
conn = (HttpURLConnection) requestURL.openConnection();
conn.setRequestMethod("HEAD");
conn.setRequestProperty("Connection", "Keep-alive");
conn.connect();
int statusCode = conn.getResponseCode();
if (HttpURLConnection.HTTP_OK != statusCode) {
throw new Exception("Server exception,status code:" + statusCode);
}
String cl = conn.getHeaderField("Content-Length");
size = Long.valueOf(cl);
} finally {
if (null != conn) {
conn.disconnect();
}
}
return size;
}
// 报告下载进度
private void reportProgress(long reportInterval) throws InterruptedException {
float lastCompletion;
int completion = 0;
while (!taskCanceled.get()) {
lastCompletion = completion;
completion = (int) (storage.getTotalWrites() * 100 / fileSize);
if (completion == 100) {
break;
} else if (completion - lastCompletion >= 1) {
Debug.info("Completion:%s%%", completion);
if (completion >= 90) {
reportInterval = 1000;
}
}
Thread.sleep(reportInterval);
}
Debug.info("Completion:%s%%", completion);
}
}
子任务下载类:
/**
* 下载子任务
*
* @author Viscent Huang
*/
public class DownloadTask implements Runnable {
private final long lowerBound;
private final long upperBound;
private final DownloadBuffer xbuf;
private final URL requestURL;
private final AtomicBoolean cancelFlag;
public DownloadTask(long lowerBound, long upperBound, URL requestURL,
Storage storage, AtomicBoolean cancelFlag) {
this.lowerBound = lowerBound;
this.upperBound = upperBound;
this.requestURL = requestURL;
this.xbuf = new DownloadBuffer(lowerBound, upperBound, storage);
this.cancelFlag = cancelFlag;
}
// 对指定的URL发起HTTP分段下载请求
private static InputStream issueRequest(URL requestURL, long lowerBound,
long upperBound) throws IOException {
Thread me = Thread.currentThread();
Debug.info(me + "->[" + lowerBound + "," + upperBound + "]");
final HttpURLConnection conn;
InputStream in = null;
conn = (HttpURLConnection) requestURL.openConnection();
String strConnTimeout = System.getProperty("x.dt.conn.timeout");
int connTimeout = null == strConnTimeout ? 60000 : Integer
.valueOf(strConnTimeout);
conn.setConnectTimeout(connTimeout);
String strReadTimeout = System.getProperty("x.dt.read.timeout");
int readTimeout = null == strReadTimeout ? 60000 : Integer
.valueOf(strReadTimeout);
conn.setReadTimeout(readTimeout);
conn.setRequestMethod("GET");
conn.setRequestProperty("Connection", "Keep-alive");
// Range: bytes=0-1024
conn.setRequestProperty("Range", "bytes=" + lowerBound + "-" + upperBound);
conn.setDoInput(true);
conn.connect();
int statusCode = conn.getResponseCode();
if (HttpURLConnection.HTTP_PARTIAL != statusCode) {
conn.disconnect();
throw new IOException("Server exception,status code:" + statusCode);
}
Debug.info(me + "-Content-Range:" + conn.getHeaderField("Content-Range")
+ ",connection:" + conn.getHeaderField("connection"));
in = new BufferedInputStream(conn.getInputStream()) {
@Override
public void close() throws IOException {
try {
super.close();
} finally {
conn.disconnect();
}
}
};
return in;
}
@Override
public void run() {
if (cancelFlag.get()) {
return;
}
ReadableByteChannel channel = null;
try {
channel = Channels.newChannel(issueRequest(requestURL, lowerBound,
upperBound));
ByteBuffer buf = ByteBuffer.allocate(1024);
while (!cancelFlag.get() && channel.read(buf) > 0) {
// 将从网络读取的数据写入缓冲区
xbuf.write(buf);
buf.clear();
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
Tools.silentClose(channel, xbuf);
}
}
}
处理缓存:
每个下载线程从网络读取一段数据(例如1KB的数据)就将其写入文件这种方法固然简单,但是容易增加I/O的次数。有鉴于此,我们采用了缓冲的方法:下载线程每次从网络读取的数据都是先被写入缓冲区,只有当这个缓冲区满的时候其中的内容才会被写入本地文件。
这个缓冲区是通过类DownloadBuffer实现的,将缓冲区中的内容写入本地文件是通过类Storage实现的。
public class DownloadBuffer implements Closeable {
/**
* 当前Buffer中缓冲的数据相对于整个存储文件的位置偏移
*/
private long globalOffset;
private long upperBound;
private int offset = 0;
public final ByteBuffer byteBuf;
private final Storage storage;
public DownloadBuffer(long globalOffset, long upperBound,
final Storage storage) {
this.globalOffset = globalOffset;
this.upperBound = upperBound;
this.byteBuf = ByteBuffer.allocate(1024 * 1024);
this.storage = storage;
}
public void write(ByteBuffer buf) throws IOException {
int length = buf.position();
final int capacity = byteBuf.capacity();
// 当前缓冲区已满,或者剩余容量不够容纳新数据
if (offset + length > capacity || length == capacity) {
// 将缓冲区中的数据写入文件
flush();
}
byteBuf.position(offset);
buf.flip();
byteBuf.put(buf);
offset += length;
}
public void flush() throws IOException {
int length;
byteBuf.flip();
length = storage.store(globalOffset, byteBuf);
byteBuf.clear();
globalOffset += length;
offset = 0;
}
@Override
public void close() throws IOException {
Debug.info("globalOffset:%s,upperBound:%s", globalOffset, upperBound);
if (globalOffset < upperBound) {
flush();
}
}
}
启动类:
public class CaseRunner4_1 {
public static void main(String[] args) throws Exception {
if (0 == args.length) {
args = new String[] {
"http://yourserver.com/bigfile", "2", "3" };
}
main0(args);
}
public static void main0(String[] args) throws Exception {
final int argc = args.length;
BigFileDownloader downloader = new BigFileDownloader(args[0]);
// 下载线程数
int workerThreadsCount = argc >= 2 ? Integer.valueOf(args[1]) : 2;
long reportInterval = argc >= 3 ? Integer.valueOf(args[2]) : 2;
Debug.info("downloading %s%nConfig:worker threads:%s,reportInterval:%s s.",
args[0], workerThreadsCount, reportInterval);
downloader.download(workerThreadsCount, reportInterval * 1000);
}
}
数据分割思想产生的问题
数据的分割这种并发化策略是从程序处理的数据角度入手,将原始输入分解为若干规模更小的子输入,并将这些子输入指派给专门的工作者线程处理。
基于数据的分割的结果是产生多个同质工作者线程,即任务处理逻辑相同的线程。例如,上述案例中的BigFileDownloader创建的工作者线程都是DownloadTask的实例。尽管基于数据的分割的基本思想不难理解,但是在实际运用中,我们往往有更多的细节需要考虑。
1.工作者线程数量的合理设置问题。
在原始输入规模一定的情况下,增加工作者线程数量可以减小子输入的规模,从而减少每个工作者线程执行任务所需的时间。但是线程数量的增加也会导致其他开销(比如上下文切换)增加。例如,上述案例从表面上看,我们似乎可以指定更多的下载线程数来缩短资源下载耗时。比如,我们设定10个线程用于下载一个大小为600MB的资源,那么每个线程仅需要下载这个大文件中60MB的数据,这样看来似乎我们仅需要单线程下载的1/6时间就可以完成整个资源下载。但实际的结果却可能并非如此:增加下载线程数的确可以减少每个下载线程的输入规模(子输入的规模),从而缩短每个下载线程完成数据段下载所需的时间;但是这同时也增加了上下文切换的开销、线程创建与销毁的开销、建立网络连接的开销以及锁的争用等开销,而这些增加的开销可能无法被子输入规模减小所带来的好处所抵消。另一方面,工作者线程数量过少又可能导致子输入的规模仍然过大,这使得计算效率提升不明显。在本案例中,我们通过命令行参数指定工作者线程数量,本章后续内容会介绍工作者线程数的合理设置。
- 工作者线程的异常处理问题。
对于一个工作者线程执行过程中出现的异常,我们该如何处理呢?例如,在本案例的一个下载线程执行过程中出现异常的时候,这个线程是可以进行重试(针对可恢复的故障)呢,还是说直接就算整个资源的下载失败呢?如果是算这个资源下载失败,那么此时其他工作者线程就没有必要继续运行下去了。因此,此时就涉及终止其他线程的运行问题。
3.原始输入规模未知问题。
在上述例子中,由于原始输入的规模是事先可知的,因此我们可以采用简单的均分对原始输入进行分解。但是,某些情况下我们可能无法事先确定原始输入的规模,或者事先确定原始输入规模是一个开销极大的计算。比如,要从几百个日志文件(其中每个文件可包含上万条记录)中统计出我们所需的信息,尽管理论上我们可以事先计算出总记录条数,但是这样做的开销会比较大,因而实际上这是不可行的。此时原始输入的规模就相当于事先不可知。对于这种原始输入规模事先不可知的问题,我们可以采用批处理的方式对原始输入进行分解:聚集了一批数据之后再将这些数据指派给工作者线程进行处理。这种方法类似于公安局办证中心办理护照的情形,虽然每天都可能有人去申请护照,但是办证中心并不是为每个申请人专门办理护照的,而是凑足一批申请人的材料后才进行统一办理的。在批处理的分解方式中,工作者线程往往是事先启动的,并且我们还需要考虑这些工作者线程的负载均衡问题,即新聚集的一批数据按照什么样的规则被指派给哪个工作者线程的问题。——如果我们把新聚集的一批数据看作一个请求,而把工作者线程看作一个“服务器节点”,那么这两个问题实际上就是一个问题。 程序的复杂性增加的问题。
4.基于数据的分割产生的多线程程序可能比相应的单线程程序要复杂。
例如,上述案例中虽然多个工作者线程并发地从服务器上下载大文件可以提升计算效率,但是它也带来一个问题:这些数据段是并发地从服务器上下载的,但是我们最终要得到的是一个完整的大文件,而不是几个较小的文件。因此,我们有两种选择:其中一种方法是,各个工作者线程将其下载的数据段分别写入各自的本地文件(子文件),等到所有工作者线程结束之后,我们再将这些子文件合并为我们最终需要的文件。显然,当待下载的资源非常大的时候合并这些子文件也是一笔不小的开销。另外一种方法是将各个工作者线程从服务器上下载到的数据都写入同一个本地文件,这个文件被写满之后就是我们最终所需的大文件。第二种方法看起来比较简单,但是这里面有个矛盾需要调和:文件数据是并发地从服务器上下载(读取)的,但是将这些数据写入本地文件的时候,我们又必须确保这些数据按照原始文件(服务器上的资源)的顺序被写入这个本地文件的相应位置(起始字节和结束字节)。