构建字段数超过22个的DataFrame(反射方式)
一、主程序代码
package com.etlstu
import java.util.Properties
import com.utils.NBF
import com.utilsStu.logmetadata
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}
object Bz2ParquetStu {

  /** Number of comma-separated fields a line must contain to be mapped onto [[logmetadata]]. */
  private val ExpectedFieldCount = 85

  /**
   * Reads comma-separated log lines, maps each one onto a [[logmetadata]] record, counts the
   * records per (provincename, cityname) and appends the counts to the MySQL table `ToMySQL`.
   *
   * @param args exactly two elements: `inputPath` (read with `SparkContext.textFile`) and
   *             `outputPath` (only referenced by the commented-out Parquet/JSON writers below).
   */
  def main(args: Array[String]): Unit = {
    // Validate the arguments before starting Spark.
    if (args.length != 2) {
      println("目录不正确,退出程序!!!")
      // Non-zero status so calling scripts/schedulers can detect the failure.
      sys.exit(1)
    }
    val Array(inputPath, outputPath) = args

    val conf = new SparkConf()
      .setAppName(s"${this.getClass.getName}")
      .setMaster("local[*]")
      // Use Kryo instead of the default Java serialization.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    // Release the SparkContext even if a job or the JDBC write fails.
    try {
      val sQLContext = new SQLContext(sc)
      // Parquet's default codec was not snappy in Spark 1.6 (it is from 2.0 on), so set it explicitly.
      sQLContext.setConf("spark.sql.parquet.compression.codec", "snappy")

      // A negative split limit keeps trailing empty fields (e.g. a line ending in ",,,,"),
      // so empty columns still count towards the field total; short lines are dropped.
      val rdd: RDD[logmetadata] = sc.textFile(inputPath)
        .map(line => line.split(",", -1))
        .filter(fields => fields.length >= ExpectedFieldCount)
        .map(fields => toLogMetadata(fields))

      // Brings rdd.toDF() into scope.
      import sQLContext.implicits._
      val dataFrame: DataFrame = rdd.toDF()
      dataFrame.registerTempTable("AwenTable")
      val cityCounts: DataFrame = sQLContext.sql(
        "select count(*) as ct,provincename,cityname from AwenTable group by provincename,cityname")

      // Append the per-city counts to MySQL.
      // NOTE(review): connection settings and credentials are hard-coded; move them to
      // configuration before using this outside a local sandbox.
      val df = cityCounts.select("ct", "provincename", "cityname")
      val username: String = "root"
      val password: String = "root"
      val url: String = "jdbc:mysql://localhost:3306/awenyun"
      val prop = new Properties()
      prop.put("user", username)
      prop.put("password", password)
      prop.put("driver", "com.mysql.jdbc.Driver")
      df.write.mode(SaveMode.Append).jdbc(url, "ToMySQL", prop)

      // Alternative outputs: the parsed records as Parquet, or the query result as JSON.
      //dataFrame.coalesce(1).write.parquet(outputPath)
      //cityCounts.coalesce(1).write.json(outputPath)
    } finally {
      sc.stop()
    }
  }

