基于Hadoop平台的海量图像处理接口设计

本文是基于Hadoop平台的海量图像处理接口设计,主要实现SequenceFile文件预处理、图像灰度化处理、多文件输出。在Linux系统下,采用Java语言,命令行输入,SSH免密登陆,HDFS存储数据,Mapreduce分发处理,设计和实现一个高效、稳定、快速的基于Hadoop平台的海量图像处理接口。其中SequenceFileTest类用于图片预处理,将图片文件转为SequenceFile文件。ImageMapper实现map函数,分发处理输入的SequenceFile文件,同时将之灰度化处理。MyMultipleOutputFormat重写MultipleOutputFormat,用于实现多文件输出,LineRecordWriter继承RecordWriter,自定义输出类型。Test用来驱动mapreduce函数,实现在hadoop集群上运行整个程序。

代码实现主要分为SequenceFileTest.java、ImageMapper.java、MyMultipleOutputFormat.java、LineRecordWriter.java和Test.java。SequenceFileTest.java用于图片预处理,将图片文件转为顺序文件,作为MapReduce的输入数据。ImageMapper.java为Mapper函数,用于分片并灰度化处理图片。MyMultipleOutputFormat.java和LineRecordWriter.java用于实现自定义多文件输出。Test.java为驱动程序。

1、SequenceFileTest.java

//将小图像文件合并为SequenceFile
import java.io.IOException;  
import java.net.URI;  
import org.apache.hadoop.io.*;  
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;   
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.input.*; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.mapreduce.lib.output.*; 
public class SequenceFileTest 
{  
static SequenceFile.Writer writer = null;  
static int index = 0; 
public static void main(String[] args) throws Exception
 {  
     Configuration conf = new Configuration();  
     conf.set("fs.defaultFs", "file:///");  //P159
     conf.set("mapred.job.tracker", "local");  
     String path = args[0];  
     URI uri = new URI(path);  
FileSystem fileSystem = FileSystem.get(uri, conf);  
     int i = 1;
     while(true)
     {
	   System.out.println("/seq/d"+i);//提示已经处理多少个sequenceFile文件
       //实例化writer对象
       writer = SequenceFile.createWriter(fileSystem, conf, new Path("/seq/d"+i), Text.class, BytesWritable.class);     
       i++;//标识已处理图片数目
       if(!listFileAndWriteToSequenceFile(fileSystem,path))
	   {
	     org.apache.hadoop.io.IOUtils.closeStream(writer);
		 break;  
	   }
       org.apache.hadoop.io.IOUtils.closeStream(writer);
     }  
 }
  //递归遍历文件夹,并将文件下的文件写入sequenceFile文件
   public static boolean listFileAndWriteToSequenceFile(FileSystem fileSystem,String path) throws Exception
  {  
    final FileStatus[] listStatuses = fileSystem.listStatus(new Path(path));   
    long len = 0;//len用于记录当前sequenceFile文件大小
    int currIndex = 0;//标识当前图片的索引
    for (FileStatus fileStatus : listStatuses)
   {  
      if(fileStatus.isFile())
      {  
         if(currIndex<index) //如果当前图片已处理,则进行下一张图片
	     {
            currIndex++;
            continue;
	     }
         Text fileText = new Text(fileStatus.getPath().toString());  
         index++;
FSDataInputStream in = fileSystem.open(new Path(fileText.toString()));  
         byte[] buffer = IOUtils.toByteArray(in);  
         in.read(buffer);  
         len += buffer.length;
         //将文件内容转为BytesWritable形式并存入value
         BytesWritable value = new BytesWritable(buffer);  
         String k = fileText.toString();
         k = k.substring(k.lastIndexOf("/")+1); //从文件路径获取文件名
         Text key = new Text(k);
         //以文件名为key,以文件内容为value写入sequenceFile
         writer.append(key, value); 
          //如果当前sequenceFile超过10M,则进行下一个sequenceFile 
         if(len>10*1024*1024)
		      return true; 
      }  
      if(fileStatus.isDirectory())  
      {  
    listFileAndWriteToSequenceFile(fileSystem,fileStatus.getPath().toString());   
      }  
   }  
  return false;
 }
}  

首先通过SequenceFile.createWriter来实例化一个SequenceFile,然后输入待写入的数据流FileSystem对象,Configuration对象,文件名,以及键和值的类型。其中,数据流通过FileSystem打开文件进行数据读取。FileSystem是Hadoop的一个通用的文件系统,open()方法返回的是FSDataInputStream对象,这个类继承了java.io.DataInputStream的一个特殊类,并支持随机访问,可以从流的任意位置读取数据,而我们需要的是字节数组,所以我们用toBitesArray()将之转化为字节数组存储在buffer中。与Java不同,Hadoop用Path对象代表文件,而非File。系统通过命令行输入args[1]设定输出路径,由conf传递任务配置。由于本论文,我们以文本形式保存键,以二进制字符数组保存值,所以这里参数设Text.class、BytesWritable.class。

