Running Spark Jobs as the JDBC Client User in Spark HiveServer2

The problem is broadly the same as the one analyzed in [Running MapReduce as the JDBC client user in HiveServer2](http://blog.javachen.com/2013/10/17/run-mapreduce-with-client-user-in-hive-server2.html), but because the engine here is Spark, the details differ. When Spark HiveServer2 accepts a connection request, it obtains the user information and stores it in the AccessControlContext; many later code paths then recover it by calling UserGroupInformation.getCurrentUser(). On a Spark executor node, however, that call cannot see the session user, so the user identity has to be passed along explicitly.
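
To make the failure mode concrete, here is a minimal, self-contained sketch (the object name and print labels are illustrative, not part of the original patch). When the driver-side code runs inside the Thrift server's doAs context, the two prints differ: the driver sees the JDBC session user, while each executor falls back to the OS user its process runs as.

    import org.apache.hadoop.security.UserGroupInformation
    import org.apache.spark.sql.SparkSession

    // Hypothetical demo, not from the original patch.
    object WhoAmIDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("who-am-i-demo").getOrCreate()
        // Driver side: reflects the session user if we are inside a doAs.
        println("driver user: " + UserGroupInformation.getCurrentUser.getUserName)
        spark.sparkContext.parallelize(1 to 2, 2).foreach { _ =>
          // Executor side: no Subject carries the session user here, so this
          // resolves to the user the executor process itself was started as.
          println("executor user: " + UserGroupInformation.getCurrentUser.getUserName)
        }
        spark.stop()
      }
    }

The two affected write paths are patched as follows: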

- Solution for `INSERT` SQL:

In the `InsertIntoHiveTable.saveAsHiveFile` method, the line to modify is:

    sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)

Modified code:

    val user = UserGroupInformation.getCurrentUser
    if (user == null) {
      // No session user attached; keep the original behaviour.
      sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)
    } else {
      // Capture only the user name and let DeliverLoginUser rebuild the
      // UGI on each executor.
      val inner = new DeliverLoginUser(user.getUserName, writerContainer)
      sqlContext.sparkContext.runJob(rdd, inner.writeToFile _)
    }

Definition of the `DeliverLoginUser` class; it needs to live under the `org.apache.spark.sql.hive` package because `SparkHiveWriterContainer` is `private[hive]`:

    package org.apache.spark.sql.hive

    import java.security.PrivilegedExceptionAction

    import org.apache.hadoop.security.UserGroupInformation
    import org.apache.spark.TaskContext
    import org.apache.spark.internal.Logging
    import org.apache.spark.sql.catalyst.InternalRow

    /**
      * Wraps a SparkHiveWriterContainer so that the file write runs on the
      * executor as the given user rather than as the executor process user.
      */
    class DeliverLoginUser(userName: String, writerContainer: SparkHiveWriterContainer)
      extends Serializable with Logging {

      def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
        // Rebuild a UGI for the session user on the executor and perform
        // the actual write inside doAs, so HDFS sees the client user.
        val ugi = UserGroupInformation.createRemoteUser(userName)
        ugi.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            writerContainer.writeToFile(context, iterator)
          }
        })
      }
    }
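
The user name is passed as a `String` rather than as the `UserGroupInformation` object itself because a UGI does not implement `Serializable` and cannot travel inside the task closure; `createRemoteUser` then rebuilds a UGI for that name on the executor. Note that `createRemoteUser` yields a UGI with simple authentication and no credentials, so this covers plain HDFS user/permission attribution; on a Kerberized cluster a proxy-user setup would presumably be needed instead.
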
- Solution for `CREATE TABLE ... AS SELECT` SQL:

In the `InsertIntoHadoopFsRelationCommand.run` method, the line to modify is:

    sparkSession.sparkContext.runJob(queryExecution.toRdd, writerContainer.writeRows _)

Modified code:

    val user = UserGroupInformation.getCurrentUser
    if (user == null) {
      sparkSession.sparkContext.runJob(queryExecution.toRdd, writerContainer.writeRows _)
    } else {
      // Same pattern as above: capture the user name and wrap the write.
      val inner = new DeliverLoginUser(user.getUserName, writerContainer)
      sparkSession.sparkContext.runJob(queryExecution.toRdd, inner.writeRows _)
    }

Definition of the corresponding `DeliverLoginUser` class, this time wrapping a `BaseWriterContainer`:

    package org.apache.spark.sql.execution.datasources

    import java.security.PrivilegedExceptionAction

    import org.apache.hadoop.security.UserGroupInformation
    import org.apache.spark.TaskContext
    import org.apache.spark.internal.Logging
    import org.apache.spark.sql.catalyst.InternalRow

    /**
      * Wraps a BaseWriterContainer so that the row write runs on the
      * executor as the given user rather than as the executor process user.
      */
    class DeliverLoginUser(userName: String, writerContainer: BaseWriterContainer)
      extends Serializable with Logging {

      def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
        // Rebuild a UGI for the session user on the executor and perform
        // the actual write inside doAs.
        val ugi = UserGroupInformation.createRemoteUser(userName)
        ugi.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            writerContainer.writeRows(taskContext, iterator)
          }
        })
      }
    }
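
The two wrappers are identical apart from the container type and method name. As a possible refactoring (not part of the original patch; all names below are hypothetical), the doAs plumbing could be factored into one generic helper:

    import java.security.PrivilegedExceptionAction

    import org.apache.hadoop.security.UserGroupInformation

    // Hypothetical helper: run an arbitrary block of code as the named user.
    object RunAsUser {
      def apply[T](userName: String)(body: => T): T = {
        val ugi = UserGroupInformation.createRemoteUser(userName)
        ugi.doAs(new PrivilegedExceptionAction[T] {
          override def run(): T = body
        })
      }
    }

Each call site would then wrap its writer call directly, for example `RunAsUser(userName) { writerContainer.writeRows(taskContext, iterator) }`.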

Reposted from melin.iteye.com/blog/2372744