前言:
老实说,我觉得在两个无法通信的集群中迁移数据是一件非常蠢的事情,但是既然上面这么要求了,也只能这么做了~
正文:
为什么要强调不通信集群,因为可以互通的集群之间要进行数据转移有很多工具,只要一行命令就可以了,比如distcp。不过虽然是不通信集群,但至少也要有一台机器可以连通这两个集群作为中转站,不然凭空将数据转移到另一个集群臣妾我实在是无能为力啊。
其实这种方式也很简单,一般的做法就是将HDFS上的数据get到指定的节点上,然后通过再把数据从节点上拉倒自己的机器上,再上传到目标集群的一个节点上,然后再上传到HDFS上。很好理解,但是效率实在是太低。
这里的方法是通过Java API连接到HDFS上,直接进行文件的传输。相比上面的方法,这种方式下只要指定好目录,剩下的一切都将由程序完成。
步骤:
1. 首先,建立一个类(废话),指定好目录(如果想打成jar包就设置从参数中获取目录)和IP(如果是以“hdfs://XXX.XXX.XXX.XXX:8020/data/XX”的格式指定目录的,可以在程序中手动分割出IP)。
2. 获取源集群和目标集群的IP后,分别创建对应的HDFS连接(这里将获取连接的方法写成了一个方法):
3. 分别获取到HDFS连接后,首先去读取指定目录下的文件列表:
4. 获取到文件列表之后,就可以开始传输了。在这里有两种方式,一种是将文件数据从源集群HDFS中读取出来后,先存储到本地,然后再上传到目标集群,这一种方式下不仅会消耗带宽,还会进行大量的磁盘读写,让电脑运行变卡;第二就是不把数据落地到本地,而是直接传输到目标集群中。之前我没有考虑到这个问题,选了第一种方式,结果程序跑起来整个电脑都卡的无法操作了!
1)获取到需要传输的文件列表后,先获取这些文件的输入输出流(目标文件的路径可以自己指定,这里的filePath是原目录下的文件路径,我直接把原地址替换成了目标地址):
获取文件输入流的方法:
获取文件输出流的方法:
2)获取到文件输入输出流后,就可以开始遍历传输了。
3. 传输完成,关闭连接。
这样,跨集群文件的传输就完成了。
下面是完整的代码:
1 package controller;
2
3 import org.apache.hadoop.conf.Configuration;
4 import org.apache.hadoop.fs.FileSystem;
5 import org.apache.hadoop.fs.LocatedFileStatus;
6 import org.apache.hadoop.fs.Path;
7 import org.apache.hadoop.fs.RemoteIterator;
8 import org.apache.hadoop.io.IOUtils;
9 import java.io.*;
10 import java.net.URI;
11 import java.util.ArrayList;
12 import java.util.List;
13
14 import static org.apache.hadoop.fs.FileSystem.get;
15
16 public class Demo {
17 // 连接参数设置
18 private static Configuration conf = new Configuration();
19 static{
20 conf.set("dfs.client.use.datanode.hostname", "true");
21 conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
22 conf.set("dfs.permissions","false");
23 }
24 public static void main(String[] args) {
25 // 获取参数
26 String addr1="hdfs://XX.XX.XX.XX:8020/tmp/dataA";
27 String addr2="hdfs://YY.YY.YY.YY:8020/tmp/dataB";
28 String tableName="OfflineUserProfileV1";
29 String IP1=addr1.split(":8020")[0].replaceAll("//", "/").split("/")[1];
30 String IP2=addr2.split(":8020")[0].replaceAll("//", "/").split("/")[1];
31
32 // 获取源集群HDFS连接
33 FileSystem originFS=null;
34 try {
35 originFS=getFSConnection(IP1);
36 } catch (Exception e) {
37 System.err.println("获取源集群HDFS连接失败,程序终止运行");
38 e.printStackTrace();
39 }
40 // 获取目标集群HDFS连接
41 FileSystem targetFS=null;
42 try {
43 targetFS=getFSConnection(IP2);
44 } catch (Exception e) {
45 System.err.println("获取目标集群HDFS连接失败,程序终止运行");
46 e.printStackTrace();
47 closeFS(originFS);
48 }
49 // 获取连接后,读取源地址文件列表
50 List<String> fileList=null;
51 try {
52 fileList=getFileList(originFS, addr1);
53 } catch (IOException e) {
54 System.err.println("获取文件列表失败,程序终止");
55 e.printStackTrace();
56 closeFS(originFS);
57 closeFS(targetFS);
58 }
59 // 文件列表读取完成,遍历文件列表,依次传输文件
60 int len=fileList.size();
61 for (String filePath:fileList) {
62 // 获取文件输出输入流
63 String targetFilePath=filePath.replace(addr1, addr2);
64 InputStream in =null;
65 OutputStream out=null;
66 try{
67 in=getInputStream(originFS, filePath);
68 out=getOutputStream(targetFS, targetFilePath);
69 System.out.print("正在传输文件:"+targetFilePath+" …… ……");
70 IOUtils.copyBytes(in,out,conf,true);
71 System.out.println(" 完成");
72 }catch(IOException e) {
73 System.err.println("文件 "+filePath+" 传输失败,将跳过该文件的传输");
74 e.printStackTrace();
75 continue;
76 }
77 }
78
79 // 关闭HDFS连接
80 closeFS(originFS);
81 closeFS(targetFS);
82
83 }
84
85 /**
86 * 获取指定集群HDFS连接
87 * @param IP 指定集群IP
88 * @return
89 * @throws Exception
90 */
91 public static FileSystem getFSConnection(String IP) throws Exception {
92 FileSystem fs = null;
93 String cluster = "hdfs://"+IP+":8020";
94 fs = get(new URI(cluster), conf, "hdfs");
95 return fs;
96 }
97
98 /**
99 * 安全关闭HDFS文件系统连接
100 * @param fs HDFS连接
101 */
102 public static void closeFS(FileSystem fs){
103 try{
104 if(fs!=null)
105 fs.close();
106 }catch(IOException e){
107 try {
108 fs.close();
109 } catch (IOException ex) {
110 fs=null;
111 }
112 }
113 }
114
115
116 /**
117 * 读取指定目录下的文件列表
118 * @param fs HDFS连接
119 * @param path 文件夹路径
120 * @return 文件路径列表
121 * @throws IOException
122 */
123 public static List<String> getFileList( FileSystem fs , String path ) throws IOException {
124 List<String> list = new ArrayList<>();
125 // 获取文件列表
126 RemoteIterator<LocatedFileStatus> it = null;
127 it = fs.listFiles(new Path(path), true);
128 while (it.hasNext()) {
129 // 获取文件路径并存入集合
130 LocatedFileStatus file = it.next();
131 list.add(file.getPath().toString());
132 }
133 return list;
134 }
135
136 /**
137 * 获取文件输入流
138 * @param fs
139 * @param filePath
140 * @return
141 * @throws IOException
142 */
143 public static InputStream getInputStream( FileSystem fs , String filePath ) throws IOException {
144 InputStream in = null;
145 in = fs.open(new Path(filePath));
146 return in;
147 }
148
149 /**
150 * 获取文件输出流
151 * @param fs
152 * @param filePath
153 * @return
154 * @throws IOException
155 */
156 public static OutputStream getOutputStream( FileSystem fs , String filePath ) throws IOException {
157 OutputStream out = fs.create(new Path(filePath),true);
158 return out;
159 }
160
162 }