package word_Demo1;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
/**
 * Word count over a text file, done as three "map" tasks followed by one "reduce" task.
 *
 * <p>Each map task counts the words in one 8 KiB slice of the input file and appends its
 * partial counts, one {@code word=count} line per word, to a shared intermediate file. The
 * reduce task then sums the partial counts per word and prints the totals to standard output.
 */
public class Test1 {

    /** Input file that is split between the map tasks. */
    private static final String WORDS_PATH = "E:\\java增强\\day06\\day06\\words.txt";

    /** Intermediate file the map tasks append to and the reduce task reads. */
    private static final String RESULT_PATH = "E:\\java增强\\day06\\day06\\res.txt";

    /** Number of bytes each map task is responsible for. */
    private static final long CHUNK_SIZE = 8 * 1024;

    /** Number of map tasks started by {@link #main}; task {@code i} starts at {@code i * CHUNK_SIZE}. */
    private static final int MAP_TASKS = 3;

    /**
     * Bytes added per line for the line terminator when measuring how much of the slice has
     * been consumed. The value 2 matches a CRLF-terminated file; for LF-only files the
     * measured length overestimates by one byte per line.
     */
    private static final int LINE_TERMINATOR_BYTES = 2;

    /**
     * Runs the three map tasks and then starts the reduce task.
     *
     * @param args unused
     * @throws Exception if the stale intermediate file cannot be removed or the main thread is
     *     interrupted while waiting for a map task
     */
    public static void main(String[] args) throws Exception {
        // The map tasks open the intermediate file in append mode, so leftovers from an
        // earlier run would be summed into this run's totals. Start from a clean file.
        File stale = new File(RESULT_PATH);
        if (stale.exists() && !stale.delete()) {
            throw new IllegalStateException("Cannot remove stale intermediate file: " + RESULT_PATH);
        }

        // 1. + 2. One thread per slice of the input. Each thread is joined before the next one
        // is started, so the appends to the shared intermediate file never interleave.
        for (int task = 0; task < MAP_TASKS; task++) {
            final long startIndex = task * CHUNK_SIZE;
            Thread mapper = new Thread(() -> {
                try {
                    mapTask(startIndex, WORDS_PATH, RESULT_PATH);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            mapper.start();
            mapper.join();
        }

        // 3. Start the reduce task that aggregates the partial counts.
        Thread reducer = new Thread(() -> {
            try {
                reduceTask(RESULT_PATH);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        reducer.start();
    }

    /**
     * Counts the words in one slice of the input file and appends the counts to the
     * intermediate file as {@code word=count} lines.
     *
     * <p>When {@code startIndex} is greater than zero the reader first discards everything up
     * to the next line break: that (possibly partial) line is consumed by the task that owns
     * the preceding slice, which always finishes the line it is in when its budget runs out.
     *
     * <p>Text is decoded and measured with the platform default charset.
     *
     * @param startIndex byte offset in the input file at which this task starts
     * @param path1 path of the input file
     * @param path2 path of the intermediate file; opened in append mode
     * @throws Exception if either file cannot be opened, read or written
     */
    public static void mapTask(long startIndex, String path1, String path2) throws Exception {
        Map<String, Integer> counts = new HashMap<>();

        try (FileInputStream fin = new FileInputStream(new File(path1));
                BufferedReader br = new BufferedReader(new InputStreamReader(fin))) {

            // Position the raw stream before the reader buffers anything from it.
            String discarded = null;
            if (startIndex > 0) {
                skipFully(fin, startIndex);
                discarded = br.readLine();
            }

            // The discarded line is part of this slice, so it is charged against the budget.
            long budget = CHUNK_SIZE;
            if (discarded != null) {
                budget -= discarded.getBytes().length + LINE_TERMINATOR_BYTES;
            }

            long consumed = 0L;
            String line;
            while ((line = br.readLine()) != null) {
                consumed += line.getBytes().length + LINE_TERMINATOR_BYTES;
                countWords(line, counts);
                // Checked after counting: the line that crosses the slice boundary belongs
                // to this task, and the next task skips it.
                if (consumed > budget) {
                    break;
                }
            }
        }

        appendCounts(counts, path2);
    }

    /**
     * Sums the partial counts in the intermediate file and prints one {@code word=total} line
     * per word to standard output. The order of the printed lines is unspecified.
     *
     * <p>Blank lines are ignored. Each remaining line is split at its last {@code '='}, so
     * words that themselves contain {@code '='} are handled.
     *
     * @param path path of the intermediate file written by {@link #mapTask}
     * @throws Exception if the file cannot be read, a line has no {@code '='}, or a count is
     *     not an integer
     */
    public static void reduceTask(String path) throws Exception {
        Map<String, Integer> totals = new HashMap<>();

        // 1. Read the intermediate results.
        try (BufferedReader br = new BufferedReader(new FileReader(path))) {
            String line;
            while ((line = br.readLine()) != null) {
                if (line.isEmpty()) {
                    continue;
                }
                // 2. Split at the LAST '=' because the word part may contain '=' itself.
                int separator = line.lastIndexOf('=');
                if (separator < 0) {
                    throw new IllegalStateException(
                            "Malformed intermediate line (expected word=count): " + line);
                }
                String word = line.substring(0, separator);
                int count = Integer.parseInt(line.substring(separator + 1));
                totals.merge(word, count, Integer::sum);
            }
        }

        // 3. Print the totals.
        for (Entry<String, Integer> entry : totals.entrySet()) {
            System.out.println(entry);
        }
    }

    /** Skips {@code count} bytes of {@code in}, retrying short skips until no progress is made. */
    private static void skipFully(FileInputStream in, long count) throws Exception {
        long remaining = count;
        while (remaining > 0) {
            long skipped = in.skip(remaining);
            if (skipped <= 0) {
                break;
            }
            remaining -= skipped;
        }
    }

    /** Adds every whitespace-separated, non-empty token of {@code line} to {@code counts}. */
    private static void countWords(String line, Map<String, Integer> counts) {
        for (String word : line.split("\\s+")) {
            // Leading whitespace and blank lines produce an empty token; it is not a word.
            if (!word.isEmpty()) {
                counts.merge(word, 1, Integer::sum);
            }
        }
    }

    /** Appends one {@code word=count} line per entry of {@code counts} to the file at {@code path}. */
    private static void appendCounts(Map<String, Integer> counts, String path) throws Exception {
        try (BufferedWriter bw = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(new File(path), true)))) {
            for (Entry<String, Integer> entry : counts.entrySet()) {
                bw.write(entry.toString());
                bw.newLine();
            }
        }
    }
}