因需要通过任务的方式执行将文本的数据导入到HIVE数据库中。所以考虑每次的任务通过命令执行py文件的方式来实现定时导入文件到HIVE数据库中。所以先做了一个测试,但是出行一些问题,现记录下来,供以后查看。
测试脚本如下:
# -*- coding: utf-8 -*-
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql import Row
if __name__ == "__main__":
# 初始化SparkSession
spark = SparkSession \
.builder \
.appName("CardInfroToHive_kf") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
sc = spark.sparkContext
lines = sc.textFile("hdfs://10.250.11.52:8020/source/db/cardinfo/20180507")
parts = lines.map(lambda l: l.split(","))
Cardinfo = parts.map(lambda p: Row(LCN=p[0],IS_TIME=p[5],IS_TYPE=p[6],IS_AREA=p[8],CARD_TYPE=p[11],PURSE_TYPE=p[12],CARD_MARK=p[13]))
#RDD转换成DataFrame
Cardinfo_temp = spark.createDataFrame(Cardinfo)
#创建视图
Cardinfo_temp.createOrReplaceTempView("t_Cardinfo")
#过滤数据
#CREATE HIVE TABLE
spark.sql("use oracledb ")
spark.sql("CREATE TABLE IF NOT EXISTS t_lnt_basic_cardinfo_kf (LCN STRING, IS_TIME STRING,IS_TYPE STRING,IS_AREA STRING,CARD_TYPE STRING,PURSE_TYPE STRING,CARD_MARK STRING) USING hive")spark.sql("insert into table t_lnt_basic_cardinfo_st SELECT LCN,IS_TIME, IS_TYPE, IS_AREA,CARD_TYPE ,PURSE_TYPE,CARD_MARK FROM t_Cardinfo_st ")
保存文件名:cardinfo.py
执行:python3 cardinfo.py 或者spark-submit cardinof.py
出现问题:
1.执行spark.sql("use oracledb")脚本的时候,出现:
org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: Database 'oracledb' not found;
的错误提示。
将脚本拷贝到pyspark的命令模式,所有脚本执行正常。
解决方法:
经过百度,参考传送门:https://blog.csdn.net/lxhandlbb/article/details/56293490
在开头的地方引用HIVE的支持,具体如下,添加粗体部分。
# 初始化SparkSession
spark = SparkSession \
.builder.enableHiveSupport()\
.appName("CardInfroToHive_kf") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()