从0到1编写分布式文件系统——Master模块(一)
首先,通过定义一种业务请求来构建及完善核心代码,那应该定义哪种业务请求比较合适呢?既然是文件系统,那就有文件夹的管理功能,那该请求为:客户端发送创建文件夹的消息到master节点,master节点接收该消息并成功创建文件夹。
要处理该请求,需要有以下几部分的功能:
1、消息结构及处理机制。必须定义一种通用的通信协议以及处理机制,能处理客户端发送过来不同的消息请求;
2、网络通讯模块。master必须接收不同客户端发送过来的请求;
3、元数据结构及处理机制。必须定义一种目录树结构来存储客户端发送过来文件夹处理请求。
通信协议
protocol content | size | description |
---|---|---|
magic | 1 byte | 每个数据包的头个字节都定义了0xDF魔数,用作检测 |
type | 1 byte | 每种request和reponse都定义了唯一一种类型 |
id | 8 bytes | 为每个request定义了唯一的请求ID |
serialize | 1 byte | 每个数据包指定了序列化的算法 |
length | 4 bytes | 数据包的大小 |
body | X bytes | 数据包的内容 |
通信协议的内容如表格所示(具体可以查看类Packet),无论是Request还是Response都是发送packet数据包,Request或Response的具体内容为协议的body内容。但是这里如何识别是Request请求还是Response回复呢,主要是通过type字段去识别。type字段是一个byte,通过字节的第一位来区分,0是request请求,1是response回复,以mkdir request(创建文件夹)为例,mkdir reponse则是将mkdir request的第一位变为1。
序列化
通信协议中有个serialize字段,表示客户端可以选择不同的序列化算法对数据包进行序列化和反序列化。于是,接下来开始编写不同的序列化算法,SDFS暂时只用了Kryo算法。 定义一个通用的序列化接口
package org.simpledfs.core.serialize;
public interface Serializer {
byte[] serialize(Object object);
<T> T deserialize(byte[] bytes, Class<T> clazz);
}
复制代码
使用Kryo框架,因为KryoSerializer内部使用池化对象处理,所以采用了单例模式构建了KryoSerializer对象,其构造函数为私有。
public class KryoSerializer implements Serializer {
private KryoSerializer() {
}
...
private static class PoolHolder {
private static Pool<Kryo> kryoPool = new Pool<Kryo>(true, false, 8) {
@Override
protected Kryo create() {
Kryo kryo = new Kryo();
kryo.setRegistrationRequired(false);
kryo.setReferences(true);
return kryo;
}
};
}
...
}
复制代码
定义一个序列化算法选择器SerializerChooser,根据协议中serialize字节,来获取不同的序列化算法对数据包进行解析,但目前只实现了Kryo,所以在choose方法默认只返回KryoSerializer实例。
public class SerializerChooser {
private SerializerChooser() {
}
public Serializer choose(byte type){
return KryoSerializer.getInstance();
}
private static class SerializerChooserHolder {
private static SerializerChooser chooser = new SerializerChooser();
}
public static SerializerChooser getInstance() {
return SerializerChooserHolder.chooser;
}
}
复制代码
处理机制
因为客户端会发送各种类型的请求,如何针对不同的请求进行处理呢? 首先定义顶层的processor接口,该接口只有一个process方法,并且集成了线程接口Runable,以后每个processor能包装成一个线程在线程池中运行
public interface Processor extends Runnable {
public void process();
}
复制代码
之后定义一个通用的抽象类AbstractRequestProcessor,该类implements Processor,并声明了几个变量,这几个变量在后续做说明。Processor接口的process方法在AbstractRequestProcessor中声明为抽象方法,由具体请求的proceeor来编写自己的处理方法。
public abstract class AbstractRequestProcessor implements Processor {
protected ChannelHandlerContext ctx;
protected Request request;
protected Context context;
protected long packetId;
...
public abstract void process();
@Override
public void run() {
process();
}
...
}
复制代码
最后,为mkdirRequest 定义了具体的MkdirRequestProcessor来处理创建文件夹的请求,MkdirRequestProcessor具体处理的业务逻辑后续做说明,还涉及到master元数据的修改。那MkdirRequestProcessor通过什么方式构造出来的呢?是通过所有request中继承的buildSelfProcessor抽象方法,各自的request构造各自processor,通过这种灵活的方式对代码进行解耦,添加每一个request的同时只要添加对应的processor,不需要对其他代码进行修改。
public class MkdirRequest extends AbstractRequest {
...
@Override
public Processor buildSelfProcessor(ChannelHandlerContext ctx, Request request, Context context, Long packetId){
MkdirRequestProcessor processor = new MkdirRequestProcessor(ctx, request, context, packetId);
return processor;
}
...
}
复制代码
上篇文章从0到1编写分布式文件系统——框架搭建
详细的代码:Simple Distributed File System