0. 说明
测试序列文件的读写操作 && 测试序列文件的压缩方式 && 测试将日志文件转换成序列文件
作为 Hadoop 序列文件 中的 SequenceFile 的基本操作 部分的补充存在
1. 测试读写 && 压缩
package hadoop.SequenceFile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.junit.Test;

import java.io.IOException;

/**
 * Tests basic SequenceFile read/write operations and the three compression
 * modes (NONE / RECORD / BLOCK) against the local file system.
 */
public class TestSeqFile {

    /**
     * Writes 1000 (IntWritable, Text) key/value pairs to a block-compressed
     * sequence file at E:/test/block.seq.
     *
     * @throws Exception if the file system or writer cannot be opened
     */
    @Test
    public void testWriteSeq() throws Exception {
        Configuration conf = new Configuration();
        // Force the local file system instead of HDFS.
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        // Path path = new Path("E:/test/none.seq");
        // Path path = new Path("E:/test/record.seq");
        Path path = new Path("E:/test/block.seq");
        // No compression:
        // SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class, SequenceFile.CompressionType.NONE);
        // Record compression:
        // SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class, SequenceFile.CompressionType.RECORD);
        // Block compression:
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
                IntWritable.class, Text.class, SequenceFile.CompressionType.BLOCK);
        try {
            for (int i = 1; i <= 1000; i++) {
                IntWritable key = new IntWritable(i);
                Text value = new Text("helloworld" + i);
                writer.append(key, value);
            }
        } finally {
            // FIX: close in finally so the writer is released even if append throws.
            writer.close();
        }
    }

    /**
     * Reads every (IntWritable, Text) pair back from E:/test/block.seq and
     * prints each key, value, and the reader's byte position after the record.
     *
     * @throws Exception if the file cannot be opened or read
     */
    @Test
    public void testReadSeq() throws Exception {
        Configuration conf = new Configuration();
        // Force the local file system instead of HDFS.
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("E:/test/block.seq");
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        try {
            // Reusable Writable holders; reader.next() fills them in place.
            IntWritable key = new IntWritable();
            Text value = new Text();
            while (reader.next(key, value)) {
                long position = reader.getPosition();
                System.out.println("key: " + key.get() + " , " + " val: " + value.toString() + " , " + " pos: " + position);
            }
        } finally {
            // FIX: the original never closed the reader (resource leak).
            reader.close();
        }
    }
}
2. 测试将日志文件转换成序列文件
package hadoop.SequenceFile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

/**
 * Converts a plain-text log file into a block-compressed SequenceFile,
 * one line per record with a NullWritable key.
 *
 * Inspect the result on Windows with:
 *   hdfs dfs -text file:///E:/test/access.seq
 */
public class Log2Seq {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Force the local file system instead of HDFS.
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("E:/test/access.seq");
        // No compression:
        // SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, NullWritable.class, Text.class, SequenceFile.CompressionType.NONE);
        // Record compression:
        // SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, NullWritable.class, Text.class, SequenceFile.CompressionType.RECORD);
        // Block compression:
        // FIX: try-with-resources closes both the writer and the reader even on
        // error; the original leaked the BufferedReader and could leave the
        // writer open if a read/append threw.
        // NOTE(review): FileReader uses the platform default charset — confirm
        // the log file's encoding matches before relying on the output.
        try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
                     NullWritable.class, Text.class, SequenceFile.CompressionType.BLOCK);
             BufferedReader br = new BufferedReader(new FileReader("E:/file/access.log1"))) {
            String line;
            while ((line = br.readLine()) != null) {
                // NullWritable is a singleton placeholder key; only the line matters.
                NullWritable key = NullWritable.get();
                Text value = new Text(line);
                writer.append(key, value);
            }
        }
    }
}