02-HDFS的原理和使用操作、编程

一、NN元数据管理机制

NN的职责:
1.维护元数据信息
2.维护HDFS的目录树
3.响应客户端

①用户向NN申请上传文件
②NN将分配的DN信息记录追加在edit.log的文件中
③NN将分配的DN信息返回给客户端
④客户端将文件切块后,上传到各个DN节点上
⑤客户端将上传成功的信息返回给NN节点,
⑥NN将edit.log文件中的内容写入内存中,一次上传文件的操作完成了
⑦当edit.log文件被写满之后,将会执行checkpoint操作,与fsimage合并,刷进内存的镜像文件里面(fsimge)

二、SN工作机制


①当edit.log文件满了之后,将会通知SN执行CheckPoint操作
②SN要求NN停止往edits.log文件中写入数据
③并为他推送一个新的文件让他往里面写数据
④SN将下载NN里面的fsimage,edit.log,用来执行合并操作
⑤执行合并操作
⑥将合并之后的fsimage上传给NN,并将那个新的文件edits.new.log重新命名为之前的名字edits.log

三、HDFS的java客户端编写

1.所需要jar包
    hadoop-2.7.2.tar\hadoop-2.7.2\share\hadoop\hdfs                    hadoop-hdfs-2.7.2.jar    核心包
     hadoop-2.7.2.tar\hadoop-2.7.2\share\hadoop\hdfs\lib                     *依赖包
    hadoop-2.7.2.tar\hadoop-2.7.2\share\hadoop\common            hadoop-common-2.7.2.jar    核心包
     hadoop-2.7.2.tar\hadoop-2.7.2\share\hadoop\common\lie           *依赖包
2.window下开发注意权限问题
    修改执行时候的环境变量:Run Configurations
    -DHADOOP_USER_NAME=hadoop

    
     
     
  1. package hdfs;
  2. import java.io.FileInputStream;
  3. import java.io.FileOutputStream;
  4. import java.io.IOException;
  5. import java.net.URI;
  6. import java.net.URISyntaxException;
  7. import org.apache.commons.io.IOUtils;
  8. import org.apache.hadoop.conf.Configuration;
  9. import org.apache.hadoop.fs.FSDataInputStream;
  10. import org.apache.hadoop.fs.FSDataOutputStream;
  11. import org.apache.hadoop.fs.FileStatus;
  12. import org.apache.hadoop.fs.FileSystem;
  13. import org.apache.hadoop.fs.LocatedFileStatus;
  14. import org.apache.hadoop.fs.Path;
  15. import org.apache.hadoop.fs.RemoteIterator;
  16. import org.junit.Ignore;
  17. import org.junit.Test;
  18. public class hdfsapiclient {
  19. /**
  20. * 上传文件
  21. * @throws Exception
  22. */
  23. @Ignore
  24. public void upload() throws Exception{
  25. Configuration conf = new Configuration();
  26. conf.set("fs.defaultFS", "hdfs://192.168.16.100:9000");
  27. FileSystem fs = FileSystem.get(conf);
  28. //-------------------
  29. Path f = new Path("hdfs://192.168.16.100:9000/aa/wangqingchun.txt");
  30. FSDataOutputStream out = fs.create(f);
  31. FileInputStream in = new FileInputStream("F:/wangqingchun.txt");
  32. IOUtils.copy(in, out);
  33. }
  34. @Test
  35. public void upload2() throws Exception{
  36. Configuration conf = new Configuration();
  37. conf.set("fs.defaultFS", "hdfs://192.168.16.100:9000");
  38. FileSystem fs = FileSystem.get(new URI("hdfs://192.168.16.100:9000"),conf,"hadoop");
  39. //--------------
  40. fs.copyFromLocalFile(new Path("F:/wangqingchun.txt"), new Path("hdfs://192.168.16.100:9000/aaa/bbb/ccc/wangqingchun4.txt"));
  41. }
  42. @Test
  43. public void download() throws Exception{
  44. Configuration conf = new Configuration();
  45. conf.set("fs.defaultFS", "hdfs://192.168.16.100:9000");
  46. FileSystem fs = FileSystem.get(new URI("hdfs://192.168.16.100:9000"),conf,"hadoop");
  47. //-------
  48. //fs.copyFromLocalFile(new Path("F:/wangqingchun.txt"), new Path("hdfs://192.168.16.100:9000/aa/wangqingchun3.txt"));
  49. //fs.copyToLocalFile(new Path("hdfs://192.168.16.100:9000/aa/wangqingchun3.txt"), new Path("F:/wang.txt"));
  50. fs.copyToLocalFile(new Path("hdfs://192.168.16.100:9000/aa/wangqingchun3.txt"), new Path("F:/wang.txt"));
  51. // FSDataInputStream in = fs.open(new Path("hdfs://192.168.16.100:9000/aa/wangqingchun3.txt"));
  52. // FileOutputStream out = new FileOutputStream("F:/down.txt");
  53. // IOUtils.copy(in, out);
  54. }
  55. @Test
  56. public void mkdir() throws Exception{
  57. Configuration conf = new Configuration();
  58. conf.set("fs.defaultFS", "hdfs://192.168.1.100:9000");
  59. FileSystem fs = FileSystem.get(new URI("hdfs://192.168.16.100:9000"),conf,"hadoop");
  60. fs.mkdirs(new Path("/aaa/bbb/ccc"));
  61. }
  62. @Test
  63. public void rm() throws Exception, InterruptedException, URISyntaxException{
  64. Configuration conf = new Configuration();
  65. conf.set("ds.defaultFS", "hdfs://192.168.16.100:9000");
  66. FileSystem fs = FileSystem.get(new URI("hdfs://192.168.16.100:9000"),conf,"hadoop");
  67. fs.delete(new Path("hdfs://192.168.16.100:9000/aaa"), true);
  68. }
  69. @Test
  70. public void listFile() throws Exception, InterruptedException, URISyntaxException{
  71. Configuration conf = new Configuration();
  72. conf.set("fs.defaultFS", "hdfs://192.168.16.100:9000");
  73. FileSystem fs = FileSystem.get(new URI("hdfs://192.168.16.100:9000"),conf,"hadoop");
  74. RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
  75. while(listFiles.hasNext()){
  76. LocatedFileStatus file = listFiles.next();
  77. String name = file.getPath().getName();
  78. System.out.println(name);
  79. }
  80. }
  81. @Test
  82. public void listfiles() throws Exception{
  83. Configuration conf = new Configuration();
  84. conf.set("fs.defaultFS", "hdfs://192.168.16.100:9000");
  85. FileSystem fs = FileSystem.get(new URI("hdfs://192.168.16.100:9000"),conf,"hadoop");
  86. FileStatus[] listStatus = fs.listStatus(new Path("/"));
  87. for(FileStatus filestatuts : listStatus){
  88. System.out.println(filestatuts.isDirectory()?" is dir: "+filestatuts.getPath().getName():" is file: "+filestatuts.getPath());
  89. }
  90. }
  91. }

