Reading a local file
scala> var textFile = sc.textFile("file:///root/1.txt")
textFile: org.apache.spark.rdd.RDD[String] = file:///root/1.txt MapPartitionsRDD[57] at textFile at <console>:24
scala> textFile.saveAsTextFile("file:///root/writeback")
scala> textFile.foreach(println)
hadoop hello
bianhao shan
nihao
hello shan
hello bianhao
nihao
lizhao hello
Reading and parsing JSON files
Reading a JSON file
############ Reading a JSON file from HDFS works exactly the same way ###########
scala> val jsonStr = sc.textFile("file:///usr/local/soft/spark-2.1.0-bin-without-hadoop/examples/src/main/resources/people.json")
jsonStr: org.apache.spark.rdd.RDD[String] = file:///usr/local/soft/spark-2.1.0-bin-without-hadoop/examples/src/main/resources/people.json MapPartitionsRDD[65] at textFile at <console>:24
scala> jsonStr.foreach
foreach foreachAsync foreachPartition foreachPartitionAsync
scala> jsonStr.foreach(println)
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
Parsing JSON
Let's learn this hands-on with an example.
For the detailed sbt packaging steps, see 子雨's lab tutorial; here we go straight to the commands. (One practical finding: once sbt has succeeded, as long as your Scala and Spark versions stay the same — that is, the dependencies in simple.sbt don't change — later builds no longer need network access.)
############### Create the project layout ###################
[root@master ~]# pwd
/root
[root@master ~]# cd code
[root@master code]# ls
scala
[root@master code]# mkdir spark
[root@master code]# cd spark/ && mkdir json && cd j*
[root@master json]# ls
[root@master json]# pwd
/root/code/spark/json
[root@master json]# mkdir -p src/main/scala && cd src/main/scala
[root@master scala]# vim testjson.scala
[root@master scala]# cat testjson.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.util.parsing.json.JSON

object JSONApp {
  def main(args: Array[String]) {
    val inputFile = "file:///usr/local/soft/spark-2.1.0-bin-without-hadoop/examples/src/main/resources/people.json"
    val conf = new SparkConf().setAppName("JSONApp")
    val sc = new SparkContext(conf)
    val jsonStrs = sc.textFile(inputFile)
    val result = jsonStrs.map(s => JSON.parseFull(s))
    result.foreach({ r => r match {
        case Some(map: Map[String, Any]) => println(map)
        case None => println("Parsing failed")
        case other => println("Unknown data structure: " + other)
      }
    })
  }
}
[root@master scala]# cd ../../..
[root@master json]# vim simple.sbt
[root@master json]# find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/testjson.scala
[root@master json]# cat simple.sbt
name := "JSON Project"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
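In simple.sbt, the `%%` operator appends the Scala binary version to the artifact name, which is why the dependency resolves as spark-core_2.11 and the jar later lands under target/scala-2.11. The same dependency, spelled out with a plain `%`, would be:

```scala
// Equivalent to the %% form above: the Scala binary version written explicitly.
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.1.0"
```

This is also why scalaVersion and the Spark artifact must agree: a spark-core built for a different Scala binary version will fail at link time.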
##################### Package with sbt ###################
[root@master json]# # be sure this project directory is your current directory
[root@master json]# /usr/local/soft/mysbt2/sbt package
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256M; support was removed in 8.0
[info] Updated file /root/code/spark/json/project/build.properties: set sbt.version to 1.3.4
[info] Loading project definition from /root/code/spark/json/project
[info] Loading settings for project json from simple.sbt ...
[info] Set current project to JSON Project (in build file:/root/code/spark/json/)
[warn] There may be incompatibilities among your library dependencies; run 'evicted' to see detailed eviction warnings.
[info] Compiling 1 Scala source to /root/code/spark/json/target/scala-2.11/classes ...
[warn] /root/code/spark/json/src/main/scala/testjson.scala:14:40: non-variable type argument String in type pattern scala.collection.immutable.Map[String,Any] (the underlying of Map[String,Any]) is unchecked since it is eliminated by erasure
[warn] case Some(map: Map[String, Any]) => println(map)
[warn] ^
[warn] one warning found
[success] Total time: 13 s, completed 2019-12-4 23:25:27
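The compiler warning above is about type erasure: the `Map[String, Any]` type arguments in the pattern are erased at runtime, so the match can only check that the value is *some* Map, not what its keys and values are. A small standalone sketch (names are mine, not from the project) showing the wildcard pattern that sidesteps the warning:

```scala
// Standalone sketch of the erasure issue flagged by sbt: at runtime a
// pattern match cannot see a Map's type arguments, only that it is a Map.
object ErasureDemo {
  def describe(x: Any): String = x match {
    // Map[_, _] uses wildcards, so no "unchecked" warning is emitted
    case m: Map[_, _] => s"a Map with ${m.size} entries"
    case _            => "not a Map"
  }

  def main(args: Array[String]): Unit = {
    println(describe(Map("name" -> "Andy")))  // a Map with 1 entries
    println(describe(42))                     // not a Map
  }
}
```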
#################### Submit with spark-submit #######################
[root@master json]# spark-submit --class "JSONApp" /root/code/spark/json/target/scala-2.11/json-project_2.11-1.0.jar 2>&1 | grep "Map("
Map(name -> Michael)
Map(name -> Andy, age -> 30.0)
Map(name -> Justin, age -> 19.0)
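Note the ages in the output: 30 and 19 in the JSON come back as 30.0 and 19.0, because JSON.parseFull represents every JSON number as a Double. A small standalone sketch (the helper name is mine) of converting such a field back to Int:

```scala
// JSON.parseFull returns numbers as Double, so "age":30 arrives as 30.0.
// ageAsInt is a hypothetical helper converting that field back to Int.
object JsonNumbers {
  def ageAsInt(record: Map[String, Any]): Option[Int] =
    record.get("age").collect { case d: Double => d.toInt }

  def main(args: Array[String]): Unit = {
    println(ageAsInt(Map("name" -> "Andy", "age" -> 30.0)))  // Some(30)
    println(ageAsInt(Map("name" -> "Michael")))              // None
  }
}
```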
Reading from HDFS
scala> var textFile = sc.textFile("hdfs://192.168.0.11:9000/user/root/1.txt")
textFile: org.apache.spark.rdd.RDD[String] = hdfs://192.168.0.11:9000/user/root/1.txt MapPartitionsRDD[62] at textFile at <console>:24
scala> textFile.first()
res25: String = hadoop hello
scala> textFile.saveAsTextFile("write")
HBase
An open-source implementation of Google Bigtable.
Data model
Timestamps are generated automatically; you never manage them yourself. Every operation works on the version with the newest timestamp, and older versions are kept — a consequence of HDFS files being write-once and unmodifiable after that (HBase is built on HDFS).
So when using HBase, you only need to pin down three coordinates: row key, column family, and column qualifier.
A single table can span multiple regions (because tables can be very large).
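The three coordinates plus the automatic timestamp can be sketched with a toy in-memory model (this is NOT the HBase client API, just an illustration of the addressing scheme):

```scala
// Toy model of HBase addressing (not the HBase client API): a value is
// located by (row key, column family, column qualifier), and each cell
// keeps multiple timestamped versions; reads return the newest one.
object CellModel {
  type Cell  = Map[Long, String]                    // timestamp -> value
  type Table = Map[(String, String, String), Cell]  // (row, family, qualifier) -> cell

  def latest(table: Table, row: String, family: String, qualifier: String): Option[String] =
    table.get((row, family, qualifier)).flatMap { versions =>
      if (versions.isEmpty) None else Some(versions(versions.keys.max))
    }

  def main(args: Array[String]): Unit = {
    val table: Table = Map(
      ("1", "info", "name") -> Map(1L -> "Xueqian"),
      ("1", "info", "age")  -> Map(1L -> "22", 2L -> "23")  // two versions; newest wins
    )
    println(latest(table, "1", "info", "age"))   // Some(23)
    println(latest(table, "2", "info", "name"))  // None
  }
}
```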
Installing and configuring HBase
The official download was painfully slow... then I remembered I had once copied a big-data software bundle from a teacher, so I went straight to installing HBase 1.1.5 from it.
I wrote up my own installation guide:
Deploying HBase 1.1.x on Hadoop 2.6.0 (partially distributed across 3 VMs)
HBase shell gripes
Annoyingly, Backspace does not delete the character before the cursor; both Backspace and Delete remove characters after it.
HBase operations
You can read this article directly:
Reading and writing HBase data
For the walkthrough of the example code, I followed 子雨's online course.
[root@master scala-2.11]# spark-submit --driver-class-path /usr/local/soft/spark-2.1.0-bin-without-hadoop/jars/hbase/*:/usr/local/soft/hbase/conf --class "SparkOperateHBase" ~/code/spark/hbase/target/scala-2.11/hbase-project_2.11-1.0.jar 2>&1 | grep "Row key:"
Row key:1 Name:Xueqian Gender:F Age:23
Row key:2 Name:Weiliang Gender:M Age:24
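The output lines above are just the per-row cell values concatenated into one string. A standalone sketch (not the HBase client API; the helper name is mine) of producing that line from already-extracted fields:

```scala
// Standalone sketch: format one row the way the SparkOperateHBase job's
// output above prints it, from already-extracted cell values.
object RowFormat {
  def formatRow(rowKey: String, name: String, gender: String, age: String): String =
    s"Row key:$rowKey Name:$name Gender:$gender Age:$age"

  def main(args: Array[String]): Unit = {
    println(formatRow("1", "Xueqian", "F", "23"))
    println(formatRow("2", "Weiliang", "M", "24"))
  }
}
```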