更多IT互联网学习资源,尽在“通通学 - 知识学习与分享平台”
学习过Hadoop的都知道,其中有一个经典实例就是统计文档中每个单词出现的次数,即WordCount实例。这里利用Executor框架及带返回值的多线程(Callable/Future)来实现WordCount实例。
以下是核心代码:
WordCountMapper.java
package com.tongtongxue.wordcount;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.concurrent.Callable;

/**
 * Counts word occurrences in a contiguous slice of a file array.
 *
 * <p>Each instance processes {@code files[start]} (inclusive) through
 * {@code files[end]} (exclusive) and returns one map from word to the number
 * of times it occurs across those files. Files are decoded as UTF-8 and lines
 * are split with {@link StringTokenizer}'s default delimiters.
 *
 * <p>The type argument of {@code Callable} is deliberately left as the raw
 * {@code Map} so that existing callers written against {@code Callable<Map>}
 * (for example {@code new FutureTask<Map>(mapper)}) keep compiling. The map
 * returned by {@link #call()} always has {@code String} keys and {@code Long}
 * values.
 */
public class WordCountMapper implements Callable<Map> {

    /** Index of the first file to process (inclusive). */
    private final int start;

    /** Index one past the last file to process (exclusive). */
    private final int end;

    /** Backing array; only the slice {@code [start, end)} is read. */
    private final File[] files;

    /** Creates a mapper with an empty range; {@link #call()} returns an empty map. */
    public WordCountMapper() {
        this(new File[0], 0, 0);
    }

    /**
     * Creates a mapper for the slice {@code [start, end)} of {@code files}.
     *
     * @param files array of files to read from
     * @param start index of the first file to process (inclusive)
     * @param end   index one past the last file to process (exclusive)
     */
    public WordCountMapper(File[] files, int start, int end) {
        this.files = files;
        this.start = start;
        this.end = end;
    }

    /**
     * Reads every file in the configured slice and tallies its words.
     *
     * @return a map from word ({@code String}) to occurrence count ({@code Long})
     * @throws Exception if a file cannot be opened or read
     */
    @Override
    public Map call() throws Exception {
        Map<String, Long> result = new HashMap<String, Long>();
        for (int i = start; i < end; i++) {
            countWords(files[i], result);
        }
        return result;
    }

    /** Adds the word counts of a single file to {@code counts}. */
    private static void countWords(File file, Map<String, Long> counts) throws IOException {
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(file), "utf-8"));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens()) {
                    String word = tokenizer.nextToken();
                    Long previous = counts.get(word);
                    counts.put(word, previous == null ? 1L : previous + 1L);
                }
            }
        } finally {
            reader.close();
        }
    }
}
WordCount.java
package com.tongtongxue.wordcount;

import java.io.File;
import java.io.FileFilter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/**
 * Counts words across all {@code .txt} files of a directory using a fixed
 * thread pool.
 *
 * <p>{@link #count(String)} splits the matching files into contiguous chunks,
 * submits one {@code WordCountMapper} per chunk to the pool, and then prints
 * the merged totals through {@link #showResult()}. Call {@link #close()} when
 * the instance is no longer needed so the pool threads can terminate.
 */
public class WordCount {

    private final ExecutorService executorService;
    private final int threadNum;

    /** Futures of the mapper tasks submitted by the most recent {@link #count(String)} call. */
    private final List<Future<Map>> tasks = new ArrayList<Future<Map>>();

    /** Files selected by the most recent {@link #count(String)} call. */
    private File[] txtFiles;

    /** Creates a counter that uses one worker thread per available CPU. */
    public WordCount() {
        this(Runtime.getRuntime().availableProcessors());
    }

    /**
     * Creates a counter backed by a fixed pool of {@code threadNum} threads.
     *
     * @param threadNum number of worker threads
     */
    public WordCount(int threadNum) {
        this.threadNum = threadNum;
        executorService = Executors.newFixedThreadPool(threadNum);
    }

    /**
     * Counts the words of every regular file in {@code dirPath} whose name ends
     * with {@code .txt} (compared case-insensitively) and prints the totals.
     *
     * @param dirPath path of the directory to scan (not recursive)
     * @throws IllegalArgumentException if {@code dirPath} cannot be listed as a directory
     * @throws Exception if a mapper task fails, or if the pool has already been
     *         closed (the submission is then rejected instead of waiting forever
     *         on a task that was never started)
     */
    public void count(String dirPath) throws Exception {
        File dir = new File(dirPath);
        txtFiles = dir.listFiles(new FileFilter() {
            @Override
            public boolean accept(File file) {
                // Skip directories that merely happen to be named "*.txt".
                return file.isFile()
                        && file.getName().toLowerCase(Locale.ROOT).endsWith(".txt");
            }
        });
        if (txtFiles == null) {
            // listFiles returns null when the path is not a directory or cannot be read.
            throw new IllegalArgumentException("Not a readable directory: " + dirPath);
        }

        // Drop futures from an earlier run so their counts are not merged again.
        tasks.clear();

        // NOTE(review): the loop header was garbled in the published article; the
        // partitioning below (contiguous chunks of ceil(size / threadNum) files) is a
        // reconstruction — confirm against the original source if available.
        int size = txtFiles.length;
        int chunk = (size + threadNum - 1) / threadNum;
        for (int start = 0; start < size; start += chunk) {
            int end = Math.min(start + chunk, size);
            WordCountMapper mapper = new WordCountMapper(txtFiles, start, end);
            tasks.add(executorService.submit(mapper));
        }
        showResult();
    }

    /** Shuts the thread pool down; tasks that were already submitted still run. */
    public void close() {
        executorService.shutdown();
    }

    /**
     * Waits for every submitted mapper task, merges the per-task counts, and
     * prints the number of distinct words followed by one line per word.
     *
     * @throws Exception if waiting is interrupted or a mapper task failed
     */
    public void showResult() throws Exception {
        Map<String, Long> totals = new HashMap<String, Long>();
        for (Future<Map> task : tasks) {
            // The mapper is declared as Callable<Map> but builds a word -> count map
            // with String keys and Long values.
            @SuppressWarnings("unchecked")
            Map<String, Long> partial = task.get();
            for (Entry<String, Long> entry : partial.entrySet()) {
                String word = entry.getKey();
                Long soFar = totals.get(word);
                long sum = entry.getValue() + (soFar == null ? 0L : soFar);
                totals.put(word, sum);
            }
        }

        System.out.println(totals.size());
        for (Entry<String, Long> entry : totals.entrySet()) {
            System.out.println(entry.getKey() + " ------> " + entry.getValue());
        }
    }
}