题目
简介,多线程读取文件,消费者过滤排序
具体说明:
假设本地有一个文件夹,文件夹下面有若干文件,
文件格式如下:
2000102,100,98.3
2000103,101,73.3
2000104,102,98.3
2000105,100,101.3
2000106,101,45.3
文件格式说明:
每行由三列构成,
第一列:id,
第二列:groupId,
第三列:指标quota
功能要求:
1.把所有文件里面的内容按照分组进行排序,
2.所有文件按照分组排序之后,输出每个分组下面的最小指标值。
非功能要求:
1.文件读取要有线程池来执行,线程池的大小固定为10,文件内容需要存储到指定的内容数据结构当中
2.查找要求有独立线程来执行,直接消费读取线程池产生的内存数据结构。
3.文件读取和排序要求并发作业,文件读取只要产生了数据,就可以把数据交给排序线程进行消费,计算最小值。
代码要求
1.重上面的要求语意里面抽象出合适的设计模式。
2.需要考虑多线程的并发控制,同步机制。
3.代码实现只能用JDK1.6或者1.8自带的工具类
————————————————
版权声明:本文为CSDN博主「ZK_小姜」的原创文章,遵循 CC 4.0 BY-SA
版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/u014039577/article/details/82387302
思路
1.首先类分为生产者/消费者
2.生产者把任务拆分为多线程执行,具体执行线程为Worker。使用CountDownLatch的await方法,等待所有线程结束,后退出。
3.消费者使用阻塞队列,收到退出命令后,等待消费完毕,然后中断阻塞队列的take阻塞。
4.关于Demo,使用join等待生产者的所有任务完成后,控制消费者退出。
注意:此处join必须结合CountDownLatch。否则生产者分配任务,开启所有子线程后,会直接退出。子任务继续执行。
5.关于interrupt,中断抛出异常后,中断状态会重置为“未中断”,
所以while (!isInterrupted())不一定能退出循环,需要在catch处重新interrupt
6.关于TreeMap,put操作时增加了类锁,保证安全性问题。适用于多消费者.
已知hashMap在多线程中会导致死循环,占用超高cpu
7.关于消费者退出方法exit(),此处必须使用this.interrupt();
区别于 Thread.currentThread().interrupt(); 由于exit()在main方法中被调用,
中断的是主线程
存储数据的类
package threads.productConsumer;
import com.sun.deploy.util.BlackList;
import java.util.TreeMap;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @description:
* @author: HYW
* @create: 2020-01-21 11:30
*/
public class DataWare {
static LinkedBlockingQueue<DataItem> sourceQueue=new LinkedBlockingQueue<DataItem>();
static TreeMap<String,Float> sortMap=new TreeMap<String,Float>();
public static LinkedBlockingQueue<DataItem> getSourceQueue() {
return sourceQueue;
}
public static void setSourceQueue(LinkedBlockingQueue<DataItem> sourceQueue) {
DataWare.sourceQueue = sourceQueue;
}
public static TreeMap<String, Float> getSortMap() {
return sortMap;
}
public static void setSortMap(TreeMap<String, Float> sortMap) {
DataWare.sortMap = sortMap;
}
}
java Bean
package threads.productConsumer;
/**
* @description:
* @author: HYW
* @create: 2020-01-21 11:28
*/
public class DataItem {
/**
* id
*/
private String id;
/**
* 分组
*/
private String groupId;
/**
* 指标
*/
private Float quota;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
public Float getQuota() {
return quota;
}
public void setQuota(Float quota) {
this.quota = quota;
}
@Override
public String toString() {
return "DataItem{" +
"id='" + id + '\'' +
", groupId='" + groupId + '\'' +
", quota=" + quota +
'}';
}
}
工作线程管理者
package threads.productConsumer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @description:
* @author: HYW
* @create: 2020-01-21 11:29
*/
public class Producter extends Thread{
String path;
int threadsNum;
ExecutorService threadPool;
Producter(String path,int threadsNum){
this.path=path;
this.threadsNum=threadsNum;
threadPool= Executors.newFixedThreadPool(threadsNum,
new DefaultThreadFactory("product"));;
}
@Override
public void run() {
//获取文件目录
File[] fileList=getCataLog();
if(fileList==null ||fileList.length==0){
System.out.println("无文件异常");
return;
}
CountDownLatch countDownLatch=new CountDownLatch(fileList.length);
//遍历文件
for(File file:fileList){
FileWorker fileWorker=new FileWorker();
fileWorker.setFileName(file.getAbsolutePath());
fileWorker.setCountDownLatch(countDownLatch);
threadPool.execute(fileWorker);
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
threadPool.shutdown();
}
private File[] getCataLog() {
File file=new File(path);
return file.listFiles();
}
public static void main(String[] args) {
Producter producter=new Producter("E:\\log",10);
producter.start();
}
}
工作线程
package threads.productConsumer;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @description:
* @author: HYW
* @create: 2020-01-21 14:07
*/
public class FileWorker implements Runnable{
private LinkedBlockingQueue<DataItem> sourceQueue=DataWare.getSourceQueue();
private String fileName;
private CountDownLatch countDownLatch;
public void setFileName(String fileName) {
this.fileName = fileName;
}
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<DataItem> list=getData();
list.forEach(dataItem -> sourceQueue.offer(dataItem));
countDownLatch.countDown();
System.out.println(Thread.currentThread().getName()+"执行完毕");
}
@Test
public List<DataItem> getData() {
List<DataItem> reslist=new ArrayList<>();
BufferedReader br = null;
try {
br = new BufferedReader(new FileReader(fileName));
String line=null;
while((line=br.readLine())!=null) {
try {
String[] arrs=line.split(",",-1);
DataItem dataItem=new DataItem();
dataItem.setId(arrs[0]);
dataItem.setGroupId(arrs[1]);
dataItem.setQuota(Float.parseFloat(arrs[2]));
reslist.add(dataItem);
}catch (Exception e){
System.out.println("数据解析异常");
e.printStackTrace();
}
}
} catch (IOException e){
e.printStackTrace();
} finally {
try {
if(br!=null){
br.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
reslist.forEach(x-> System.out.println(x.toString()));
return reslist;
}
}
消费者
package threads.productConsumer;
import java.util.TreeMap;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @description:
* @author: HYW
* @create: 2020-01-21 11:29
*/
public class Consummer extends Thread{
LinkedBlockingQueue<DataItem> sourceQueue=DataWare.getSourceQueue();
static TreeMap<String,Float> sortMap=DataWare.getSortMap();
@Override
public void run() {
super.run();
while (!isInterrupted()){
try {
DataItem dataItem=sourceQueue.take();
//map为空
if(sortMap.get(dataItem.getGroupId())==null){
put(dataItem.getGroupId(),dataItem.getQuota());
}else {
float tempMin=sortMap.get(dataItem.getGroupId());
if(tempMin >dataItem.getQuota()){
put(dataItem.getGroupId(),dataItem.getQuota());
}
}
} catch (InterruptedException e) {
System.out.println("中断状态"+Thread.currentThread().isInterrupted());
//把中断状态 由true 改为false
System.out.println("中断状态"+Thread.interrupted());
//外出循环终止
Thread.currentThread().interrupt();
}
}
}
static synchronized void put(String groupId,float quota){
sortMap.put(groupId,quota);
}
void exit(Thread consumer){
//等待消费结束
while (DataWare.getSourceQueue().size()>0){
System.out.println("剩余任务数量"+DataWare.getSourceQueue().size());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.interrupt();
}
}
Demo
package threads.productConsumer;
/**
* @description:
* @author: HYW
* @create: 2020-01-21 14:49
*/
public class Demo {
public static void main(String[] args) {
Producter producter=new Producter("E:\\log",10);
producter.start();
//开始消费
System.out.println("开始消费");
Consummer consummer=new Consummer();
consummer.start();
try {
producter.join();
//中断,消费结束
consummer.exit(consummer);
System.out.println("生产完成;消费结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
//打印结果
DataWare.getSortMap().forEach((key,val)-> System.out.println(key+":"+val));
}
}
输出结果
开始消费
DataItem{id=‘2000102’, groupId=‘100’, quota=91.32}
DataItem{id=‘2000101’, groupId=‘100’, quota=66.32}
DataItem{id=‘2000103’, groupId=‘100’, quota=101.32}
DataItem{id=‘2000105’, groupId=‘100’, quota=98.32}
DataItem{id=‘2000106’, groupId=‘100’, quota=88.32}
DataItem{id=‘3000102’, groupId=‘101’, quota=98.32}
DataItem{id=‘3000102’, groupId=‘101’, quota=98.32}
DataItem{id=‘3000101’, groupId=‘101’, quota=93.32}
DataItem{id=‘3000101’, groupId=‘101’, quota=93.32}
DataItem{id=‘3000103’, groupId=‘101’, quota=98.32}
DataItem{id=‘3000105’, groupId=‘101’, quota=98.32}
DataItem{id=‘3000103’, groupId=‘101’, quota=98.32}
DataItem{id=‘3000106’, groupId=‘101’, quota=98.32}
DataItem{id=‘3000105’, groupId=‘101’, quota=98.32}
DataItem{id=‘3000106’, groupId=‘101’, quota=98.32}
product-1-2执行完毕
消费成功DataItem{id=‘2000102’, groupId=‘100’, quota=91.32}
product-1-3执行完毕
product-1-1执行完毕
生产已经结束,消费者完成消费后退出
剩余任务数量14
消费成功DataItem{id=‘3000102’, groupId=‘101’, quota=98.32}
消费成功DataItem{id=‘3000101’, groupId=‘101’, quota=93.32}
消费成功DataItem{id=‘3000103’, groupId=‘101’, quota=98.32}
消费成功DataItem{id=‘3000105’, groupId=‘101’, quota=98.32}
剩余任务数量10
消费成功DataItem{id=‘3000106’, groupId=‘101’, quota=98.32}
消费成功DataItem{id=‘3000102’, groupId=‘101’, quota=98.32}
消费成功DataItem{id=‘3000101’, groupId=‘101’, quota=93.32}
消费成功DataItem{id=‘3000103’, groupId=‘101’, quota=98.32}
消费成功DataItem{id=‘3000105’, groupId=‘101’, quota=98.32}
剩余任务数量5
消费成功DataItem{id=‘3000106’, groupId=‘101’, quota=98.32}
消费成功DataItem{id=‘2000101’, groupId=‘100’, quota=66.32}
消费成功DataItem{id=‘2000103’, groupId=‘100’, quota=101.32}
消费成功DataItem{id=‘2000105’, groupId=‘100’, quota=98.32}
消费成功DataItem{id=‘2000106’, groupId=‘100’, quota=88.32}
生产完成;消费结束
100:66.32
101:93.32