在spark之前版本中使用sparkContext作为进入spark切入点,操作rdd。
spark2.0中引入了SparkSession 作为全新切入点用来操作DataSet和Datafarme。
创建sparksession:
SparkSession spark = SparkSession
.builder()
.appName("Chinadaas ENT_INDEX APP")
//.config(SparkConf)
.enableHiveSupport() //允许使用hive
.getOrCreate();
使用hql从hive中查询生成dataset
public static Dataset getBaseBiKpi(SparkSession spark, String dataDate){
String hql =" select s_ext_nodenum,\n" +
" 1,\n" +
" 2_code,\n" +
" column_4,\n" +
" colunm_5,\n" +
" cplunm_6,\n" +
" column_7\n" +
" from tablename_%s\n" +
" WHERE pripid <> '' limit 10000"; // 测试限制下大小
return DataFrameUtil.getDataFrame(spark,String.format(hql,dataDate),"testDemoTmpTable");
}
生成parquet文件
public static Dataset saveAsParquetOverwrite(Dataset df, String path) {
System.out.println("---------------"+path);
DataFormatConvertUtil.deletePath(path);
if(!DataFormatConvertUtil.isExistsPath(path)){
DataFormatConvertUtil.mkdir(path);
}
df.write().mode(SaveMode.Overwrite).parquet(path);
return df;
}
使用 hive -h localhost 登录hive,引用外部文件生成表,然后可以查看select * from table limit 10;
CREATE EXTERNAL TABLE extData_inv_investment_parquet(
1 string ,
2 string ,
3 string ,
4 string ,
5 string ,
6 string ,
7 string ,
8 string ,
9 string )
ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
STORED AS PARQUET
LOCATION '/tmp/spark_test/ent_index1';
在spark安装目录的bin目录下
启动: ./spark-shell --master yarn
然后 :paste 使用粘贴模式粘贴hql执行。ctrl+d退出模式
var hql参数名称, show(false)全部展示
spark.sql(hql).show(false)