[Spark] SparkSession: reading CSV files and renaming columns in header-less files

Handling files with a header row

scala> val cus = spark.read.format("csv").option("header","true").load("hdfs://192.168.56.122:9000/20200107/cust.csv")


scala> cus.printSchema
root
 |-- userid: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- cardno: string (nullable = true)
 |-- address: string (nullable = true)
 |-- area: string (nullable = true)
 |-- city: string (nullable = true)
 |-- language: string (nullable = true)
 |-- scores: string (nullable = true)

// Register a temporary view (gives the DataFrame a table name for SQL).
// registerTempTable is deprecated since Spark 2.0; use createOrReplaceTempView.
cus.createOrReplaceTempView("users")
spark.sql("select * from users").show(3)
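Conceptually, `option("header","true")` tells the CSV reader to treat the file's first line as column names rather than as data. A minimal plain-Scala sketch of that idea, using hypothetical inline data (this is an illustration, not Spark's actual parser):

```scala
// Toy CSV text standing in for a file with a header row (hypothetical values)
val csv = "userid,lastname\n001,Zhang\n002,Li"

val lines  = csv.split("\n").toList
val header = lines.head.split(",").toList          // first line -> column names
val rows   = lines.tail.map(_.split(",").toList)   // remaining lines -> data

// Each row becomes a column-name -> value map, like a DataFrame row with a schema
val records = rows.map(r => header.zip(r).toMap)
println(records.head("lastname")) // Zhang
```

Without the header option, Spark has no first-line names to use, which is why the next section's schema falls back to `_c0`, `_c1`, and so on.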

Handling files without a header row

val cs = spark.read.format("csv").load("hdfs://192.168.56.122:9000/20200107/customers.csv")
scala> cs.printSchema
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
// Rename a single column. DataFrames are immutable: withColumnRenamed
// returns a new DataFrame, so assign the result if you want to keep it.
scala> cs.withColumnRenamed("_c0","id")
res53: org.apache.spark.sql.DataFrame = [id: string, _c1: string ... 7 more fields]
// Rename several columns at once via a lookup map
scala> val lku = Map("_c0"->"id","_c1"->"lname")
lku: scala.collection.immutable.Map[String,String] = Map(_c0 -> id, _c1 -> lname)

scala> cs.select(cs.columns.map(c=>col(c).as(lku.getOrElse(c,c))): _*)
res54: org.apache.spark.sql.DataFrame = [id: string, lname: string ... 7 more fields]
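The `getOrElse` trick above renames only the columns present in the map and leaves every other column untouched. That lookup logic can be checked outside Spark on a plain list of column names:

```scala
// Same rename map as in the transcript
val lku = Map("_c0" -> "id", "_c1" -> "lname")

// Plain column-name list standing in for cs.columns
val columns = Seq("_c0", "_c1", "_c2")

// For each name: use the mapped name if present, otherwise keep it as-is
val renamed = columns.map(c => lku.getOrElse(c, c))
println(renamed) // List(id, lname, _c2)
```

Spark-side alternatives with the same effect are folding the map over `withColumnRenamed` (one call per entry), or calling `toDF("id", "lname", ...)` with a complete list of new names when you want to replace all of them at once.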



Reposted from blog.csdn.net/beautiful_huang/article/details/104198824