import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
import java.io.IOException;
import java.io.Serializable;
/**
 * Uses Spark to export an HBase table to a CSV file at a given path.
 * Along the way it reads the contents of the files Spark writes into a
 * directory, deletes that directory, and then saves a single output file.
 */
public class SparkExToHBase implements Serializable {

    // Counts processed rows so that only the very first one emits a header
    // row. Note: a static counter like this is only deterministic in local
    // mode; on a real cluster every executor JVM gets its own copy.
    private static int flag;

    public static void main(String[] args) throws IOException {
        SparkConf conf = new SparkConf().setAppName("spark-i").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        String table = "member";
        // These values could of course be passed in as arguments; they are
        // hard-coded here only because this is an experiment. In production
        // they should definitely come in as parameters.
        sc.hadoopConfiguration().set("hbase.zookeeper.quorum", "master:2181,slave1:2181,slave2:2181");
        sc.hadoopConfiguration().set("hbase.rootdir", "hdfs://master:9000/hbase");
        sc.hadoopConfiguration().set(TableInputFormat.INPUT_TABLE, table);
        JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(
                sc.hadoopConfiguration(), TableInputFormat.class,
                ImmutableBytesWritable.class, Result.class);
        JavaRDD<String> resultRDD = hBaseRDD.map(new Function<Tuple2<ImmutableBytesWritable, Result>, String>() {
            @Override
            public String call(Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
                String key = new String(tuple._1.get());
                Cell[] cells = tuple._2.rawCells();
                String keyValue = "";
                String val = "";
                for (Cell cell : cells) {
                    // The Cell interface has no getValue()/getQualifier()/getFamily();
                    // the CellUtil.clone* helpers are the supported way to read a cell.
                    String va = new String(CellUtil.cloneValue(cell));
                    String qualifier = new String(CellUtil.cloneQualifier(cell));
                    String columnFamily = new String(CellUtil.cloneFamily(cell));
                    if (flag == 0) {
                        // The goal here was to also export a header row (the column
                        // families and qualifiers), but Spark does not really support
                        // doing this with a counter, so this branch can simply be
                        // removed if it is not needed.
                        keyValue += columnFamily + ":" + qualifier + ",";
                        val += va + ",";
                    } else {
                        keyValue += va + ",";
                    }
                }
                if (flag == 0) {
                    // First row: emit the header line, then the first data row.
                    keyValue = "id" + "," + keyValue + "\r\n" + key + "," + val;
                } else {
                    keyValue = key + "," + keyValue;
                }
                flag++;
                return keyValue;
            }
        });
        // collect() pulls the whole table onto the driver; fine only for a
        // small test table like this one.
        System.out.println("~~~~~~~~~~~~ val: " + resultRDD.collect());
        String outPath = "C:\\Users\\Administrator\\Desktop\\spark";
        Path path = new Path(outPath);
        FileSystem fs = FileSystem.get(sc.hadoopConfiguration());
        if (fs.exists(path)) {
            System.out.println("Output path exists, deleting: " + path);
            fs.delete(path, true);
        }
        resultRDD.saveAsTextFile(outPath);
        // FileDirUtil.saveOutPath takes the directory Spark just wrote, picks
        // out the part files it needs, deletes the directory, and saves the
        // output at the same path with a .csv suffix.
        FileDirUtil.saveOutPath(outPath, "csv");
        sc.stop();
    }
}
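
The FileDirUtil helper is referenced above but not shown in this post. As a rough sketch of what its saveOutPath method might look like (an assumption, not the original code), on Hadoop 2.x one can lean on FileUtil.copyMerge to concatenate the part files, delete the directory, and leave a single spark.csv behind:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import java.io.IOException;

public class FileDirUtil {

    /**
     * Hypothetical sketch: merges the part-* files Spark wrote into dirPath,
     * deletes the directory, and writes the merged content to dirPath.<suffix>.
     */
    public static void saveOutPath(String dirPath, String suffix) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path dir = new Path(dirPath);
        Path tmp = new Path(dirPath + "_tmp." + suffix);
        Path target = new Path(dirPath + "." + suffix);

        // copyMerge concatenates every file in the directory
        // (part-00000, part-00001, ...) into a single output file.
        FileUtil.copyMerge(fs, dir, fs, tmp, false, conf, null);

        fs.delete(dir, true);              // remove the Spark output directory
        if (fs.exists(target)) {
            fs.delete(target, false);      // overwrite any previous export
        }
        fs.rename(tmp, target);            // final file, e.g. ...\Desktop\spark.csv
    }
}

Note that FileUtil.copyMerge was removed in Hadoop 3, so on a newer cluster the part files would have to be opened and concatenated by hand.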
[Screenshot: the HBase member table]
[Screenshot: after running, the exported spark.csv]
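
A final note on the header row: as the comments above say, the static flag counter only produces exactly one header when everything runs in a single JVM. A hedged alternative (a sketch, not the post's code; it assumes every row has the same columns, which holds for this member table) is to derive the header on the driver from one Result and union it in front of the data. The fragment below drops into main() above and assumes hBaseRDD, sc and outPath from there:

// Build the header once on the driver from the first row's schema.
Tuple2<ImmutableBytesWritable, Result> firstRow = hBaseRDD.first();
StringBuilder header = new StringBuilder("id");
for (Cell cell : firstRow._2.rawCells()) {
    header.append(",")
          .append(new String(CellUtil.cloneFamily(cell)))
          .append(":")
          .append(new String(CellUtil.cloneQualifier(cell)));
}

// Plain data rows: rowkey followed by every cell value, no shared state.
JavaRDD<String> dataRDD = hBaseRDD.map(t -> {
    StringBuilder row = new StringBuilder(new String(t._1.get()));
    for (Cell cell : t._2.rawCells()) {
        row.append(",").append(new String(CellUtil.cloneValue(cell)));
    }
    return row.toString();
});

// union() keeps the partitions of the first RDD in front, and coalesce(1)
// merges partitions in order, so the header line lands at the top of the file.
JavaRDD<String> withHeader = sc
        .parallelize(java.util.Collections.singletonList(header.toString()))
        .union(dataRDD)
        .coalesce(1);
withHeader.saveAsTextFile(outPath);

This keeps the export free of mutable static state, at the cost of writing a single partition, which is acceptable for a small table like this one.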