Spark Application Basics -- Fundamentals and Common Sample Code

# Create a Spark project skeleton with Maven
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.maven.archetypes \
  -DgroupId=spark.examples \
  -DartifactId=JavaWordCount \
  -Dfilter=org.apache.maven.archetypes:maven-archetype-quickstart

  1. Classic Spark SQL query code

import org.apache.spark.sql.functions.countDistinct

df.select(countDistinct("medallion")).show()

// Define a UDF, use it to derive a new column, then sample and display the result
import java.sql.Timestamp
import org.apache.spark.sql.functions.udf

val code: Timestamp => Long = (arg: Timestamp) => arg.getTime() / 1000 / 3600 % 24
val addCol = udf(code)

val df22 = df2.withColumn("byhour", addCol(df2("pickup_datetime")))
df22.show(10)
val newSample = df22.sample(true, 0.001)

// Map each CSV row directly to a case class object; the case class acts as the schema
case class Employee(EmployeeID: String,
  LastName: String, FirstName: String,
  Title: String,
  BirthDate: String, HireDate: String,
  City: String, State: String, Zip: String, Country: String,
  ReportsTo: String)

case class Order(OrderID: String, CustomerID: String, EmployeeID: String, OrderDate: String, ShipCountry: String)
case class OrderDetails(OrderID: String, ProductID: String, UnitPrice: Double, Qty: Int, Discount: Double)

val filePath = "hdfs://10.20.2.1:8020/study/spark/DataProcessingWithSpark2/"
val employees = spark.read.option("header", "true").csv(filePath + "data/NW-Employees.csv").as[Employee]
println("Employees has " + employees.count() + " rows")
employees.show(5)
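
The Order and OrderDetails case classes above can be loaded the same way; a minimal sketch, assuming the corresponding files in the same data/ directory are named NW-Orders.csv and NW-Order-Details.csv (hypothetical file names, not confirmed by the original post):

// Sketch: load the remaining CSVs as Datasets and join order headers with line items
val orders = spark.read.option("header", "true")
  .csv(filePath + "data/NW-Orders.csv").as[Order]
val orderDetails = spark.read.option("header", "true").option("inferSchema", "true")
  .csv(filePath + "data/NW-Order-Details.csv").as[OrderDetails]

val orderLines = orders.join(orderDetails, "OrderID")
orderLines.show(5)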

//———————————

// Executor memory is recommended to be at least 4 GB
--executor-memory 4g \

// Take the first few rows (note: take returns the first N rows, not a random sample)
df.take(5)
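
If a genuinely random selection of rows is wanted instead, one option is to sort by a random value; a minimal sketch using the built-in rand function:

// Sketch: pick 5 rows at random by ordering on a random column
import org.apache.spark.sql.functions.rand
df.orderBy(rand()).limit(5).show()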

// Split into training and test sets and destructure them into an array
val Array(train, test) = preparedDF.randomSplit(Array(0.7, 0.3))

// Load multiple files at once with a glob pattern
val df = spark.read.format("csv").option("header", "true").option("delimiter", ",").schema(tripFareSchema).load("hdfs://10.20.2.1:8020/data/trip/*.csv")

View the properties of a running Spark instance via the web UI:
http://spark-node4.cityworks.cn:4040/environment/
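
The same settings can also be inspected programmatically from spark-shell; a minimal sketch:

// Sketch: print the effective configuration from the REPL
sc.getConf.getAll.foreach(println)    // SparkContext-level settings
spark.conf.getAll.foreach(println)    // SparkSession runtime settings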

// Define a custom schema for the columns
import org.apache.spark.sql.types.{StructField, StructType, StringType, IntegerType, FloatType}
import org.apache.spark.sql.types.Metadata

val poiSchema = StructType(Array(
  StructField("shopID", IntegerType, true),
  StructField("cityTelCode", StringType, true),
  StructField("cityName", StringType, true),
  StructField("name", StringType, true),
  StructField("address", StringType, true),
  StructField("tel", StringType, true),
  StructField("latitude", FloatType, true),
  StructField("longitude", FloatType, true),
  StructField("channel", StringType, true)
))

val df = spark.read.format("csv").option("header", "true").option("delimiter", "|").schema(poiSchema).load("hdfs://10.20.2.1:8020/data/poi/ChinaPOI.csv")

df.printSchema()
df.createOrReplaceTempView("ChinaPOI")

df.cache()
df.count()
df.show(10)

// coalesce(1) merges the output into a single file
jasperDF.coalesce(1).write.format("csv").mode("overwrite").save("hdfs://10.20.2.1:8020/tmp/jasper.csv")

# Read a text file with Spark and parse it line by line (PySpark)
csv_lines = sc.textFile("data/example.csv")
data = csv_lines.map(lambda line: line.split(","))
data.collect()
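
For comparison, a minimal sketch of the same line-by-line parse in Scala:

// Sketch: read a text file and split each line on commas (Scala)
val csvLines = sc.textFile("data/example.csv")
val parsed = csvLines.map(line => line.split(","))
parsed.collect()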

#!/bin/bash
spark-submit --class com.linyingjie.study.wordcount.SparkWordCount \
  --master spark://10.20.2.31:7077,10.20.2.32:7077,10.20.2.33:7077,10.20.2.34:7077,10.20.2.35:7077 \
  --deploy-mode cluster \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-spark.properties" \
  --supervise \
  --driver-memory 1g \
  --executor-memory 1g \
  --executor-cores 1 \
  --queue thequeue \
  --total-executor-cores 5 \
  --driver-java-options "-Dlog4j.configuration=file:/var/server/spark/conf/log4j.properties -Dvm.logging.level=DEBUG" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:/var/server/spark/conf/log4j.properties -Dvm.logging.name=myapp -Dvm.logging.level=DEBUG" \
  http://10.20.2.31:80/WordCount-assembly-0.2.jar \
  hdfs://10.20.2.1:8020/tmp/article.txt \
  hdfs://10.20.2.1:8020/tmp/result3__spark2.4.0_2018_11_21

Spark 2.4.0 ships with Scala 2.11.12.
Spark 2.3.1 ships with Scala 2.11.8.
You can see this from the spark-shell banner; a Scala application you deploy must be built against the same Scala version as the target Spark cluster, especially when submitting in cluster mode.
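
A quick way to confirm the Scala version from inside spark-shell (a minimal sketch; the output shown corresponds to Spark 2.4.0):

scala> util.Properties.versionString
res2: String = version 2.11.12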

# With passwordless SSH configured, copy files to all workers in batch:
cat /var/server/spark/conf/slaves | grep '^spark-node' | xargs -i -t scp /etc/ntp.conf root@{}:/etc/ntp.conf
cat /var/server/spark/conf/slaves | grep '^spark-node' | xargs -i -t ssh root@{} "systemctl restart ntpd; ntpdate -u 10.20.1.1; ntpq -p"

echo -e "hadoop-namenode1\nhadoop-namenode2\nhadoop-namenode3\n" > /tmp/namenodes.txt; cat /tmp/namenodes.txt | xargs -i -t scp /var/server/hbase/conf/hbase-site.xml hbase@{}:/var/server/hbase/conf/

cat /var/server/spark/conf/slaves | grep '^spark-node' | xargs -i -t ssh root@{} "rm -rf /var/server/spark/logs/*"
cat /var/server/spark/conf/slaves | grep '^spark-node' | xargs -i -t ssh root@{} "chown -R spark:spark /var/server/spark/"

#echo -e "spark-node1\nspark-node2\nspark-node3\nspark-node4\nspark-node5\n" > /tmp/spark-slaves.txt; cat /tmp/spark-slaves.txt | xargs -i -t ssh root@{} "rm -rf /var/server/spark/logs/*"

Ways to run a Spark job: 1. interactively from the console (spark-shell)  2. submitting directly from the shell with spark-submit

# Restart the Spark cluster (stop, then start)
/var/server/spark/sbin/stop-all.sh
/var/server/spark/sbin/start-all.sh

Connecting to the cluster
# Standalone mode uses port 7077
spark-shell --master spark://192.168.2.41:7077
# For cluster deploy mode (REST submission), use port 6066
source java8;spark-submit --class com.cloudera.datascience.lsa.RunLSA --master spark://spark-node1.cityworks.cn:6066,spark-node2.cityworks.cn:6066,spark-node3.cityworks.cn:6066,spark-node4.cityworks.cn:6066,spark-node5.cityworks.cn:6066 --deploy-mode cluster --driver-memory 4g --executor-memory 4g --executor-cores 4 --queue thequeue ch06-lsa-2.0.0-jar-with-dependencies.jar

spark-shell --master spark://10.20.2.21:7077,10.20.2.22:7077,10.20.2.23:7077

source java8;spark-shell --master spark://10.20.2.31:6066,10.20.2.32:6066,10.20.2.33:6066,10.20.2.34:6066,10.20.2.35:6066

val df = spark.read.format("json").load("hdfs://10.20.2.3:8020/study/spark/Spark-The-Definitive-Guide/data/flight-data/json/2015-summary.json")
// My HDFS namenode uses port 8020, the standard setting

val df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("hdfs://10.20.2.3:8020/study/spark/Spark-The-Definitive-Guide/data/retail-data/by-day/2010-12-01.csv")
df.printSchema()
df.createOrReplaceTempView("dfTable")
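
With the temp view registered, the DataFrame can be queried with plain SQL; a minimal sketch (the Country column is assumed from the book's retail dataset):

// Sketch: run SQL against the registered temp view
spark.sql("SELECT COUNT(*) AS row_count FROM dfTable").show()
spark.sql("SELECT Country, COUNT(*) AS cnt FROM dfTable GROUP BY Country ORDER BY cnt DESC").show(5)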

Submitting a Spark job
After bundling a Spark application as a jar file (written in Scala or Java) or as a Python file, it can be submitted with the spark-submit script located in the bin directory of the Spark distribution ($SPARK_HOME/bin). According to the documentation on the Spark website (http://spark.apache.org/docs/latest/submitting-applications.html), the script takes care of the following:
Setting up the classpath with Spark, JAVA_HOME, and SCALA_HOME
Setting up all the dependencies required to execute the job
Managing the different cluster managers
Finally, deploying to the deploy modes that Spark supports
In short, the Spark job submission syntax is:
spark-submit [options] <app-jar | python-file> [app arguments]
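
As a concrete instance of that syntax, a minimal sketch reusing the WordCount jar from the script above (master and argument paths are illustrative):

# Sketch: minimal local-mode submission; adjust master/paths for a real cluster
spark-submit \
  --class com.linyingjie.study.wordcount.SparkWordCount \
  --master local[4] \
  WordCount-assembly-0.2.jar \
  hdfs://10.20.2.1:8020/tmp/article.txt \
  hdfs://10.20.2.1:8020/tmp/wordcount_out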

// Registering a user-defined function:

Now that you have an understanding of the process, let's work through an example. First, we
need to register the function to make it available as a DataFrame function:
// in Scala
import org.apache.spark.sql.functions.{col, udf}
def power3(number: Double): Double = number * number * number
val power3udf = udf(power3(_: Double): Double)
We can use that just like any other DataFrame function:
// in Scala
udfExampleDF.select(power3udf(col("num"))).show()

// Alternative: register the UDF for use in SQL expressions
spark.udf.register("power3", power3(_: Double): Double)
udfExampleDF.selectExpr("power3(num)").show(2)

// Pressing Tab after "sc." in spark-shell lists the available SparkContext members, e.g.:
scala> sc.<Tab>
accumulable            defaultParallelism   jars              sequenceFile
accumulableCollection  deployMode           killExecutor      setCallSite
accumulator            doubleAccumulator    killExecutors     setCheckpointDir
addFile                emptyRDD             killTaskAttempt   setJobDescription
addJar                 files                listFiles         setJobGroup

// sc and spark are two built-in variables: sc is the SparkContext, spark is the SparkSession
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@23469199

scala> spark
res1: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@5c4714ef

scala> :help      // show how to use the REPL commands

scala> :history   // or :h? -- view previously used commands

# Starting spark-shell in a multi-JDK environment
source java8;spark-shell

# The web UI is available as soon as spark-shell starts
http://localhost:4040/jobs/

Extract, Transform, and Load (ETL)
Schema-on-Read (SOR)
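
Schema-on-Read means the schema is applied when the data is read rather than when it is written; in Spark this is what inferSchema (or an explicit .schema(...)) does at load time. A minimal sketch (the path is illustrative):

// Sketch: schema-on-read — column types are resolved at read time
val sorDF = spark.read.option("header", "true").option("inferSchema", "true").csv("data/example.csv")
sorDF.printSchema()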

Pasting code into spark-shell:

scala> :paste
Then paste the code.
Ctrl+D exits paste mode

Resilient Distributed Datasets (RDDs)

Reprinted from blog.csdn.net/happyfreeangel/article/details/84580867