在由FileSystem读取待处理文件时,由listFileAndWriteToSequenceFile()来循环读取目录底下的文件。FileStatus类封装了文件系统中文件和目录的元数据,包括文件长度、块大小、复本、最后一次修改时间等。FileSystem的ListStatus()方法能够列出目录中的内容。若参数是一个文件时,将以数组方式返回一个长度为1的FileStatus对象。若参数是一个目录时,则返回0或多个FileStatus对象,该对象列出所有子目录和文件。本文通过for循环遍历整个文件夹下的文件,如果键值对读取成功,则返回true,并且继续遍历,如果已读到文件末尾,则返回false。由于本文我们将文件名作为key,文件内容作为value,但是FileStatus获取的是整个路径,所以我们需要使用substring来获取文件名。再通过append()方法在SequenceFile文件末尾附件键/值对。其中,len用于记录每个SequenceFile的大小,index用于记录已处理的图片总数,currentIndex用于记录目前读取的图片序号。若currentIndex小于index,表示该图片已经处理过,跳过,读取下一幅。本论文设定每个SequenceFile大小为10M左右,if(len>10*1024*1024)用于判断当前SequenceFile读取数据量,若超过10M,则开始下一个SequenceFile文件。

若文件遍历结束,则用IOUtils.closeStream关闭数据流,再退出while循环;否则,进行下一个文件的读取。

2、ImageMapper.java

import java.io.IOException; 
import org.apache.hadoop.io.*;  
import org.apache.hadoop.mapreduce.Mapper;  
import java.awt.image.BufferedImage;
import java.io.*;  
import java.io.IOException;
import java.sql.Time;
import java.util.Timer;
import javax.imageio.ImageIO;  
public class ImageMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{  
protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException
    {  
        //获取文件名后缀
        String last = key.toString();
        last = last.substring(last.lastIndexOf(".")+1);  
//反序列化,从data流中读入字节,按byte格式读出到in中
byte[] data = value.getBytes();
        ByteArrayInputStream in = new ByteArrayInputStream(data);
//调用hd()方法将图片灰度化处理
BufferedImage img = hd(ImageIO.read(in));
//序列化灰度化后的图像,将图像按byte传输格式写入到out流中
	ByteArrayOutputStream out = new ByteArrayOutputStream();
        ImageIO.write(img,last,out);
data = out.toByteArray();
        BytesWritable v = new BytesWritable();
        v.set(data,0,data.length);
        context.write(key,v);  
}     
   private int colorToRGB(int alpha, int red, int green, int blue) 
   {    int newPixel = 0;
        newPixel += alpha;
        newPixel = newPixel << 8;
        newPixel += red;
        newPixel = newPixel << 8;
        newPixel += green;
        newPixel = newPixel << 8;
        newPixel += blue;
        return newPixel;
    }    
    private BufferedImage hd(BufferedImage bufferedImage) throws IOException
    {
        //获取到图片的属性信息
        BufferedImage grayImage = 
            new BufferedImage(bufferedImage.getWidth(), 
                          bufferedImage.getHeight(), 
                          bufferedImage.getType());
        //图片的宽高
        for (int i = 0; i < bufferedImage.getWidth(); i++) 
        {
            for (int j = 0; j < bufferedImage.getHeight(); j++) 
            {
                final int color = bufferedImage.getRGB(i, j);
                final int r = (color >> 16) & 0xff;     //右移4位
                final int g = (color >> 8) & 0xff;      //右移3位
                final int b = color & 0xff;
                //运用灰度处理的方法加权平均值
                int gray = (int) (0.3 * r + 0.59 * g + 0.11 * b);
                //每一个像素点的灰度转化
                int newPixel = colorToRGB(255, gray, gray, gray);
                //将rgb值写回图片
                grayImage.setRGB(i, j, newPixel);
            }
        }
        return grayImage;
    }
}  

MapReduce分为mapreduce两个过程。每个阶段都以键值对作为输入和输出,本论文中不需要归并处理,所以不需要reduce()函数,这里,我们只要写一个map()函数即可map函数由Mapper类来表示,参数分别指map函数的输入值键、输入值、输出键、输出值。map阶段的输入是SequenceFile文件,而在之前的SequenceFile预处理文件时,我们就是以Text为键类,BytesWritable为值类型写入的,所以这里map函数的类型与上文一致Context实例用于输出内容的写入。

