Basic SequenceFile Operations


0. Overview

  Test sequence-file read/write operations && test the available compression types && test converting a log file into a sequence file

  This post supplements the "Basic SequenceFile Operations" section of the Hadoop Sequence File article
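
  For reference, a SequenceFile supports three compression types: NONE stores every record uncompressed, RECORD compresses each record's value individually, and BLOCK buffers multiple records and compresses their keys and values together, which usually achieves the best compression ratio.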


1. Testing read/write && compression

package hadoop.SequenceFile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.junit.Test;

import java.io.IOException;

/**
 * Tests for sequence-file read/write and compression
 */
public class TestSeqFile {

    /**
     * Test writing a sequence file
     */
    @Test
    public void testWriteSeq() throws Exception {

        Configuration conf = new Configuration();

        // Use the local file system
        conf.set("fs.defaultFS", "file:///");

        FileSystem fs = FileSystem.get(conf);

//        Path path = new Path("E:/test/none.seq");
//        Path path = new Path("E:/test/record.seq");
        Path path = new Path("E:/test/block.seq");
        // No compression
//        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class, SequenceFile.CompressionType.NONE);
        // Record compression
//        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class, SequenceFile.CompressionType.RECORD);
        // Block compression
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class, SequenceFile.CompressionType.BLOCK);


        for (int i = 1; i <= 1000; i++) {
            IntWritable key = new IntWritable(i);
            Text value = new Text("helloworld" + i);

            writer.append(key, value);

        }

        writer.close();
    }

    /**
     * Test reading a sequence file
     */
    @Test
    public void testReadSeq() throws Exception {
        Configuration conf = new Configuration();

        // Use the local file system
        conf.set("fs.defaultFS", "file:///");

        FileSystem fs = FileSystem.get(conf);

        Path path = new Path("E:/test/block.seq");

        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);

        // Initialize two reusable Writable objects to receive each key/value pair
        IntWritable key = new IntWritable();
        Text value = new Text();

        while (reader.next(key, value)) {
            // current read position in the underlying file
            long position = reader.getPosition();
            System.out.println("key: " + key.get() + ", val: " + value + ", pos: " + position);
        }

        reader.close();
    }

}
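
As a follow-up to the read test, the positions it prints can drive random access. Below is a minimal sketch (SeekDemo is a hypothetical class name) against the uncompressed none.seq variant, where getPosition() yields per-record offsets: seek() jumps back to an exact offset previously returned by getPosition(), while sync() accepts an arbitrary offset and skips ahead to the next sync point.

package hadoop.SequenceFile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SeekDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);

        SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path("E:/test/none.seq"), conf);
        IntWritable key = new IntWritable();
        Text value = new Text();

        reader.next(key, value);            // read record #1 (key = 1)
        long second = reader.getPosition(); // offset where record #2 starts

        reader.next(key, value);            // read record #2
        reader.next(key, value);            // read record #3

        // seek() repositions the reader at an exact record boundary
        reader.seek(second);
        reader.next(key, value);
        System.out.println("after seek: " + key.get()); // prints 2

        // sync() skips ahead to the first sync point after the given offset
        reader.sync(second);
        System.out.println("position after sync: " + reader.getPosition());

        reader.close();
    }
}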

2. Converting a log file into a sequence file

package hadoop.SequenceFile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

/**
 * Converts a log file into a sequence file.
 * To inspect the compressed SequenceFile on Windows:
 * hdfs dfs -text file:///E:/test/access.seq
 */
public class Log2Seq {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Use the local file system
        conf.set("fs.defaultFS", "file:///");

        FileSystem fs = FileSystem.get(conf);

        Path path = new Path("E:/test/access.seq");

        // No compression
//        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, NullWritable.class, Text.class, SequenceFile.CompressionType.NONE);
        // Record compression
//        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, NullWritable.class, Text.class, SequenceFile.CompressionType.RECORD);
        // Block compression
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, NullWritable.class, Text.class, SequenceFile.CompressionType.BLOCK);

        BufferedReader br = new BufferedReader(new FileReader("E:/file/access.log1"));

        String line = null;
        while ((line = br.readLine()) != null) {
            NullWritable key = NullWritable.get();
            Text value = new Text(line);
            writer.append(key, value);
        }

        br.close();
        writer.close();
    }
}
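
Note that the createWriter(fs, conf, path, ...) overload used above is deprecated in Hadoop 2.x in favor of an Option-based factory. Below is a minimal sketch of the same block-compressed writer with that newer API (Log2SeqOptions is a hypothetical class name; paths and types match the example above):

package hadoop.SequenceFile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class Log2SeqOptions {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");

        // Option-based equivalent of createWriter(fs, conf, path, NullWritable.class, Text.class, CompressionType.BLOCK)
        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(new Path("E:/test/access.seq")),
                SequenceFile.Writer.keyClass(NullWritable.class),
                SequenceFile.Writer.valueClass(Text.class),
                SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK));

        writer.append(NullWritable.get(), new Text("sample line"));
        writer.close();
    }
}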

Reposted from www.cnblogs.com/share23/p/9887854.html