Preface

By default, when Spark SQL drops a partition of an external table, it deletes only the metadata and leaves the data on HDFS; when it drops a partition of a managed (internal) table, it deletes both the metadata and the data on HDFS.
Requirement

We customize this behavior as follows: for both managed and external tables, the default is to delete only the metadata and keep the data on HDFS; when the statement carries the WITH DATA option, the metadata and the HDFS data are deleted together.
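For illustration, here is the intended behavior from a spark-shell session built with the patch (a sketch; `test.ext2` and the `dt` values are placeholders):

```scala
// Default: drop only the partition metadata; the files stay on HDFS.
spark.sql("ALTER TABLE test.ext2 DROP PARTITION (dt=1)")

// WITH DATA: drop the metadata and delete the partition directory on HDFS.
spark.sql("ALTER TABLE test.ext2 DROP PARTITION (dt=2) WITH DATA")
```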
Modifying the Source Code

Modifying SqlBase.g4

Modify the `dropTablePartitions` alternatives in SqlBase.g4, adding an optional `WITH DATA`:
```
| ALTER TABLE tableIdentifier
    DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* (WITH DATA)? PURGE?   #dropTablePartitions
| ALTER VIEW tableIdentifier
    DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* (WITH DATA)?          #dropTablePartitions
```
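The `WITH` and `DATA` tokens already exist in the grammar (they are used by CTEs and `LOAD DATA`), so no new lexer rules are needed; the build regenerates the parser, and the generated `DropTablePartitionsContext` gains `WITH()` and `DATA()` accessors. A quick smoke test of the new syntax might look like this (a sketch; `new SparkSqlParser(new SQLConf)` is the Spark 2.x constructor, which differs in other versions):

```scala
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.internal.SQLConf

// The statement should now parse into an AlterTableDropPartitionCommand
// instead of failing with a ParseException.
val parser = new SparkSqlParser(new SQLConf)
val plan = parser.parsePlan(
  "ALTER TABLE test.ext2 DROP IF EXISTS PARTITION (dt=1) WITH DATA")
println(plan)
```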
Modifying SparkSqlParser.scala

Modify the visitDropTablePartitions method in SparkSqlParser.scala as follows:
```scala
override def visitDropTablePartitions(
    ctx: DropTablePartitionsContext): LogicalPlan = withOrigin(ctx) {
  if (ctx.VIEW != null) {
    operationNotAllowed("ALTER VIEW ... DROP PARTITION", ctx)
  }
  AlterTableDropPartitionCommand(
    visitTableIdentifier(ctx.tableIdentifier),
    ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
    ifExists = ctx.EXISTS != null,
    purge = ctx.PURGE != null,
    // retainData defaults to true (keep the HDFS data); it is only false
    // when both WITH and DATA appear in the statement.
    retainData = !(ctx.WITH() != null && ctx.DATA() != null))
}
```
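AlterTableDropPartitionCommand itself needs no change: in Spark 2.x it already carries a `retainData` flag and hands it to `SessionCatalog.dropPartitions`, which forwards it through the external catalog to the Hive client modified in the next step. A minimal mirror of that existing path, for orientation (a sketch; the class name is hypothetical, and the real command also normalizes the partition specs first):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.execution.command.RunnableCommand

// Hypothetical mirror of AlterTableDropPartitionCommand, showing how
// retainData rides the command down into the session catalog.
case class DropPartitionSketch(
    tableName: TableIdentifier,
    specs: Seq[TablePartitionSpec],
    ifExists: Boolean,
    purge: Boolean,
    retainData: Boolean) extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    sparkSession.sessionState.catalog.dropPartitions(
      tableName, specs, ignoreIfNotExists = ifExists, purge = purge,
      retainData = retainData)
    Seq.empty[Row]
  }
}
```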
Modifying HiveClientImpl.scala

Modify the org.apache.spark.sql.hive.client.HiveClientImpl#dropPartitions method as follows:
```scala
// Imports needed by this method (add any that HiveClientImpl does not already have):
//   import scala.collection.mutable
//   import scala.collection.mutable.ArrayBuffer
//   import org.apache.hadoop.fs.{FileSystem, Path}
override def dropPartitions(
    db: String,
    table: String,
    specs: Seq[TablePartitionSpec],
    ignoreIfNotExists: Boolean,
    purge: Boolean,
    retainData: Boolean): Unit = withHiveState {
  // TODO: figure out how to drop multiple partitions in one call
  val hiveTable = client.getTable(db, table, true /* throw exception */)
  // Do the existence check first and collect every matching partition
  // together with its HDFS location.
  val matchingParts = specs.flatMap { s: TablePartitionSpec =>
    assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
    // The provided spec here can be a partial spec, i.e. it will match all partitions
    // whose specs are supersets of this partial spec. E.g. If a table has partitions
    // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
    val parts: mutable.Buffer[HivePartition] =
      client.getPartitions(hiveTable, s.asJava).asScala
    if (parts.isEmpty && !ignoreIfNotExists) {
      throw new AnalysisException(
        s"No partition is dropped. One partition spec '$s' does not exist in table '$table' " +
        s"database '$db'")
    }
    parts.map { partition =>
      (partition.getValues, partition.getDataLocation)
    }
  }.distinct
  matchingParts.foreach(tuple => logInfo(s"${tuple._1} : ${tuple._2}"))

  val droppedParts = ArrayBuffer.empty[(java.util.List[String], Path)]
  val fileSystem: FileSystem = FileSystem.get(conf)
  matchingParts.foreach { tuple =>
    val (partition, path) = tuple
    try {
      // The data is kept by default: deleteData is only true when WITH DATA was given.
      shim.dropPartition(client, db, table, partition, !retainData, purge)
      // Added logic: explicitly remove the partition directory on HDFS. This also
      // covers external tables, whose data Hive never deletes on its own.
      if (!retainData && fileSystem.exists(path)) {
        if (fileSystem.delete(path, true)) {
          logInfo(s"Deleted HDFS data at: $path")
        }
      }
    } catch {
      case e: Exception =>
        val remainingParts = matchingParts.toBuffer -- droppedParts
        logError(
          s"""
             |======================
             |Attempt to drop the partition specs in table '$table' database '$db':
             |${specs.mkString("\n")}
             |In this attempt, the following partitions have been dropped successfully:
             |${droppedParts.mkString("\n")}
             |The remaining partitions have not been dropped:
             |${remainingParts.mkString("\n")}
             |======================
           """.stripMargin)
        throw e
    }
    droppedParts += tuple
  }
}
```
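One detail to be aware of: `FileSystem.get(conf)` returns the default filesystem, so the exists/delete calls above will fail for a partition whose location lives on a different filesystem (another HDFS nameservice, S3, and so on). A safer variant resolves the filesystem from the path itself; a minimal sketch (the helper name is hypothetical):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: delete a partition directory via the filesystem
// that owns the path, rather than the default filesystem.
def deletePartitionDir(path: Path, conf: Configuration): Boolean = {
  val fs: FileSystem = path.getFileSystem(conf)
  fs.exists(path) && fs.delete(path, true /* recursive */)
}
```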
Build and Packaging

Omitted.
Testing

Managed Table

In the tests below, ext2 is a managed (internal) table.
```
# Show all partitions
spark-sql> show partitions ext2;
dt=1/last_dt=1
dt=1/last_dt=2
dt=1/last_dt=3
dt=2/last_dt=1
dt=2/last_dt=2
dt=2/last_dt=3
dt=3/last_dt=1
dt=3/last_dt=2
dt=3/last_dt=3

# Drop partition metadata + HDFS data
spark-sql> alter table ext2 drop partition(dt=1), partition(dt=2) with data;

# Show partitions
spark-sql> show partitions ext2;
dt=3/last_dt=1
dt=3/last_dt=2
dt=3/last_dt=3

# Drop metadata only; the HDFS data is kept
spark-sql> alter table ext2 drop partition(dt=3);

# Recover the partition metadata from the data on HDFS
spark-sql> msck repair table ext2;
spark-sql> show partitions ext2;
dt=3/last_dt=1
dt=3/last_dt=2
dt=3/last_dt=3

# Inspect the data on HDFS
[root@node01 ~]# hdfs dfs -ls -R /user/hive/warehouse/test.db/ext2
drwxr-xr-x   - root supergroup          0 2020-02-29 14:08 /user/hive/warehouse/test.db/ext2/dt=3
drwxr-xr-x   - root supergroup          0 2020-02-29 14:08 /user/hive/warehouse/test.db/ext2/dt=3/last_dt=1
-rwxr-xr-x   3 root supergroup         16 2020-02-29 14:08 /user/hive/warehouse/test.db/ext2/dt=3/last_dt=1/part-00000-490c18ff-1a7a-49f9-b07b-fbadf8e08158-c000
drwxr-xr-x   - root supergroup          0 2020-02-29 14:08 /user/hive/warehouse/test.db/ext2/dt=3/last_dt=2
-rwxr-xr-x   3 root supergroup         16 2020-02-29 14:08 /user/hive/warehouse/test.db/ext2/dt=3/last_dt=2/part-00000-f64a89a2-13a7-4ac9-bd65-230fc5963bda-c000
drwxr-xr-x   - root supergroup          0 2020-02-29 14:08 /user/hive/warehouse/test.db/ext2/dt=3/last_dt=3
-rwxr-xr-x   3 root supergroup         16 2020-02-29 14:08 /user/hive/warehouse/test.db/ext2/dt=3/last_dt=3/part-00000-08808e43-c98b-470d-b0c8-150b3fb7294a-c000
```
The test passes.
External Table

Convert ext2 to an external table:

```
hive> alter table ext2 set TBLPROPERTIES('EXTERNAL'='true');
```
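To confirm the switch took effect, the table type can be checked through the public catalog API (a sketch from spark-shell; `test` is the assumed database name):

```scala
// tableType should now print EXTERNAL instead of MANAGED.
println(spark.catalog.getTable("test", "ext2").tableType)
```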
Test
```
# Show all partitions
spark-sql> show partitions ext2;
dt=3/last_dt=1
dt=3/last_dt=2
dt=3/last_dt=3

# Drop the partition metadata; keep the data
spark-sql> alter table ext2 drop partition(dt=3), partition(dt=3,last_dt=1);

# Recover the table metadata from the data on HDFS
spark-sql> msck repair table ext2;

# Show the partitions
spark-sql> show partitions ext2;
dt=3/last_dt=1
dt=3/last_dt=2
dt=3/last_dt=3

# Drop the partition metadata + HDFS data
spark-sql> alter table ext2 drop partition(dt=3), partition(dt=3,last_dt=1) with data;

# Try to recover the table metadata from HDFS
spark-sql> msck repair table ext2;

# Show partitions (nothing remains)
spark-sql> show partitions ext2;

# Inspect HDFS: the partition directories are gone
[root@node01 ~]# hdfs dfs -ls -R /user/hive/warehouse/test.db/ext2
[root@node01 ~]#
```
With that, the SQL customization is complete.