四、Hadoop中的RPC框架实现机制

       RPC(Remote Procedure Call Protocol)—— 远程过程调用协议,它是一种通过 网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。 RPC协议假定某些 传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI 网络通信模型中,RPC跨越了 传输层应用层。RPC使得开发包括网络 分布式多程序在内的应用程序更加容易。
        RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复 信息 ,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行

PRC的服务端demo


1.业务接口
   
    
    
  1. package com.hdfs.rpc.service;
  2. /**
  3. * RPC服务端的service接口
  4. * @author Administrator
  5. *
  6. */
  7. public interface LoginServiceInterface {
  8.    //定义协议的版本号
  9.    public  static final long versionID=1L;
  10. /**
  11. * 简单的登陆服务,返回登陆成功
  12. * @param username
  13. * @param password
  14. * @return
  15. */
  16. public String login(String username,String password);
  17. }
2.业务接口的实现
    
     
     
  1. package com.hdfs.rpc.service;
  2. /**
  3. * RPC服务端service接口的实现
  4. * @author Administrator
  5. *
  6. */
  7. public class LoginServiceImpl implements LoginServiceInterface {
  8. /**
  9. * 简单的登陆服务,返回登陆成功
  10. */
  11. @Override
  12. public String login(String username, String password) {
  13. return username+"logged in successfully!";
  14. }
  15. }
3.发布服务端,运行main方法进程阻塞等待客户端请求
     
      
      
  1. package com.hdfs.rpc;
  2. import java.io.IOException;
  3. import org.apache.hadoop.HadoopIllegalArgumentException;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.ipc.RPC;
  6. import org.apache.hadoop.ipc.RPC.Server;
  7. import com.hdfs.rpc.service.LoginServiceImpl;
  8. import com.hdfs.rpc.service.LoginServiceInterface;
  9. /**
  10. * RPC的服务端,应用RPC框架把service发布为一个服务
  11. * @author Administrator
  12. *
  13. */
  14. public class Starter {
  15. public static void main(String[] args) throws HadoopIllegalArgumentException, IOException {
  16. //1.通过RPC的内部类得到builder
  17. RPC.Builder builder = new RPC.Builder(new Configuration());
  18. //绑定的IP
  19. builder.setBindAddress("hadoop100");
  20. //服务端口,自己指定端口
  21. builder.setPort(10000);
  22. //使用的协议,双方调用的那个服务接口
  23. builder.setProtocol(LoginServiceInterface.class);
  24. //协议的实现实列
  25. builder.setInstance(new LoginServiceImpl());
  26. //2.通过builder得到server
  27. Server server= builder.build();
  28. //3.发布server服务
  29. server.start();
  30. }
  31. }

RPC客户端demo

 

1.复制服务端的协议接口
      
       
       
  1. package com.hdfs.rpc.client;
  2. /**
  3. * RPC服务端的service接口
  4. * @author Administrator
  5. *
  6. */
  7. public interface LoginServiceInterface {
  8. //定义协议的版本号
  9. public static final long versionID=1L;
  10. /**
  11. * 简单的登陆服务,返回登陆成功
  12. * @param username
  13. * @param password
  14. * @return
  15. */
  16. public String login(String username,String password);
  17. }
2.客户端请求
       
        
        
  1. package com.hdfs.rpc.client;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.ipc.RPC;
  6. /**
  7. * RPC的客户端
  8. * @author Administrator
  9. *
  10. */
  11. public class LoginClient {
  12. /**
  13. * 通过动态代理实现远程服务
  14. * @param args
  15. * @throws Exception
  16. */
  17. public static void main(String[] args) throws Exception {
  18. //得到带代理对象
  19. LoginServiceInterface proxy = RPC.getProxy(LoginServiceInterface.class, 1L, new InetSocketAddress("192.168.0.101", 10000), new Configuration());
  20. String result = proxy.login("usernamestring", "123456");
  21. System.out.println(result);
  22. }
  23. }

猜你喜欢

转载自blog.csdn.net/wang11yangyang/article/details/58016552