一、NN元数据管理机制
NN的职责:
1.维护元数据信息
2.维护HDFS的目录树
3.响应客户端
①用户向NN申请上传文件
②NN将分配的DN信息记录追加在edit.log的文件中
③NN将分配的DN信息返回给客户端
④客户端将文件切块后,上传到各个DN节点上
⑤客户端将上传成功的信息返回给NN节点,
⑥NN将edit.log文件中的内容写入内存中,一次上传文件的操作完成了
⑦当edit.log文件被写满之后,将会执行checkpoint操作,与fsimage合并,刷进内存的镜像文件里面(fsimge)
③NN将分配的DN信息返回给客户端
④客户端将文件切块后,上传到各个DN节点上
⑤客户端将上传成功的信息返回给NN节点,
⑥NN将edit.log文件中的内容写入内存中,一次上传文件的操作完成了
⑦当edit.log文件被写满之后,将会执行checkpoint操作,与fsimage合并,刷进内存的镜像文件里面(fsimge)
二、SN工作机制
①当edit.log文件满了之后,将会通知SN执行CheckPoint操作
②SN要求NN停止往edits.log文件中写入数据
③并为他推送一个新的文件让他往里面写数据
④SN将下载NN里面的fsimage,edit.log,用来执行合并操作
⑤执行合并操作
⑥将合并之后的fsimage上传给NN,并将那个新的文件edits.new.log重新命名为之前的名字edits.log
三、HDFS的java客户端编写
1.所需要jar包
hadoop-2.7.2.tar\hadoop-2.7.2\share\hadoop\hdfs hadoop-hdfs-2.7.2.jar 核心包
hadoop-2.7.2.tar\hadoop-2.7.2\share\hadoop\hdfs\lib
*依赖包
hadoop-2.7.2.tar\hadoop-2.7.2\share\hadoop\common hadoop-common-2.7.2.jar 核心包
hadoop-2.7.2.tar\hadoop-2.7.2\share\hadoop\common\lie
*依赖包
2.window下开发注意权限问题
修改执行时候的环境变量:Run Configurations
-DHADOOP_USER_NAME=hadoop
package hdfs;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.Ignore;
import org.junit.Test;
public class hdfsapiclient {
/**
* 上传文件
* @throws Exception
*/
@Ignore
public void upload() throws Exception{
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.16.100:9000");
FileSystem fs = FileSystem.get(conf);
//-------------------
Path f = new Path("hdfs://192.168.16.100:9000/aa/wangqingchun.txt");
FSDataOutputStream out = fs.create(f);
FileInputStream in = new FileInputStream("F:/wangqingchun.txt");
IOUtils.copy(in, out);
}
@Test
public void upload2() throws Exception{
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.16.100:9000");
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.16.100:9000"),conf,"hadoop");
//--------------
fs.copyFromLocalFile(new Path("F:/wangqingchun.txt"), new Path("hdfs://192.168.16.100:9000/aaa/bbb/ccc/wangqingchun4.txt"));
}
@Test
public void download() throws Exception{
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.16.100:9000");
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.16.100:9000"),conf,"hadoop");
//-------
//fs.copyFromLocalFile(new Path("F:/wangqingchun.txt"), new Path("hdfs://192.168.16.100:9000/aa/wangqingchun3.txt"));
//fs.copyToLocalFile(new Path("hdfs://192.168.16.100:9000/aa/wangqingchun3.txt"), new Path("F:/wang.txt"));
fs.copyToLocalFile(new Path("hdfs://192.168.16.100:9000/aa/wangqingchun3.txt"), new Path("F:/wang.txt"));
// FSDataInputStream in = fs.open(new Path("hdfs://192.168.16.100:9000/aa/wangqingchun3.txt"));
// FileOutputStream out = new FileOutputStream("F:/down.txt");
// IOUtils.copy(in, out);
}
@Test
public void mkdir() throws Exception{
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.1.100:9000");
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.16.100:9000"),conf,"hadoop");
fs.mkdirs(new Path("/aaa/bbb/ccc"));
}
@Test
public void rm() throws Exception, InterruptedException, URISyntaxException{
Configuration conf = new Configuration();
conf.set("ds.defaultFS", "hdfs://192.168.16.100:9000");
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.16.100:9000"),conf,"hadoop");
fs.delete(new Path("hdfs://192.168.16.100:9000/aaa"), true);
}
@Test
public void listFile() throws Exception, InterruptedException, URISyntaxException{
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.16.100:9000");
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.16.100:9000"),conf,"hadoop");
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while(listFiles.hasNext()){
LocatedFileStatus file = listFiles.next();
String name = file.getPath().getName();
System.out.println(name);
}
}
@Test
public void listfiles() throws Exception{
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.16.100:9000");
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.16.100:9000"),conf,"hadoop");
FileStatus[] listStatus = fs.listStatus(new Path("/"));
for(FileStatus filestatuts : listStatus){
System.out.println(filestatuts.isDirectory()?" is dir: "+filestatuts.getPath().getName():" is file: "+filestatuts.getPath());
}
}
}
四、Hadoop中的RPC框架实现机制
RPC(Remote Procedure Call Protocol)——
远程过程调用协议,它是一种通过
网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。
RPC协议假定某些
传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI
网络通信模型中,RPC跨越了
传输层和
应用层。RPC使得开发包括网络
分布式多程序在内的应用程序更加容易。
RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复
信息
,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行
PRC的服务端demo
1.业务接口
package com.hdfs.rpc.service;
/**
* RPC服务端的service接口
* @author Administrator
*
*/
public interface LoginServiceInterface {
//定义协议的版本号
public static final long versionID=1L;
/**
* 简单的登陆服务,返回登陆成功
* @param username
* @param password
* @return
*/
public String login(String username,String password);
}
2.业务接口的实现
package com.hdfs.rpc.service;
/**
* RPC服务端service接口的实现
* @author Administrator
*
*/
public class LoginServiceImpl implements LoginServiceInterface {
/**
* 简单的登陆服务,返回登陆成功
*/
@Override
public String login(String username, String password) {
return username+"logged in successfully!";
}
}
3.发布服务端,运行main方法进程阻塞等待客户端请求
package com.hdfs.rpc;
import java.io.IOException;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import com.hdfs.rpc.service.LoginServiceImpl;
import com.hdfs.rpc.service.LoginServiceInterface;
/**
* RPC的服务端,应用RPC框架把service发布为一个服务
* @author Administrator
*
*/
public class Starter {
public static void main(String[] args) throws HadoopIllegalArgumentException, IOException {
//1.通过RPC的内部类得到builder
RPC.Builder builder = new RPC.Builder(new Configuration());
//绑定的IP
builder.setBindAddress("hadoop100");
//服务端口,自己指定端口
builder.setPort(10000);
//使用的协议,双方调用的那个服务接口
builder.setProtocol(LoginServiceInterface.class);
//协议的实现实列
builder.setInstance(new LoginServiceImpl());
//2.通过builder得到server
Server server= builder.build();
//3.发布server服务
server.start();
}
}
RPC客户端demo
1.复制服务端的协议接口
package com.hdfs.rpc.client;
/**
* RPC服务端的service接口
* @author Administrator
*
*/
public interface LoginServiceInterface {
//定义协议的版本号
public static final long versionID=1L;
/**
* 简单的登陆服务,返回登陆成功
* @param username
* @param password
* @return
*/
public String login(String username,String password);
}
2.客户端请求
package com.hdfs.rpc.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
/**
* RPC的客户端
* @author Administrator
*
*/
public class LoginClient {
/**
* 通过动态代理实现远程服务
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
//得到带代理对象
LoginServiceInterface proxy = RPC.getProxy(LoginServiceInterface.class, 1L, new InetSocketAddress("192.168.0.101", 10000), new Configuration());
String result = proxy.login("usernamestring", "123456");
System.out.println(result);
}
}