Spark HiveServer2中使用jdbc客户端用户运行spark job
大致问题与这篇文章分析相同【[HiveServer2中使用jdbc客户端用户运行mapreduce](http://blog.javachen.com/2013/10/17/run-mapreduce-with-client-user-in-hive-server2.html)】,但因为是spark引擎,具体细节不同。spark hiveserver2 接收到连接请求,获取用户信息,会把用户信息存放到AccessControlContext中,后面很多地方调用UserGroupInformation.getCurrentUser()得到用户信息,但在spark executor节点上获取不到,需要传递过去:
- insert sql解决方案:
修改InsertIntoHiveTable.saveAsHiveFile方法中:
sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)
修改后代码:
// Call site in InsertIntoHiveTable.saveAsHiveFile — forward the HiveServer2
// client user to the executors, where UserGroupInformation.getCurrentUser()
// would otherwise not reflect the JDBC session user.
val user = UserGroupInformation.getCurrentUser
if (user == null) {
  // No login user on the driver: run the job unchanged.
  sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)
} else {
  // Reuse the user already fetched above instead of calling
  // getCurrentUser a second time (avoids a redundant lookup and keeps
  // the null check and the name read consistent).
  val inner = new DeliverLoginUser(user.getUserName, writerContainer)
  sqlContext.sparkContext.runJob(rdd, inner.writeToFile _)
}

// DeliverLoginUser 类定义 (definition of the DeliverLoginUser wrapper):
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.hive.{HiveUtils, SparkHiveWriterContainer}

/**
 * Serializable closure carrier that ships the driver-side login user name
 * to executors and re-establishes that identity (via a remote UGI) before
 * delegating to the wrapped writer container.
 *
 * @param userName        login user name captured on the driver
 * @param writerContainer the Hive writer the task actually runs
 */
class DeliverLoginUser(userName: String, writerContainer: SparkHiveWriterContainer)
  extends Serializable with Logging {

  /**
   * Executor-side entry point passed to `runJob`; same signature as
   * `SparkHiveWriterContainer.writeToFile` so it is a drop-in replacement.
   * Note: `@transient` on a method parameter is meaningless (parameters are
   * never serialized), so it has been dropped.
   */
  def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
    // Recreate the client user's identity on this executor; the original
    // login user is not propagated to executor JVMs.
    val ugi = UserGroupInformation.createRemoteUser(userName)
    // No explicit `return` needed — doAs is the last expression.
    ugi.doAs(new PrivilegedExceptionAction[Unit]() {
      override def run(): Unit = {
        writerContainer.writeToFile(context, iterator)
      }
    })
  }
}
- create table ... as select sql解决方案:
修改InsertIntoHadoopFsRelationCommand.run方法中:
sparkSession.sparkContext.runJob(queryExecution.toRdd, writerContainer.writeRows _)
修改后代码:
// Call site in InsertIntoHadoopFsRelationCommand.run — same idea as the
// insert-SQL fix: propagate the HiveServer2 client user to executors so
// CTAS writes run under the correct identity.
val user = UserGroupInformation.getCurrentUser
if (user == null) {
  // No login user on the driver: run the job unchanged.
  sparkSession.sparkContext.runJob(queryExecution.toRdd, writerContainer.writeRows _)
} else {
  // Reuse the `user` bound above rather than calling getCurrentUser again.
  val inner = new DeliverLoginUser(user.getUserName, writerContainer)
  sparkSession.sparkContext.runJob(queryExecution.toRdd, inner.writeRows _)
}

// DeliverLoginUser 类定义 (definition of the DeliverLoginUser wrapper):
package org.apache.spark.sql.execution.datasources

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow

/**
 * Serializable closure carrier that ships the driver-side login user name
 * to executors and re-establishes that identity (via a remote UGI) before
 * delegating to the wrapped writer container.
 *
 * @param userName        login user name captured on the driver
 * @param writerContainer the datasource writer the task actually runs
 */
class DeliverLoginUser(userName: String, writerContainer: BaseWriterContainer)
  extends Serializable with Logging {

  /**
   * Executor-side entry point passed to `runJob`; same signature as
   * `BaseWriterContainer.writeRows` so it is a drop-in replacement.
   */
  def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
    // Recreate the client user's identity on this executor; the original
    // login user is not propagated to executor JVMs.
    val ugi = UserGroupInformation.createRemoteUser(userName)
    // No explicit `return` needed — doAs is the last expression.
    ugi.doAs(new PrivilegedExceptionAction[Unit]() {
      override def run(): Unit = {
        writerContainer.writeRows(taskContext, iterator)
      }
    })
  }
}