在使用Spark-Sql 时,需要把RDD类型转换为DataFrame,再使用一些SQL操作,在转换为DataFrame时有两种方式一种是通过反射方式,一种是通过编程接口方式
编程接口的方式比较常用,但是这种方式代码量可能比较大,特别是在你的字段特别多的时候,你需要先把RDD中的类型转换为Row,还有根据每个字段的不同对其进行类型转换,还要再创建元数据构建StructType,特别的麻烦,因此我就写了两个工具类,可以在字段多的时候使用,对于字段少就没这个必要啦,下面就是代码,可以根据自己的需求修改使用
import org.apache.spark.sql.Row
object RowUtils {
//把一行数据进行切分后的到一个数组,数组中的每一个值都对应一个字段,根据对应字段对应类型不同构建Row
def createRow(t :Array[String]): Row={
var row = Row()
for (i <- 0 to t.length - 1) {
//这里只需要根据哪些字段是Int|Double|String修改对应的下标即可
//数组中 类型为整型的 下标
if(i==1||i==2||i==3||i==4||i==7||i==8||i==17||i==21||i==20||i==26||i==28||i==30||i==31||i==32
||i==34||i==35||i==36||i==38||i==39||i==42||i==57||i==59||i==60||i==73||i==84) {
row = Row.merge(row,Row(t(i).toInt))
}
// 类型中为Double类型的 下标
else if(i==9||i==10||i==40||i==41||i==44||i==45||i==58||i==74||i==75||i==76||i==77||i==78){
row = Row.merge(row,Row(t(i).toDouble))
}
// 其余的String类型的
else row = Row.merge(row,Row(t(i)))
}
row
}
}
package utils
import org.apache.spark.sql.types.StructField
import scala.io.Source
object Schema1 {
def getSchema(): List[StructField] ={
var schema:List[StructField] = List[StructField]()
val lines: Iterator[String] = Source.fromFile("D:\\fields.txt","gb2312").getLines()
val list = lines.toList
for (i <- 0 until list.length){
/*
*essionid: String, 会话标识
*dvertisersid: Int, 广告主 id
*dorderid: Int, 广告 id
*文件中的格式向上面一样
*可以根据自己的字段的格式进行切割,只需要修改.split()中的切分的字符和 取第几个值就好啦
*/
val split = list(i).split(":")
//那个字段是int类型,那几个字段是Double类型、String类型
if (i==1||i==2||i==3||i==4||i==7||i==8||i==17||i==21||i==20||i==26||i==28||i==30||i==31||i==32
||i==34||i==35||i==36||i==38||i==39||i==42||i==57||i==59||i==60||i==73||i==84) {
///schema = schema++ List(StructField(split(0),IntegerType,true))
//取每行的第1个是 想要的字段
schema = schema ::: List(StructField(split(0),IntegerType,true))
} else if (i==9||i==10||i==40||i==41||i==44||i==45||i==58||i==74||i==75||i==76||i==77||i==78) {
//schema =schema++ List(StructField(split(0),DoubleType,true))
schema = schema ::: List(StructField(split(0),DoubleType,true))
} else {
//schema= schema++List(StructField(split(0),StringType,true))
schema= schema ::: List(StructField(split(0),StringType,true))
}
}
schema
}
}