



  下面是部分数据,数据格式:编号 联系人 电话 时间。


  首先,输入文件是Excel格式,我们可以借助poi jar包来解析Excel文件。如果本地没有这些jar包,可以下载 poi-3.9.jar 和 poi-excelant-3.9.jar 并引入到项目中。借助这两个jar包,我们先来实现一个Excel的解析类 —— ExcelParser.java:

package com.hadoop.phoneStatistics;

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.poi.hssf.usermodel.HSSFSheet;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;

/**
 * Parses the call records stored in an Excel file.
 *
 * @author 努力的凹凸曼
 */
public class ExcelParser {
    private static final Log LOG = LogFactory.getLog(ExcelParser.class);
    private StringBuilder currentString = null;
    private long bytesRead = 0;
    public String parseExcelData(InputStream is) {
        try {
            HSSFWorkbook workbook = new HSSFWorkbook();
            // Taking first sheet from the workbook
            HSSFSheet sheet = workbook.getSheetAt(0);
            // Iterate through each rows from first sheet
            Iterator<Row> rowIterator = sheet.iterator();
            currentString = new StringBuilder();
            while (rowIterator.hasNext()) {
                Row row = rowIterator.next();
                // For each row, iterate through each columns
                Iterator<Cell> cellIterator = row.cellIterator();
                while (cellIterator.hasNext()) {
                    Cell cell = cellIterator.next();
                    switch (cell.getCellType()) {
                    case Cell.CELL_TYPE_BOOLEAN:
                        currentString.append(cell.getBooleanCellValue() + "\t");
                    case Cell.CELL_TYPE_NUMERIC:
                        currentString.append(cell.getNumericCellValue() + "\t");
                    case Cell.CELL_TYPE_STRING:
                        currentString.append(cell.getStringCellValue() + "\t");
        } catch (IOException ioe) {
            // TODO: handle exception
            LOG.error("IO Exception : File not found " + ioe);
        return currentString.toString();

    public long getBytesRead() 
        return bytesRead;



package com.hadoop.phoneStatistics;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Processes the call records.
 *
 * @author 努力的凹凸曼
 */
public class ExcelPhoneStatistics extends Configured implements Tool {
    private static Logger logger = LoggerFactory.getLogger(ExcelPhoneStatistics.class);

    public static class ExcelMapper extends
            Mapper<LongWritable, Text, Text, Text> 

        private static Logger LOG = LoggerFactory.getLogger(ExcelMapper.class);
        private Text pkey = new Text();
        private Text pvalue = new Text();

         * Excel Spreadsheet is supplied in string form to the mapper. We are
         * simply emitting them for viewing on HDFS.
        public void map(LongWritable key, Text value, Context context)
                throws InterruptedException, IOException
            //1.0, 老爸, 13999123786, 2014-12-20
            String line = value.toString();
            String[] records = line.split("\\s+");
            String[] months = records[3].split("-");//获取月份
            pkey.set(records[1] + "\t" + months[1]);//昵称+月份
            context.write(pkey, pvalue);
            LOG.info("Map processing finished");

    public static class PhoneReducer extends Reducer<Text, Text, Text, Text>
        private Text pvalue = new Text();

        protected void reduce(Text Key, Iterable<Text> Values, Context context)
                throws IOException, InterruptedException 
            int sum = 0;
            Text outKey = Values.iterator().next();
            for (Text value : Values)
            context.write(Key, pvalue);

    public static class PhoneOutputFormat extends
            MailMultipleOutputFormat<Text, Text> 

        protected String generateFileNameForKeyValue(Text key,
                Text value, Configuration conf)
            String[] records = key.toString().split("\t"); 
            return records[1] + ".txt";


    public int run(String[] args) throws Exception 
        Configuration conf = new Configuration();// 配置文件对象
        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);// 创建输出路径
        if (hdfs.isDirectory(mypath))
            hdfs.delete(mypath, true);
        logger.info("Driver started");

        Job job = new Job();
        job.setJobName("Excel Record Reader");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return 0;

    public static void main(String[] args) throws Exception 
        String[] args0 = { 
//            args[0], args[1]
        int ec = ToolRunner.run(new Configuration(), new ExcelPhoneStatistics(), args0);


package com.hadoop.phoneStatistics;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Custom input format.
 * <p>
 * An {@link org.apache.hadoop.mapreduce.InputFormat} for Excel spreadsheet files.
 * Only the first sheet of the workbook is read.
 * <p>
 * Keys are the zero-based index of the row, and values are the text of that row
 * with all of its columns.
 *
 * @author 努力的凹凸曼
 */

public class ExcelInputFormat extends FileInputFormat<LongWritable, Text> {

    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        return new ExcelRecordReader();

    public class ExcelRecordReader extends RecordReader<LongWritable, Text> {

        private LongWritable key;
        private Text value;
        private InputStream is;
        private String[] strArrayofLines;

        public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            final Path file = split.getPath();
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(file);
            is = fileIn;
            String line =     new ExcelParser().parseExcelData(is);//调用解析excel方法
            this.strArrayofLines = line.split("\n");

        public boolean nextKeyValue() throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            if (key == null) {
                key = new LongWritable(0);
                value = new Text(strArrayofLines[0]);
            } else {
                if (key.get() < this.strArrayofLines.length - 1) {
                    long pos = (int)key.get();
                    key.set(pos + 1);
                    value.set(this.strArrayofLines[(int)(pos + 1)]);
                } else {
                    return false;
            if (key == null || value == null) {
                return false;
            } else {
                return true;

        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            return key;

        public Text getCurrentValue() throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            return value;

        public float getProgress() throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            return 0;

        public void close() throws IOException {
            // TODO Auto-generated method stub
            if (is != null) {


package com.hadoop.phoneStatistics;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

import com.jcraft.jsch.Compression;

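/**
 * Custom output format. Records are written through the nested
 * {@code MultiRecordWriter}.
 *
 * @param <K> output key type
 * @param <V> output value type
 * @author 努力的凹凸曼
 */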
public abstract class MailMultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>
            extends FileOutputFormat<K, V>{

    private MultiRecordWriter writer = null;  
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        return writer;
    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");  
            workPath = outputPath;
        return workPath;
    //通过key, value, conf来确定输出文件名(含扩展名) 
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);
    public class MultiRecordWriter extends RecordWriter<K, V> {
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        private Path workPath = null;
        public MultiRecordWriter(TaskAttemptContext job, Path workpath) {
            // TODO Auto-generated constructor stub
            this.job = job;
            this.workPath = workpath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        public void write(K key, V value) throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            rw.write(key, value);

        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = "\t";//key value 分隔符
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new MailRecordWriter<K, V>(
                        new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new MailRecordWriter<K, V>(fileOut, keyValueSeparator);
            return recordWriter;


package com.hadoop.phoneStatistics;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * @author 努力的凹凸曼
 */
public class MailRecordWriter< K, V > extends RecordWriter< K, V > 
    private static final String utf8 = "UTF-8";  
    private static final byte[] newline;  
            newline = "\n".getBytes(utf8);
        } catch (UnsupportedEncodingException uee)
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");  
    protected DataOutputStream out;  
    private final byte[] keyValueSeparator;  
    public MailRecordWriter(DataOutputStream out, String keyValueSeparator)
        this.out = out;  
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);  
        } catch (UnsupportedEncodingException uee)
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");  
    public MailRecordWriter(DataOutputStream out) 
        this(out, "/t");  
    private void writeObject(Object o) throws IOException 
        if (o instanceof Text)
            Text to = (Text) o;  
            out.write(to.getBytes(), 0, to.getLength());  
        } else 
    public synchronized void write(K key, V value) throws IOException 
        boolean nullKey = key == null || key instanceof NullWritable;  
        boolean nullValue = value == null || value instanceof NullWritable;  
        if (nullKey && nullValue)
        if (!nullKey) 
        if (!(nullKey || nullValue)) 
        if (!nullValue) 
    public synchronized void close(TaskAttemptContext context) throws IOException 