  /**
   * Builds a [[logmetadata]] positionally from one split log line: element `i` of `arr`
   * becomes constructor parameter `i`. Numeric columns are converted with `NBF.toInt` /
   * `NBF.toDouble`; how malformed numbers are handled is defined by `com.utils.NBF`.
   *
   * @param arr the split line; must contain at least [[ExpectedFieldCount]] elements
   * @return the populated record
   */
  private def toLogMetadata(arr: Array[String]): logmetadata =
    new logmetadata(
      arr(0),
      NBF.toInt(arr(1)),
      NBF.toInt(arr(2)),
      NBF.toInt(arr(3)),
      NBF.toInt(arr(4)),
      arr(5),
      arr(6),
      NBF.toInt(arr(7)),
      NBF.toInt(arr(8)),
      NBF.toDouble(arr(9)),
      NBF.toDouble(arr(10)),
      arr(11),
      arr(12),
      arr(13),
      arr(14),
      arr(15),
      arr(16),
      NBF.toInt(arr(17)),
      arr(18),
      arr(19),
      NBF.toInt(arr(20)),
      NBF.toInt(arr(21)),
      arr(22),
      arr(23),
      arr(24),
      arr(25),
      NBF.toInt(arr(26)),
      arr(27),
      NBF.toInt(arr(28)),
      arr(29),
      NBF.toInt(arr(30)),
      NBF.toInt(arr(31)),
      NBF.toInt(arr(32)),
      arr(33),
      NBF.toInt(arr(34)),
      NBF.toInt(arr(35)),
      NBF.toInt(arr(36)),
      arr(37),
      NBF.toInt(arr(38)),
      NBF.toInt(arr(39)),
      NBF.toDouble(arr(40)),
      NBF.toDouble(arr(41)),
      NBF.toInt(arr(42)),
      arr(43),
      NBF.toDouble(arr(44)),
      NBF.toDouble(arr(45)),
      arr(46),
      arr(47),
      arr(48),
      arr(49),
      arr(50),
      arr(51),
      arr(52),
      arr(53),
      arr(54),
      arr(55),
      arr(56),
      NBF.toInt(arr(57)),
      NBF.toDouble(arr(58)),
      NBF.toInt(arr(59)),
      NBF.toInt(arr(60)),
      arr(61),
      arr(62),
      arr(63),
      arr(64),
      arr(65),
      arr(66),
      arr(67),
      arr(68),
      arr(69),
      arr(70),
      arr(71),
      arr(72),
      NBF.toInt(arr(73)),
      NBF.toDouble(arr(74)),
      NBF.toDouble(arr(75)),
      NBF.toDouble(arr(76)),
      NBF.toDouble(arr(77)),
      NBF.toDouble(arr(78)),
      arr(79),
      arr(80),
      arr(81),
      arr(82),
      arr(83),
      NBF.toInt(arr(84))
    )
}
二、字段数超过22个的实体类(手动实现Product特质)
package com.utilsStu
/**
 * An 85-field log record written as a plain class that implements [[Product]] by hand.
 *
 * Scala 2.10 case classes are limited to 22 fields, so this class supplies `productArity` and
 * `productElement` itself, which lets Spark's reflection-based `toDF()` treat it like a case class.
 * Every constructor parameter is a `val`, so fields are readable by name (`record.cityname`)
 * and Spark 2.x encoders, which read each field through its accessor method, can serialize it.
 *
 * Field order is significant: `productElement(i)` returns the i-th constructor parameter.
 * NOTE(review): field semantics are not documented here; the names presumably mirror the
 * columns of the source log.
 *
 * `equals`/`hashCode` are not overridden, so instances compare by reference.
 */
