一、SparkSQL整合Hive做数据源
1.官网上说需要把3个配文(core-site,hdfs-site,hive-site)放到程序类路径下。经测试本地运行需要给程序指明大数据的组件位置(hdfs、hive),在CDH上不需要设置到程序中,猜测应该是CDH的Spark自动把这3个配文放到类路径下,apache的没测过。在服务器上搜索core-site.xml: find / -name core-site.xml会发现spark2_on_yarn下有对应的3个配文。
2.创建SparkSession对象时需要进行2个设置,亲测,本地运行需要,CDH上不需要
val sparkSession = SparkSession.builder().config(sparkConf)
  .enableHiveSupport()
  .getOrCreate()
3.本地运行还需要spark-hive的依赖,如果遇到一些报错,大概率也是jar缺失引起的,因为本地运行不像CDH那样都配好,hadoop的jar有可能会缺失。在CDH上不需要任何额外的jar,spark-hive也不需要
二、Spark程序连接Hive时进行kerberos认证
1.kerberos是通过3个文件(krb5.conf、keytab、principal)来进行免密验证。在程序中读取这3个配置,设置到Hadoop的Configuration对象中,然后使用hadoop.security包中提供的API尝试建立与hadoop的认证连接。整个过程封装到1个方法中,只要方法执行时没抛异常,就表示当前节点得到了hadoop服务器的认证和授权,当前节点建立的连接被允许连接hive。
2.因为kerberos票据5分钟就失效,所以程序启动时就需要通过线程工厂创建1个校验线程,每5分钟执行1次认证和授权方法。并且这个线程要设为守护线程,因为这个程序属于批处理任务,跑完即结束,不像web项目需要一直运行,不能让校验线程阻止JVM退出。整个过程封装到HiveAuthen类中,调用HiveAuthen构造器时就开启了校验线程。
package com. cib. dqms. auth;
import com. cib. dqms. core. util. thread. ScheduledThreadFactory;
import com. cib. dqms. utils. PropertiesUtil;
import org. apache. commons. lang. StringUtils;
import org. apache. hadoop. conf. Configuration;
import org. apache. hadoop. security. UserGroupInformation;
import java. io. IOException;
import java. util. concurrent. Executors;
import java. util. concurrent. ScheduledExecutorService;
import java. util. concurrent. TimeUnit;
public class HiveAuthen {
private ScheduledExecutorService scheduledExecutor = Executors. newScheduledThreadPool ( 1 , new ScheduledThreadFactory ( ) ) ;
public HiveAuthen ( ) {
krbAuth ( ) ;
scheduledExecutor. scheduleAtFixedRate ( new Runnable ( ) {
@Override
public void run ( ) {
krbAuth ( ) ;
}
} , 5 L, 5 L, TimeUnit. MINUTES) ;
}
public void krbAuth ( ) {
String krbConf = PropertiesUtil. getRequiredStringProperty ( "hive.krb.conf" ) ;
String krbKeyTab = PropertiesUtil. getRequiredStringProperty ( "hive.krb.key" ) ;
String krbPrincipal = PropertiesUtil. getRequiredStringProperty ( "hive.krb.principal" ) ;
if ( StringUtils. isEmpty ( krbConf) || StringUtils. isEmpty ( krbKeyTab) || StringUtils. isEmpty ( krbPrincipal) ) {
throw new RuntimeException ( "---------------------------kerbores认证文件不存在-------------------------" ) ;
}
System. setProperty ( "java.security.krb5.conf" , krbConf) ;
Configuration configuration = new Configuration ( ) ;
configuration. set ( "hadoop.security.authentication" , "kerberos" ) ;
configuration. set ( "keytab.file" , krbKeyTab) ;
configuration. setBoolean ( "hadoop.security.authorization" , true ) ;
configuration. set ( "kerberos.principal" , krbPrincipal) ;
try {
UserGroupInformation. setConfiguration ( configuration) ;
UserGroupInformation. loginUserFromKeytab ( krbPrincipal, krbKeyTab) ;
} catch ( IOException e) {
System. err. println ( e. getMessage ( ) ) ;
}
}
}
package com. cib. dqms. core. util. thread;
import java. util. concurrent. ThreadFactory;
import java. util. concurrent. atomic. AtomicInteger;
/**
 * ThreadFactory that produces daemon threads of normal priority with
 * descriptive names ("Scheduled Pool-&lt;pool&gt;-Thead-&lt;n&gt;"), intended for the
 * Kerberos renewal scheduler. Daemon status is required so the periodic
 * renewal thread never prevents a finished batch job's JVM from exiting.
 */
public class ScheduledThreadFactory implements ThreadFactory {

    // Distinguishes threads created by different factory instances.
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    // Per-factory counter for thread names.
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    public ScheduledThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        namePrefix = "Scheduled Pool-" + poolNumber.getAndIncrement() + "-Thead-";
    }

    /**
     * Creates a named daemon thread of normal priority running {@code r}.
     */
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement());
        // BUG FIX: the original did `if (t.isDaemon()) t.setDaemon(true);` — a
        // no-op that never actually made the thread a daemon (a thread created
        // from a non-daemon thread starts non-daemon). The design notes require
        // the renewal thread to be a daemon, so set it unconditionally.
        if (!t.isDaemon()) {
            t.setDaemon(true);
        }
        if (t.getPriority() != Thread.NORM_PRIORITY) {
            t.setPriority(Thread.NORM_PRIORITY);
        }
        return t;
    }
}
// Example entry point: read from a Kerberos-secured Hive table via Spark SQL.
// NOTE(review): illustrative snippet from the notes — the trailing ". . . . . ."
// line means the body is elided, so this block is not compilable as-is.
object SparkWithPMMLAndDQMS {
def main ( args: Array[ String] ) : Unit = {
// Constructing HiveAuthen performs the first Kerberos login and starts the
// daemon renewal thread (re-authenticates every 5 minutes).
new HiveAuthen ( )
// enableHiveSupport() is required for Hive access. .master("local") is for
// local runs only — on CDH/YARN the master is supplied by spark-submit.
val sparkSession = SparkSession. builder ( )
. appName ( "SparkHive" )
. master ( "local" )
. enableHiveSupport ( )
. getOrCreate ( )
val df = sparkSession. sql ( "SELECT * FROM dctest.sogouq11 limit 10" )
df. printSchema ( )
// show(10, false): print up to 10 rows without truncating column values.
df. show ( 10 , false )
// Remainder of the example is elided in the original notes.
. . . . . .
}
}