import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Creates an HBase table automatically from the header row of a CSV file
 * (header cells have the form "columnFamily:qualifier", if the table does not
 * already exist) and then inserts the CSV data rows into that table.
 *
 * Note 1: there is a small quirk -- the header row itself also gets written into
 * the HBase table, so it is deleted again at the end of main(); an alternative
 * that avoids writing it in the first place is sketched after the results below.
 * Note 2: Spark transformations are lazy -- the body of mapToPair() below only
 * runs once an action (count() / saveAsNewAPIHadoopDataset()) is triggered.
 */
public class SparkImToHBase {

    private static int flag;            // 0 until the header row has been processed
    private static int exitTable;       // 1 if the target table already exists
    static Admin admin;
    static HTableDescriptor desc;
    private static String[] fam;        // column families parsed from the header
    private static String[] qua;        // qualifiers parsed from the header
    private static String headKey = ""; // row key of the header row (deleted at the end)

    public static void main(String[] args) throws IOException {
        SparkConf conf = new SparkConf().setAppName("im").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        String table = "member2";
        String path = "C:\\Users\\Administrator\\Desktop\\im.csv";

        // These settings are hard-coded because this is only an experiment; in real
        // production code they should of course be passed in as parameters.
        sc.hadoopConfiguration().set("hbase.zookeeper.quorum", "master:2181,slave1:2181,slave2:2181");
        sc.hadoopConfiguration().set("hbase.rootdir", "hdfs://master:9000/hbase");
        sc.hadoopConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);

        admin = new HBaseAdmin(sc.hadoopConfiguration());
        desc = new HTableDescriptor(TableName.valueOf(table));

        // Strip an optional namespace prefix ("ns:table") before checking existence.
        if (admin.tableExists(TableName.valueOf(table.substring(table.indexOf(":") + 1)))) {
            System.out.println("Table already exists, inserting data...");
            exitTable = 1;
        } else {
            exitTable = 0;
        }

        Job job = Job.getInstance(sc.hadoopConfiguration(), "spark");
        job.setOutputFormatClass(TableOutputFormat.class);

        JavaPairRDD<ImmutableBytesWritable, Put> hbPairRDD =
                sc.textFile(path).mapToPair(new PairFunction<String, ImmutableBytesWritable, Put>() {
            @Override
            public Tuple2<ImmutableBytesWritable, Put> call(String s) throws Exception {
                String[] splited = s.split(",");
                String[] family = new String[splited.length];
                String[] qulifier = new String[splited.length];
                Put put = new Put(Bytes.toBytes(splited[0]));

                for (int i = 1; i < splited.length; i++) {
                    if (flag == 0) {
                        // Header row: extract column family and qualifier from "family:qualifier".
                        headKey = splited[0];
                        if (splited[i].indexOf(":") > 0) {
                            family[i] = splited[i].substring(0, splited[i].indexOf(":"));
                            qulifier[i] = splited[i].substring(splited[i].indexOf(":") + 1);
                        } else {
                            family[i] = splited[i];
                            qulifier[i] = "";
                        }
                    } else {
                        // Data row: first cell is the row key, the remaining cells map to the parsed header.
                        put.addColumn(Bytes.toBytes(fam[i]), Bytes.toBytes(qua[i]), Bytes.toBytes(splited[i]));
                        System.out.println(splited[0] + " " + fam[i] + " " + qua[i] + " " + splited[i]);
                    }
                }

                if (flag == 0) {
                    flag++;
                    fam = family;
                    qua = qulifier;
                    if (exitTable == 0) {
                        // Collect the distinct column families (index 0 is the row-key column,
                        // so start at 1) and create the table with them.
                        List<String> list = new ArrayList<>();
                        for (int i = 1; i < fam.length; i++) {
                            if (!list.contains(family[i])) {
                                list.add(family[i]);
                            }
                        }
                        for (String cf : list) {
                            desc.addFamily(new HColumnDescriptor(cf));
                        }
                        admin.createTable(desc);
                        exitTable = 1; // the table exists from now on
                        System.out.println("Table created successfully!");
                    }
                    return null;
                }
                flag++;
                return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
            }
        });

        System.out.println("~~~~~~~~~~~~~~" + hbPairRDD.count());
        // This also writes the header row into the table as if it were data...
        hbPairRDD.saveAsNewAPIHadoopDataset(job.getConfiguration());

        // ...so delete the header row again afterwards.
        System.out.println("Deleting the header row: " + headKey);
        HTable htable = new HTable(sc.hadoopConfiguration(), table);
        Delete de = new Delete(Bytes.toBytes(headKey));
        htable.delete(de);

        sc.stop();
    }
}
The im.csv file:
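The code above expects a layout like the following hypothetical example (the names and values here are made up, not the actual file contents): the first header cell names the row-key column, every other header cell is "columnFamily:qualifier", and each subsequent line is one data row keyed by its first cell.

id,info:name,info:age,score:math
1001,Tom,20,95
1002,Jerry,21,88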
IDEA run output:
Result in HBase:
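The imported rows can be checked from the HBase shell with scan 'member2'. As mentioned in Note 1, the header row only ends up in the table because the static-flag trick cannot skip it on the second pass; a minimal sketch of an alternative (not part of the original code, the names lines/header/dataLines are illustrative) is to drop the header row before building the Puts, which also makes the trailing Delete unnecessary:

// Requires: import org.apache.spark.api.java.JavaRDD;
//           import org.apache.spark.api.java.function.Function;
// Inside main(), in place of sc.textFile(path).mapToPair(...):
JavaRDD<String> lines = sc.textFile(path);
final String header = lines.first();           // driver-side: read the header row once
// the families/qualifiers could be parsed from `header` here, instead of via static state
JavaRDD<String> dataLines = lines.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String line) {
        return !line.equals(header);           // drop the header row from the RDD
    }
});
// dataLines.mapToPair(...) then only ever sees real data rows, so no header Put
// is written and the final Delete on headKey can be removed.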