Unlike plain Hadoop input, HBase does not let you tune the split size through the following parameters to get multiple mappers reading the data:

mapred.min.split.size
mapred.max.split.size
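For context, these two properties work on plain HDFS input because FileInputFormat folds them into its split-size computation. A minimal sketch of that formula (the class and method names here are illustrative, not Hadoop's actual code):

```java
public class SplitSizeSketch {
    // The new-API FileInputFormat chooses the split size per file as
    // roughly: max(minSplitSize, min(maxSplitSize, blockSize))
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }
}
```

TableInputFormatBase never consults these properties, which is why setting them has no effect on a job that reads from HBase.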
Two approaches come to mind: one is to modify TableInputFormatBase so that the default single TableSplit per region is subdivided into several; the other is to handle it with a Coprocessor. Here I chose to modify TableInputFormatBase.
HBase: The Definitive Guide describes how to combine HBase with MapReduce: the following helper class is used to make an HBase table the data source and read from it:
TableMapReduceUtil.initTableMapperJob(table[0].getBytes(), scan,
    UserViewHisMapper2.class, Text.class, Text.class, genRecommendations);
This method ultimately delegates to the following overload to do the initialization:
public static void initTableMapperJob(byte[] table, Scan scan,
    Class<? extends TableMapper> mapper,
    Class<? extends WritableComparable> outputKeyClass,
    Class<? extends Writable> outputValueClass, Job job,
    boolean addDependencyJars) throws IOException {
  initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
      outputValueClass, job, addDependencyJars, TableInputFormat.class);
}
So the idea is to modify the TableInputFormat class. Its core behavior, however, is inherited from TableInputFormatBase:
public class TableInputFormat extends TableInputFormatBase implements Configurable
The class that actually needs modifying is therefore TableInputFormatBase, specifically this method:
public List<InputSplit> getSplits(JobContext context) throws IOException {}
The core of this method is to get the start row of every region of the table and turn each region into one TableSplit:
public List<InputSplit> getSplits(JobContext context) throws IOException {
  if (table == null) {
    throw new IOException("No table was provided.");
  }
  Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
  if (keys == null || keys.getFirst() == null ||
      keys.getFirst().length == 0) {
    throw new IOException("Expecting at least one region.");
  }
  int count = 0;
  List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
  for (int i = 0; i < keys.getFirst().length; i++) {
    if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
      continue;
    }
    String regionLocation =
        table.getRegionLocation(keys.getFirst()[i]).getHostname();
    byte[] startRow = scan.getStartRow();
    byte[] stopRow = scan.getStopRow();
    // determine if the given start and stop keys fall into the region
    if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
         Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
        (stopRow.length == 0 ||
         Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
      byte[] splitStart = startRow.length == 0 ||
          Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
              keys.getFirst()[i] : startRow;
      byte[] splitStop = (stopRow.length == 0 ||
          Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
          keys.getSecond()[i].length > 0 ?
              keys.getSecond()[i] : stopRow;
      InputSplit split = new TableSplit(table.getTableName(),
          splitStart, splitStop, regionLocation);
      splits.add(split);
      if (LOG.isDebugEnabled())
        LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
    }
  }
  return splits;
}
What needs to happen here is to subdivide the rows that originally belonged to a single TableSplit into as many smaller splits as desired. I found no lightweight way to do this; the only option seems to be iterating over the region, pulling every row of the TableSplit out, and then partitioning them, which is rather brute-force.

Here is my implementation:
public List<InputSplit> getSplits(JobContext context) throws IOException {
  if (table == null) {
    throw new IOException("No table was provided.");
  }
  Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
  if (keys == null || keys.getFirst() == null ||
      keys.getFirst().length == 0) {
    throw new IOException("Expecting at least one region.");
  }
  int count = 0;
  List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
  for (int i = 0; i < keys.getFirst().length; i++) {
    if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
      continue;
    }
    String regionLocation =
        table.getRegionLocation(keys.getFirst()[i], true).getHostname();
    byte[] startRow = scan.getStartRow();
    byte[] stopRow = scan.getStopRow();
    // determine if the given start and stop keys fall into the region
    if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
         Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
        (stopRow.length == 0 ||
         Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
      byte[] splitStart = startRow.length == 0 ||
          Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
              keys.getFirst()[i] : startRow;
      byte[] splitStop = (stopRow.length == 0 ||
          Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
          keys.getSecond()[i].length > 0 ?
              keys.getSecond()[i] : stopRow;

      // brute force: scan the region once, key-only, to collect every row key
      Scan keyScan = new Scan();
      keyScan.setStartRow(splitStart);
      keyScan.setStopRow(splitStop);
      keyScan.setFilter(new KeyOnlyFilter());
      keyScan.setCaching(500); // setCaching (not setBatch) controls rows per RPC
      ResultScanner scanner = table.getScanner(keyScan);
      // holds all row keys of this region (assumes keys are valid strings)
      List<String> rows = new ArrayList<String>();
      try {
        for (Result rs : scanner) {
          if (rs.isEmpty()) {
            continue;
          }
          rows.add(new String(rs.getRow()));
        }
      } finally {
        scanner.close();
      }

      int splitSize = rows.size() / mappersPerSplit;
      if (splitSize == 0) {
        // fewer rows than requested mappers: keep one split for the region
        splits.add(new TableSplit(table.getTableName(), splitStart,
            splitStop, regionLocation));
        continue;
      }
      for (int j = 0; j < mappersPerSplit; j++) {
        byte[] subStart = rows.get(j * splitSize).getBytes();
        // the last sub-split must end at the region's stop key (exclusive);
        // ending at the last row key would drop the trailing rows
        byte[] subStop = j == mappersPerSplit - 1 ?
            splitStop : rows.get((j + 1) * splitSize).getBytes();
        TableSplit tableSplit = new TableSplit(table.getTableName(),
            subStart, subStop, regionLocation);
        splits.add(tableSplit);
        if (LOG.isDebugEnabled())
          LOG.debug("getSplits: split -> " + (count++) + " -> " + tableSplit);
      }
    }
  }
  return splits;
}
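The partitioning step at the heart of this can be isolated and sketched in plain Java: given the row keys collected from one region and the desired number of sub-splits, produce contiguous [start, stop) ranges. The class and names here are illustrative, not part of the HBase API:

```java
import java.util.ArrayList;
import java.util.List;

public class RangeSketch {
    // A [start, stop) key range; stop == null means "to the region's end key".
    static class Range {
        final String start;
        final String stop;
        Range(String start, String stop) { this.start = start; this.stop = stop; }
    }

    // Partition sorted row keys into n contiguous ranges, mirroring the
    // j * splitSize arithmetic used in the modified getSplits() above.
    static List<Range> partition(List<String> rows, int n) {
        List<Range> ranges = new ArrayList<Range>();
        int splitSize = rows.size() / n;
        if (splitSize == 0) {             // fewer rows than sub-splits
            ranges.add(new Range(rows.isEmpty() ? null : rows.get(0), null));
            return ranges;
        }
        for (int j = 0; j < n; j++) {
            String start = rows.get(j * splitSize);
            // the last range is open-ended so no trailing row is lost
            String stop = (j == n - 1) ? null : rows.get((j + 1) * splitSize);
            ranges.add(new Range(start, stop));
        }
        return ranges;
    }
}
```

Because integer division rounds down, any remainder rows end up in the last range, which is also why the last sub-split should run to the region's stop key rather than to a row key.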
The number of sub-splits per region (mappersPerSplit above) is set through the configuration.
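The lookup itself is trivial; in the real TableInputFormatBase subclass it would live in setConf(), reading from org.apache.hadoop.conf.Configuration. A sketch of the same logic, with java.util.Properties standing in for Configuration and a property name of my own invention:

```java
import java.util.Properties;

public class SplitCountSketch {
    // Read the desired number of mappers per region, defaulting to 1.
    // "hbase.mapreduce.mappers.per.region" is a hypothetical key,
    // not a standard HBase property; pick your own naming convention.
    static int mappersPerSplit(Properties conf) {
        String v = conf.getProperty("hbase.mapreduce.mappers.per.region");
        return v == null ? 1 : Integer.parseInt(v);
    }
}
```

Defaulting to 1 keeps the modified getSplits() behaving exactly like the stock one split per region when the property is unset.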