由于后面输入键/值对时,需要用到文件名,而我们之前获取的是文件路径,包括图片文件所属的目录,这里我们用last.substring来截取最后一个/”后所含的图片文件名称。

因输入的是SequenceFile文件,其value是数据流,我们先将之转化为二进制文件存储到data这个字节数组中,再申请一个ByteArrayInputStream字节数组缓冲区,将data缓存到内存中,然后用BufferedImage将数据反序列化为有结构的图片,用于后面的灰度化处理。

hd()方法用来对图片进行灰度化处理。

灰度化处理后,又需要将图片反序列化,用于传输和存储。该过程中,我们先申请一个字节数组缓冲区,将处理后的图片写入内存out中,再将其转化为二进制数据,以BytesWritable类型写于v。BytesWritable是对二进制数据组的封装。BytesWritable是可变的,其值可以根据set()方法进行修改。

BytesWritablev = new BytesWritable();

 v.set(data,0,data.length);

为了检查BytesWritable的序列化形式,我们在java.io.DataOutputSteam中加入一个帮助函数来封装java.io.ByteArrayOutputSteam,以便在序列化中捕捉字节

最后我们将key和处理后的value写回writer

3、MyMultipleOutputFormat.java

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class MyMultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>  extends FileOutputFormat<K, V>
{
   //定义MultiRecordWriter
private MultiRecordWriter writer = null;
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException,  InterruptedException
    {
      if (writer == null) 
{ writer = new MultiRecordWriter(job, getTaskOutputPath(job));
}
	return writer;
 }
	
private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException 
 {
	 Path workPath = null;
	OutputCommitter committer = super.getOutputCommitter(conf);
	   if (committer instanceof FileOutputCommitter)         
           {workPath = ((FileOutputCommitter) committer).getWorkPath();
		}         
      else 
       {
		 Path outputPath = super.getOutputPath(conf);
		 if (outputPath == null) 
          {
throw new IOException("Undefined job output-path");
		}
		 workPath = outputPath;
	    }
	return workPath;
	 }
//通过key, value, conf来确定输出文件名
	protected String generateFileNameForKeyValue(K key, V value, Configuration conf)
 {    return key.toString();  }  
//实现记录写入器RecordWriter类
	public class MultiRecordWriter extends RecordWriter<K, V>
    {
	private HashMap<String, RecordWriter<K, V>> recordWriters = null;
private TaskAttemptContext job = null;
	//输出目录
	  private Path workPath = null;
	  public MultiRecordWriter(TaskAttemptContext job, Path workPath) 
      {
	    super();
		this.job = job;
		this.workPath = workPath;
		recordWriters = new HashMap<String, RecordWriter<K, V>>();
	   }
	public void close(TaskAttemptContext context) throws IOException, InterruptedException 
 {
	Iterator<RecordWriter<K, V>> values =this.recordWriters.values().iterator();
		while (values.hasNext()) 
         {
			  values.next().close(context);
		}
		this.recordWriters.clear();
		}
	public void write(K key, V value) throws IOException, InterruptedException 
   {
	//得到输出文件名
	String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
   //如果recordwriter里没有文件名,那么就建立,否则就直接写值
	RecordWriter<K, V> rw = this.recordWriters.get(baseName);
	if (rw == null) 
    {
		rw = getBaseRecordWriter(job, baseName);
		this.recordWriters.put(baseName, rw);
}
	    rw.write(key, value);
	}
		
   //自动调用来读数据并写往文件系统。
    Configuration conf = job.getConfiguration();
private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)throws IOException, InterruptedException 
{
//根据Conf判定是否需要压缩,若需要压缩获取压缩格式及后缀;
	Configuration conf = job.getConfiguration();
   boolean isCompressed = getCompressOutput(job);
  //设置逗号为键/值对分割符
	String keyValueSeparator = ",";
	RecordWriter<K, V> recordWriter = null;
//获取需要生成的文件路径
	Path file = new Path(workPath, baseName);
//根据文件生成FSDataOutputStream对象,并返回recordWriter
	FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
   //使用自定义的OutputFormat
	recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
	return recordWriter;
	}
}
}

MyMultipleOutputForma主要参考MultipleOutputFormat,用来实现自定义多文件输出格式,即将原来的图片一幅一幅输出。它包括四个子类,分别是RecordWriter(),getTaskOutputPath(),generateFileNameForKeyValue(),MultiRecordWriter()。上文中TaskAttemptContext封装了task的上下文.

getTaskOutputPath()用来获取任务的输出路径,这个行为通过提交协议OutputCommitter的getOutputCommitter()方法来获取。若返回结果是系统默认的FileOutputCommitter,则直接输出工作路径,否则,则将命令行设置的输出路径赋值给workPath.。