class logmetadata(
  val sessionid: String,
  val advertisersid: Int,
  val adorderid: Int,
  val adcreativeid: Int,
  val adplatformproviderid: Int,
  val sdkversion: String,
  val adplatformkey: String,
  val putinmodeltype: Int,
  val requestmode: Int,
  val adprice: Double,
  val adppprice: Double,
  val requestdate: String,
  val ip: String,
  val appid: String,
  val appname: String,
  val uuid: String,
  val device: String,
  val client: Int,
  val osversion: String,
  val density: String,
  val pw: Int,
  val ph: Int,
  val long: String,
  val lat: String,
  val provincename: String,
  val cityname: String,
  val ispid: Int,
  val ispname: String,
  val networkmannerid: Int,
  val networkmannername: String,
  val iseffective: Int,
  val isbilling: Int,
  val adspacetype: Int,
  val adspacetypename: String,
  val devicetype: Int,
  val processnode: Int,
  val apptype: Int,
  val district: String,
  val paymode: Int,
  val isbid: Int,
  val bidprice: Double,
  val winprice: Double,
  val iswin: Int,
  val cur: String,
  val rate: Double,
  val cnywinprice: Double,
  val imei: String,
  val mac: String,
  val idfa: String,
  val openudid: String,
  val androidid: String,
  val rtbprovince: String,
  val rtbcity: String,
  val rtbdistrict: String,
  val rtbstreet: String,
  val storeurl: String,
  val realip: String,
  val isqualityapp: Int,
  val bidfloor: Double,
  val aw: Int,
  val ah: Int,
  val imeimd5: String,
  val macmd5: String,
  val idfamd5: String,
  val openudidmd5: String,
  val androididmd5: String,
  val imeisha1: String,
  val macsha1: String,
  val idfasha1: String,
  val openudidsha1: String,
  val androididsha1: String,
  val uuidunknow: String,
  val userid: String,
  val iptype: Int,
  val initbidprice: Double,
  val adpayment: Double,
  val agentrate: Double,
  val lomarkrate: Double,
  val adxrate: Double,
  val title: String,
  val keywords: String,
  val tagid: String,
  val callbackdate: String,
  val channelid: String,
  val mediatype: Int
) extends Product with scala.Serializable {

  /** Required by [[scala.Equals]]; accepts only other `logmetadata` instances. */
  override def canEqual(that: Any): Boolean = that.isInstanceOf[logmetadata]

  /** Number of fields: one per constructor parameter. */
  override def productArity: Int = 85

  /** Renders the record as `logmetadata[v0,v1,...,v84]`, unwrapping any `Some` values. */
  override def toString(): String =
    productIterator.map {
      case Some(x) => x
      case other   => other
    }.mkString("logmetadata[", ",", "]")

  /**
   * Returns the `n`-th constructor parameter (0-based).
   *
   * @throws IndexOutOfBoundsException if `n` is outside `0 until productArity`
   */
  @throws(classOf[IndexOutOfBoundsException])
  override def productElement(n: Int): Any = n match {
    case 0 => sessionid
    case 1 => advertisersid
    case 2 => adorderid
    case 3 => adcreativeid
    case 4 => adplatformproviderid
    case 5 => sdkversion
    case 6 => adplatformkey
    case 7 => putinmodeltype
    case 8 => requestmode
    case 9 => adprice
    case 10 => adppprice
    case 11 => requestdate
    case 12 => ip
    case 13 => appid
    case 14 => appname
    case 15 => uuid
    case 16 => device
    case 17 => client
    case 18 => osversion
    case 19 => density
    case 20 => pw
    case 21 => ph
    case 22 => long
    case 23 => lat
    case 24 => provincename
    case 25 => cityname
    case 26 => ispid
    case 27 => ispname
    case 28 => networkmannerid
    case 29 => networkmannername
    case 30 => iseffective
    case 31 => isbilling
    case 32 => adspacetype
    case 33 => adspacetypename
    case 34 => devicetype
    case 35 => processnode
    case 36 => apptype
    case 37 => district
    case 38 => paymode
    case 39 => isbid
    case 40 => bidprice
    case 41 => winprice
    case 42 => iswin
    case 43 => cur
    case 44 => rate
    case 45 => cnywinprice
    case 46 => imei
    case 47 => mac
    case 48 => idfa
    case 49 => openudid
    case 50 => androidid
    case 51 => rtbprovince
    case 52 => rtbcity
    case 53 => rtbdistrict
    case 54 => rtbstreet
    case 55 => storeurl
    case 56 => realip
    case 57 => isqualityapp
    case 58 => bidfloor
    case 59 => aw
    case 60 => ah
    case 61 => imeimd5
    case 62 => macmd5
    case 63 => idfamd5
    case 64 => openudidmd5
    case 65 => androididmd5
    case 66 => imeisha1
    case 67 => macsha1
    case 68 => idfasha1
    case 69 => openudidsha1
    case 70 => androididsha1
    case 71 => uuidunknow
    case 72 => userid
    case 73 => iptype
    case 74 => initbidprice
    case 75 => adpayment
    case 76 => agentrate
    case 77 => lomarkrate
    case 78 => adxrate
    case 79 => title
    case 80 => keywords
    case 81 => tagid
    case 82 => callbackdate
    case 83 => channelid
    case 84 => mediatype
    case _ => throw new IndexOutOfBoundsException(n.toString())
  }
}