import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.commons.io.FileUtils;
/**
 * Feeds the elements of a collection, or the lines of a UTF-8 text file, to a
 * {@link Consumer} on a cached thread pool. A {@link Semaphore} caps how many
 * elements may be in flight at the same time.
 *
 * <p>Typical use:
 * {@code MutiProcessor.getInstance(10).readLine(path, handler).end();}
 */
public class MutiProcessor {
    /** Worker pool; the number of concurrently running tasks is bounded by {@link #semaphore}. */
    final ExecutorService executorService = Executors.newCachedThreadPool();

    /** One permit per in-flight task; acquired before submission, released when the task ends. */
    private final Semaphore semaphore;

    /**
     * Creates a processor whose concurrency limit is the number of available processors.
     * Prefer {@link #getInstance(int)} to choose the limit explicitly.
     */
    public MutiProcessor() {
        this(Math.max(1, Runtime.getRuntime().availableProcessors()));
    }

    private MutiProcessor(int clientTotal) {
        if (clientTotal <= 0) {
            // With zero or negative permits the first acquire() would block forever.
            throw new IllegalArgumentException("clientTotal must be positive, got " + clientTotal);
        }
        this.semaphore = new Semaphore(clientTotal);
    }

    /**
     * Creates a processor that runs at most {@code clientTotal} consumer calls concurrently.
     *
     * @param clientTotal maximum number of elements processed at the same time; must be positive
     * @return a new processor with its own thread pool
     * @throws IllegalArgumentException if {@code clientTotal} is not positive
     */
    public static MutiProcessor getInstance(int clientTotal) {
        return new MutiProcessor(clientTotal);
    }

    /**
     * Finishes processing: stops accepting new work and waits up to one minute for the
     * submitted tasks to complete. Prints {@code end!} when everything completed in time,
     * otherwise reports the timeout on standard error and returns without cancelling anything.
     *
     * @throws Exception if the calling thread is interrupted while waiting
     */
    public void end() throws Exception {
        executorService.shutdown();
        if (executorService.awaitTermination(1, TimeUnit.MINUTES)) {
            System.out.println("end!");
        } else {
            System.err.println("MutiProcessor: tasks still running after 1 minute, end() stopped waiting");
        }
    }

    /**
     * Processes every element of a collection, blocking the caller whenever the
     * concurrency limit is reached.
     *
     * @param collection elements to process
     * @param consumer   action applied to each element on a pool thread; an exception it
     *                   throws is printed and does not stop the remaining elements
     * @param <E>        element type
     * @return this processor, for chaining
     * @throws Exception if the calling thread is interrupted while waiting for a permit,
     *                   or the pool rejects the task (for example after {@link #end()})
     */
    public <E> MutiProcessor read(Collection<E> collection, Consumer<E> consumer) throws Exception {
        for (E item : collection) {
            submit(item, consumer);
        }
        return this;
    }

    /**
     * Reads the whole file into memory first, then processes its lines.
     *
     * @param file     UTF-8 text file to read
     * @param consumer action applied to each line
     * @return this processor, for chaining
     * @throws Exception if the file cannot be read or submission fails, see {@link #read}
     */
    public MutiProcessor readLines(File file, Consumer<String> consumer) throws Exception {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = openUtf8Reader(file)) {
            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                lines.add(line);
            }
        }
        return read(lines, consumer);
    }

    /**
     * Streams the file line by line, submitting each line as soon as it has been read,
     * so the file is never held in memory as a whole.
     *
     * @param file     UTF-8 text file to read
     * @param consumer action applied to each line
     * @return this processor, for chaining
     * @throws Exception if the file cannot be read or submission fails, see {@link #read}
     */
    public MutiProcessor readLine(File file, Consumer<String> consumer) throws Exception {
        // try-with-resources: the reader is closed even when reading or submitting throws.
        try (BufferedReader reader = openUtf8Reader(file)) {
            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                submit(line, consumer);
            }
        }
        return this;
    }

    /**
     * Path-based variant of {@link #readLine(File, Consumer)}: streams the file line by line.
     *
     * @param file     path of the UTF-8 text file to read
     * @param consumer action applied to each line
     * @return this processor, for chaining
     * @throws Exception if the file cannot be read or submission fails
     */
    public MutiProcessor readLine(String file, Consumer<String> consumer) throws Exception {
        return readLine(new File(file), consumer);
    }

    /**
     * Path-based variant of {@link #readLines(File, Consumer)}: loads the whole file, then
     * processes its lines.
     *
     * @param file     path of the UTF-8 text file to read
     * @param consumer action applied to each line
     * @return this processor, for chaining
     * @throws Exception if the file cannot be read or submission fails
     */
    public MutiProcessor readLines(String file, Consumer<String> consumer) throws Exception {
        return readLines(new File(file), consumer);
    }

    /** Waits for a free permit, then hands one element to the pool. */
    private <E> void submit(E item, Consumer<E> consumer) throws InterruptedException {
        semaphore.acquire();
        try {
            executorService.execute(() -> {
                try {
                    consumer.accept(item);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // Always give the permit back; otherwise every failing element
                    // permanently lowers the limit until read() blocks forever.
                    semaphore.release();
                }
            });
        } catch (RuntimeException e) {
            // The task was never accepted (e.g. pool already shut down): return its permit.
            semaphore.release();
            throw e;
        }
    }

    /** Opens a buffered UTF-8 reader on the given file. */
    private static BufferedReader openUtf8Reader(File file) throws IOException {
        return new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8));
    }
}
/**
 * Demo driver: prints every line of a text file using a processor limited to
 * ten concurrent tasks.
 *
 * <p>NOTE(review): Java permits only one public top-level class per source file, so this
 * class presumably belongs in its own {@code Test.java} - confirm.
 */
public class Test {
    /** Line handler used by the demo: writes the line to standard output. */
    private static void resolve(String line) {
        System.out.println(line);
    }

    /**
     * Streams the sample file line by line, then waits for the processor to finish.
     *
     * @param args unused
     * @throws Exception if reading the file or waiting for completion fails
     */
    public static void main(String[] args) throws Exception {
        MutiProcessor processor = MutiProcessor.getInstance(10);
        MutiProcessor afterRead = processor.readLine("D:\\Users\\liuyub\\Desktop\\地址.txt", Test::resolve);
        afterRead.end();
    }
}