generateFileNameForKeyValue()用来获取文件名用于下文的writer()。

MultiRecordWriter ()实现自定义的RecordWriter。这里依旧通过LineRecordWriter来实现的。MultiRecordWriter对LineRecordWriter进行了封装,对于同一个task在输出的时候进行了拆分。运行MapReduce时,系统默认只有一个reduce,将所有的输出都将写入到part-r-00000的文件中,MultiRecordWriter所做的工作就是屏蔽了到part-r-00000的输出,而是将同一个reduce的数据拆分为多个文件。

4、LineRecordWriter.java

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class LineRecordWriter<K, V> extends RecordWriter<K, V>
{
	private static final String utf8 = "UTF-8";//定义字符编码格式
	protected DataOutputStream out;
   //构造方法
	public LineRecordWriter(DataOutputStream out, String keyValueSeparator) 
    {
		this.out = out;
	}
	public LineRecordWriter(DataOutputStream out) 
    {
		this(out, "/t");
	}
	private void writeObject(Object o) throws IOException 
    { //将目标文件处理成字节数组
		if (o instanceof BytesWritable) 
        {
			BytesWritable to = (BytesWritable) o;
			out.write(o.getBytes(), 0, o.getLength());
		} 
        else 
        {
			out.write(o.toString().getBytes(utf8));
		}
	}
   //将key、value以自定义格式写入到输出流中
	public synchronized void write(K key, V value) throws IOException 
    {
		boolean nullKey = key == null || key instanceof NullWritable;
		boolean nullValue = value == null || value instanceof NullWritable;
		if (nullKey && nullValue) 
        {
			return;
		}
		if (!nullValue) 
        {
			writeObject(value);
		}
	}
	public synchronized void close(TaskAttemptContext context) throws IOException 
    {
		out.close();
	}
}

此处LineRecordWriter是RecordWriter的一个内部类,用于实现输出。主要构建两个方法:writer()和close(),于把<Key,Value>转化为一行文本。这里仅仅是把LineRecordWriterDataOutputStream抽取出来,以自定义形式来定义文件输出时格式,即单个文件为一个Value输出。

5、Test.java

import java.io.IOException;  
import java.net.URI;      
import javax.xml.soap.Text;   
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.*;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.lib.input.*;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;       
public class Test 
    {  
        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException 
        {  
            Configuration conf = new Configuration(); 
            Job job = Job.getInstance(conf, "image sequence");  
            job.setJarByClass(Test.class); 
            FileInputFormat.addInputPath(job, new Path(args[0]));  
            FileOutputFormat.setOutputPath(job, new Path(args[1]));  

            job.setInputFormatClass(SequenceFileInputFormat.class);  
            job.setOutputFormatClass(MyMultipleOutputFormat.class);

            job.setMapperClass(ImageMapper.class);
            job.setMapOutputKeyClass(Text.class);  
            job.setMapOutputValueClass(BytesWritable.class);  

            job.setNumReduceTasks(0);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }  

Test是驱动程序,用来在Hadoop集群上运行MapReduce作业。Job实例用来指定作业执行规范,以便控制整个集群上各任务的分发与处理。为了能在集群上运行这个作业,作业的类必须打包成一个Jar包并发送给集群。系统将通过驱动程序setJarByClass中的设定找到Jar文件。

调用FileInputFormat类的addInputPath()来定义输入数据的路径,其中args[0]指明命令行控制的输入数据路径。调用FileInputFormat类的setOutputPath()来定义输出数据的路径,args[1]指明命令行控制的输出结果路径。该路径在运行程序前不存在于HDFS集群,否则运行结果将会出错。

setInputFormatClass()用来指定mapreduce函数的输入数据格式为SequenceFileInputFormatsetOutputFormatClass()用来指定输出结果格式为MyMultipleOutputFormat.class,即自定义的多文件输出形式。若输入数据类型与实际设定不一致,系统将会报错。

接着,通过setMapOutputKeyClass()设定map函数输入类型为Text,通过setMapOutputValueClass()设定map函数输入值类型为BytesWritable,通过setMapperClass()指定要用的map类型为ImagMappermap函数的输出类型默认情况下和reduce函数是相同的。由于本论文中我们没有调用reduce函数,所以不需要设定。

在设定mapreduce函数类后开始运行作业。Job中的waitForCompletion方法提交作业并等待执行完成。该方法的参数用于指示是否已生成详细输出。waitForCompletion()方法返回一个布尔值,这个布尔值被转换成程序的退出代码01,表示执行的truefalse


猜你喜欢

转载自blog.csdn.net/LinLin_Hou/article/details/80634379