【Spark】Spark SQL原理、编译、配置及运行方式详述

1、Spark SQL的发展历史

(1)在Hadoop中运行SQL的工具
在Hadoop中运行SQL的工具有Hive、Impala、Apache Drill、Presto、Spark SQL等。
(2)Shark的发展历史
在三四年前,Hive可以说是SQL on Hadoop的唯一选择,负责将SQL编译成可扩展的MapReduce作业。鉴于Hive的性能以及与Spark的兼容,Shark项目由此而生。
Shark即Hive on Spark,本质上是通过Hive的HQL解析,把HQL翻译成Spark上的RDD操作,然后通过Hive的metadata获取数据库里的表信息,实际HDFS上的数据和文件,会由Shark获取并放到Spark上运算。
Shark的最大特性就是快和与Hive的完全兼容,且可以在shell模式下使用rdd2sql()这样的API,把HQL得到的结果集,继续在scala环境下运算,支持自己编写简单的机器学习或简单分析处理函数,对HQL结果进一步分析计算。
Shark项目源代码:https://github.com/amplab/shark。
在2014年7月1日的Spark Summit上,Databricks宣布终止对Shark的开发,将重点放到Spark SQL上。
Databricks表示,Spark SQL将涵盖Shark的所有特性,用户可以从Shark 0.9进行无缝的升级。
Databricks推广的Shark相关项目一共有两个,分别是Spark SQL和新的Hive on Spark(HIVE-7292)。
Databricks表示,Shark更多是对Hive的改造,替换了Hive的物理执行引擎,因此会有一个很快的速度。然而,不容忽视的是,Shark继承了大量的Hive代码,因此给优化和维护带来了大量的麻烦。
(3)Spark SQL发展的时间线

(4)Spark SQL的特点
1)Spark SQL运行SQL或HiveQL查询使用UDF,UDAF和SerDes(序列化反序列化)函数。
2)通过JDBC或ODBC将Tableau等连接到Spark SQL。
3)使用Python、Scala、Java和R语言开发。

2、SparkSQL编译

编译Spark时指定支持Hive:(Apache Hadoop 2.4.x支持Hive 1.3)

mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package

3、SparkSQL和Hive集成(SparkSQL需要的是Hive表的元数据)

(1)将hive的hive-site.xml文件复制或者软链接到spark的conf文件夹中。

~]$ cd /opt/cdh-5.3.6/spark-1.6.1-bin-2.5.0-cdh5.3.6/conf
conf]$ ln -s /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/conf/hive-site.xml
conf]$ ll
lrwxrwxrwx 1 beifeng beifeng   54 Jul 28 18:31 hive-site.xml -> /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/conf/hive-site.xml

(2)/opt/cdh-5.3.6/spark-1.6.1-bin-2.5.0-cdh5.3.6/conf/hive-site.xml文件内容如下。

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
      <property>
             <name>javax.jdo.option.ConnectionURL</name>
             <value>jdbc:mysql://bigdata-senior.ibeifeng.com:3306/metadata?createDatabaseIfNotExist=true</value>
      </property>
      <property>
             <name>javax.jdo.option.ConnectionDriverName</name>
             <value>com.mysql.jdbc.Driver</value>
      </property>
      <property>
             <name>javax.jdo.option.ConnectionUserName</name>
             <value>root</value>
      </property>
      <property>
             <name>javax.jdo.option.ConnectionPassword</name>
             <value>123456</value>
      </property>
      <property>
             <name>hive.metastore.warehouse.dir</name>
             <value>/user/hive/warehouse</value>
      </property>
      <property>
             <name>hive.exec.mode.local.auto</name>
             <value>true</value>
      </property>
      <property>
             <name>hive.exec.mode.local.auto.input.files.max</name>
             <value>100</value>
      </property>
      <property>
             <name>hive.exec.mode.local.auto.inputbytes.max</name>
             <value>13421772800000</value>
      </property>
      <property>
             <name>hive.cli.print.header</name>
             <value>true</value>
      </property>
      <property>
             <name>hive.cli.print.current.db</name>
             <value>true</value>
      </property>
      <!-- HiveServer2 -->
      <property>
             <name>hive.server2.thrift.port</name>
             <value>10000</value>
      </property>
      <property>
             <name>hive.server2.thrift.bind.host</name>
             <value>bigdata-senior.ibeifeng.com</value>
      </property>
      <!-- Remote MetaStore -->
      <property>
             <name>hive.metastore.uris</name>
             <value>thrift://bigdata-senior.ibeifeng.com:9083</value>
      </property>
</configuration>

(3)根据hive的配置文件的内容选择不同的操作方式,这里指根据hive.metastore.uris参数的配置值来选择不同的操作方式,值默认为空。
1)如果没有给定参数(默认情况):
将hive元数据数据库的驱动包添加到spark的classpath环境变量中即可完成spark和hive的集成。
2)给定具体的metastore服务所在的节点信息(值非空)
启动hive的metastore服务:

cd /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6
hive-0.13.1-cdh5.3.6]$ bin/hive --service metastore &
[1] 5146
[beifeng@bigdata-senior hive-0.13.1-cdh5.3.6]$ Starting Hive Metastore Server
[beifeng@bigdata-senior hive-0.13.1-cdh5.3.6]$ jps -ml
5146 org.apache.hadoop.util.RunJar /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/lib/hive-service-0.13.1-cdh5.3.6.jar org.apache.hadoop.hive.metastore.HiveMetaStore
5225 sun.tools.jps.Jps -ml
[beifeng@bigdata-senior hive-0.13.1-cdh5.3.6]$ jps
5146 RunJar
5235 Jps

(4)SparkSQL和Hive集成测试
1)启动NameNode,DataNode
2)启动SparkSQL

cd /opt/cdh-5.3.6/spark-1.6.1-bin-2.5.0-cdh5.3.6
spark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/spark-sql

spark-sql (default)> select * from common.emp;
spark-sql (default)> select * from common.emp a join common.dept b on a.deptno = b.deptno; 
spark-sql (default)> explain select * from common.emp a join common.dept b on a.deptno = b.deptno;

3)启动Spark Shell

spark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/spark-shell
scala> sqlContext.sql("select * from common.emp a join common.dept b on a.deptno = b.deptno").show()

4)查看SparkSQL和Spark Shell界面

SparkSQL: http://bigdata-senior.ibeifeng.com:4040/jobs/
Spark Shell: http://bigdata-senior.ibeifeng.com:4041/jobs/

(5)Spark应用依赖第三方jar文件解决方案
1)使用参数–jars添加本地的第三方jar文件,可以给定多个,使用逗号分隔。注意:要求jar文件在driver和client的机器上存在,适合依赖比较少的情况,即jar文件在本地存在。

spark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/spark-shell --jars /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/lib/mysql-connector-java-5.1.27-bin.jar,/opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/lib/derby-10.10.1.1.jar

在Spark Shell界面:http://bigdata-senior.ibeifeng.com:4041/jobs/ 查看添加的jar文件。


2)使用参数–packages添加maven中央库中的第三方jar文件,可以给定多个,使用逗号分隔。注意:下载的jar文件会保存到当前用户的根目录下的.ivy2文件夹的jars文件夹中,eg:/home/beifeng/.ivy2/jars;适合依赖比较少的情况。jar文件从maven源下载。

spark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/spark-shell --packages mysql:mysql-connector-java:5.1.27

3)使用SPARK_CLASSPATH环境变量给定jar文件的位置信息。注意:要求所有可以执行的节点都需要进行该配置,如果是spark on yarn,要求所有机器上有对应文件夹的jar文件。

spark-1.6.1-bin-2.5.0-cdh5.3.6]$ mkdir -p external_jars
即该位置:/opt/cdh-5.3.6/spark-1.6.1-bin-2.5.0-cdh5.3.6/external_jars

SPARK_CLASSPATH=/opt/cdh-5.3.6/spark-1.6.1-bin-2.5.0-cdh5.3.6/external_jars/*
external_jars]$ cp /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/lib/mysql-connector-java-5.1.27-bin.jar .
external_jars]$ cp /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/lib/derby-10.10.1.1.jar .

启动Spark Shell

spark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/spark-shell
http://bigdata-senior.ibeifeng.com:4042/environment/

jar文件明确给定路径信息:

4)将依赖的jar文件打包到spark应用的jar文件中。注意:只适合jar文件比较小,而且应用依赖的jar文件不多的情况。
5)Spark on Yarn Cluster 第三方jar文件驱动解决方案(推荐)
将第三方的jar文件copy到${HADOOP_HOME}/share/hadoop/common/lib文件夹中或者${HADOOP_HOME}/share/hadoop/yarn/lib文件夹中。

4、SparkSQL源码

在SparkSQL源码中,SQLContext是SparkSQL的入口,依赖于SparkContext。
HiveContext:当SparkSQL和Hive集成的时候(也就是SparkSQL可以访问Hive的元数据),必须使用HiveContext作为SparkSQL的入口。
DataFrame:SparkSQL中的核心抽象,类似于RDD,都是分布式数据集。

5、SparkSQL的ThriftServer服务

(1)SparkSQL的ThriftServer服务简述
SparkSQL的ThriftServer服务其实就是Hive的HiveServer2服务,只是将底层的执行改成Spark,同时在Spark上启动。
SparkSQL的ThriftServer服务官方帮助文档:
https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2
http://spark.apache.org/docs/1.6.1/sql-programming-guide.html#distributed-sql-engine
(2)配置SparkSQL的ThriftServer服务
1)在hive-site.xml中修改Hiveserver2的配置信息,如端口号和监听的IP地址:

hive.server2.thrift.port=10000
hive.server2.thrift.host=0.0.0.0

2)启动ThriftServer服务
spark-1.6.1-bin-2.5.0-cdh5.3.6]$ sbin/start-thriftserver.sh
查看启动进程:

[beifeng@bigdata-senior spark-1.6.1-bin-2.5.0-cdh5.3.6]$ jps -ml
5146 org.apache.hadoop.util.RunJar /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/lib/hive-service-0.13.1-cdh5.3.6.jar org.apache.hadoop.hive.metastore.HiveMetaStore
8701 sun.tools.jps.Jps -ml
5616 org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver spark-internal
5544 org.apache.hadoop.hdfs.server.datanode.DataNode
6668 org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name Spark shell --jars /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/lib/mysql-connector-java-5.1.27-bin.jar,/opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/lib/derby-10.10.1.1.jar spark-shell
8555 org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 spark-internal
5457 org.apache.hadoop.hdfs.server.namenode.NameNode

注意:ThriftServer服务也是一个Spark的应用,所以可以给定参数,也可以进行优化(主要是资源调优)
3)停止ThriftServer服务
spark-1.6.1-bin-2.5.0-cdh5.3.6]$ sbin/stop-thriftserver.sh
(3)SparkSQL的ThriftServer服务测试
1)通过命令beeline来测试

spark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/beeline
Beeline version 1.6.1 by Apache Hive
beeline> !connect jdbc:hive2://bigdata-senior.ibeifeng.com:10000
Connecting to jdbc:hive2://bigdata-senior.ibeifeng.com:10000
Enter username for jdbc:hive2://bigdata-senior.ibeifeng.com:10000: beifeng
Enter password for jdbc:hive2://bigdata-senior.ibeifeng.com:10000: *******
19/07/28 23:15:29 INFO jdbc.Utils: Supplied authorities: bigdata-senior.ibeifeng.com:10000
19/07/28 23:15:29 INFO jdbc.Utils: Resolved authority: bigdata-senior.ibeifeng.com:10000
19/07/28 23:15:29 INFO jdbc.HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://bigdata-senior.ibeifeng.com:10000
Connected to: Spark SQL (version 1.6.1)
Driver: Spark Project Core (version 1.6.1)
Transaction isolation: TRANSACTION_REPEATABLE_READ

在beeline命令行输入SQL语句:

0: jdbc:hive2://bigdata-senior.ibeifeng.com:1> 
0: jdbc:hive2://bigdata-senior.ibeifeng.com:1> show databases;
0: jdbc:hive2://bigdata-senior.ibeifeng.com:1> use default;
0: jdbc:hive2://bigdata-senior.ibeifeng.com:1> show tables;
0: jdbc:hive2://bigdata-senior.ibeifeng.com:1> select * from default.emp a join default.dept b on a.deptno = b.deptno;
0: jdbc:hive2://bigdata-senior.ibeifeng.com:1> !help     # 查看帮助信息
0: jdbc:hive2://bigdata-senior.ibeifeng.com:1> !quit
Closing: 0: jdbc:hive2://bigdata-senior.ibeifeng.com:10000

2)编码测试,通过JDBC连接SparkSQL提供的ThriftServer服务
添加hive-jdbc驱动的pom依赖。这个依赖最好使用aliyun的maven源进行下载,需要检查一下是否已配置该maven源:http://maven.aliyun.com/nexus/content/groups/public/。
maven源配置如下:

<repositories>
	<repository>
		<id>aliyun</id>
		<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
	</repository>
	<repository>
		<id>cloudera</id>
		<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
	</repository>
</repositories>

pom依赖坐标如下:

<dependency>
  <groupId>org.spark-project.hive</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>0.13.1</version>
</dependency>

注意:给定用户名的时候,如果HDFS没有做权限验证,可以给定任何值;如果做了权限验证,必须给定操作tmp文件夹有权限的用户名称。

6、SparkSQL读取JSON格式的HDFS文件

1)将测试文件上传HDFS上的/beifeng/spark/sql/data目录。

hadoop-2.5.0-cdh5.3.6]$ pwd
/opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6
hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -mkdir -p /user/beifeng/spark/sql/data
hadoop-2.5.0-cdh5.3.6]$ cd /opt/cdh-5.3.6/spark-1.6.1-bin-2.5.0-cdh5.3.6/examples/src/main/resources
resources]$ /opt/cdh-5.3.6/hadoop-2.5.0-cdh5.3.6/bin/hdfs dfs -put ./* /user/beifeng/spark/sql/data

2)编写SparkSQL代码
启动Hive Metastore:hive-0.13.1-cdh5.3.6]$ bin/hive --service metastore
启动Spark-Shell:spark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/spark-shell

scala> val path = "/user/beifeng/spark/sql/data/people.json"
path: String = /user/beifeng/spark/sql/data/people.json
scala> val df = sqlContext.jsonFile(path)
scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
scala> df.registerTempTable("json_people")
scala> sqlContext.sql("show tables").show
+-----------+-----------+
|  tableName|isTemporary|
+-----------+-----------+
|json_people|       true|
+-----------+-----------+
scala> sqlContext.dropTempTable("json_people")
scala> sqlContext.sql("show tables").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
+---------+-----------+
scala> df.registerTempTable("json_people")
scala> sqlContext.sql("show tables").show
+-----------+-----------+
|  tableName|isTemporary|
+-----------+-----------+
|json_people|       true|
+-----------+-----------+
scala> sqlContext.tableNames()
res7: Array[String] = Array(json_people)
scala> sqlContext.sql("select age, name from json_people where age is not null").show()
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+
scala> sqlContext.sql("select age, name from json.`/user/beifeng/spark/sql/data/people.json` where age is not null").show()
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+
scala> df
res10: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.rdd
res11: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[38] at rdd at <console>:30
scala> df.schema
res12: org.apache.spark.sql.types.StructType = StructType(StructField(age,LongType,true), StructField(name,StringType,true))

7、DataFrame

(1)DataFrame的概念
Spark1.3才出现DataFrame的概念。在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。
DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。
DataFrame与RDD的对比如下:

(2)DataFrame的创建方式

val df = sqlContext.##

1)SparkSQL的操作:

  1. HQL/SQL开发
    将DataFrame注册为临时表,然后通过sqlContext.sql(“xxx”)进行任务执行。
  2. DSL开发
    直接通过DataFrame的API进行开发。

2)执行结果输出:

  1. 将DataFrame转换为RDD进行数据输出
val resultRDD = resultDataFrame.rdd.map(row => (row.getAs[Long](0),row.getAs[String](1)))
resultRDD.saveAsXXX
  1. 直接调用DataFrame的相关API进行数据输出
df.show()
df.##

3)SparkSQL应用的处理流程:

  1. 读取数据形成DataFrame
  2. 使用DSL或者HQL进行DataFrame的数据操作/数据处理
  3. 执行结果输出

(3)DataFrame内部是一个逻辑计划
所有的数据执行都是懒加载的。调用相关的API,实质上实在内部构建一个查询的逻辑计划,类似RDD的构建过程;只有当DataFrame被触发调用(获取数据的这种操作)的时候,才会真正的执行。
job执行步骤:
逻辑计划 --> 分析逻辑计划 --> 优化逻辑计划 --> 物理计划产生 --> 选择一个最优的物理计划 --> SparkCore代码生成 --> job执行

8、DataFrame的read和write编程模式

官方说明文档:
https://spark-packages.org/
https://github.com/databricks/
DataFrame的read和write编程模式是通过SparkSQL内部定义的read和write数据读写入口进行数据的加载和保存操作。
(1)读数据

val df = sqlContext.read.###.load()
def read: DataFrameReader = new DataFrameReader(this)

函数功能说明:
format:给定读取数据源的数据格式是什么
schema:给定数据的数据格式,如果不给定,会自动进行推断
option:给定读取数据需要的参数
load:加载数据形成DataFrame
jdbc:读取RDBMs数据库的数据形成DataFrame
(2)三个不同jdbc API的功能
1)给定url和表名称及user&password即可读取数据,内部形成的DataFrame的分区数是1个:

def jdbc(url: String, table: String, properties: Properties)

2)给定形成的DataFrame的分区数量以及进行数据分区的字段,要求分区字段的数据类型必须是数值类型的:

def jdbc(
      url: String,
      table: String,
      columnName: String, // 给定分区字段的列名称
      lowerBound: Long,   // 给定计算范围下界
      upperBound: Long,   // 给定计算范围上界
      numPartitions: Int, // 给定分区个数
      connectionProperties: Properties): DataFrame

步长及索引如下计算:

step = (upperBound - lowerBound) / numPartitions
currentIndex = step + lowerBound ==> (负无穷大,currentIndex]
preIndex = currentIndex
currentIndex += step ===> (preIndex, currentIndex]
	 				 ===> 直到分区数量为numPartitions - 1
	    给定上界的范围 ===> (currentIndex,正无穷大)

3)明确给定进行数据分区的字段条件

def jdbc(
      url: String,
      table: String,
      predicates: Array[String], // 数据分区的字段条件,predicates集合中的数据个数就是最终的数据分区个数
      connectionProperties: Properties): DataFrame

(3)写数据

df.write.###.save()
def write: DataFrameWriter = new DataFrameWriter(this)

函数功能说明:
mode:给定数据插入的策略(数据插入的文件夹或者表是否存在):

  • overwrite:overwrite the existing data. 如果数据存在,覆盖(数据指的是存储数据的文件夹或者表);
  • append:append the data. 追加的形式;
  • ignore:ignore the operation (i.e. no-op). 如果数据存在,不进行任何操作,不插入数据;
  • error:default option, throw an exception at runtime. 如果数据存在,直接抛出异常。

format:给定数据输出的格式。
option:给定参数。
partitionBy:给定分区字段。
save:将数据进行保存操作。
insertInto:将数据插入到一个表中。
saveAsTable: 将数据保存为一个表。
jdbc:将数据输出到关系型数据库中。
(4)读写数据示例
读数据示例:

df = sqlContext.read \
		.format("json") \
		.option("samplingRatio", "0.1") \
		.load("/home/michael/data.json")

写数据示例:

df.write \
  .format("parquet") \
  .mode("append") \
  .partitionBy("year") \
//  .save("fasterData")
  .saveAsTable("fasterData")

先读后写示例(使用传统数据源的ETL):

sqlContext.read \
	.format("com.databricks.spark.git") \
	.option("url", "https://github.com/apache/spark.git") \
	.option("numPartitions", "100") \
	.option("branches", "master, branch-1.3, branch-1.2") \
	.load() \
	.repartition(1) \
	.write \
	.format("json") \
	.save("/home/michael/spark.json")

(5)不同方式计算平均数
1)SQL语句

SELECT name, avg(age) FROM people GROUP BY name

2)MapReduce计算模型

private IntWritable one = new IntWritable(1)
private IntWritable output = new IntWritable()
protected void map(LongWritable key, Text value, Context context) {
	String[] fields = value.split("\t")
	output.set(Integer.parseInt(fields[1]))
	context.write(one, output)
}

IntWritable one = new IntWritable(1)
DoubleWritable average = new DoubleWritable()
protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) {
	int sum = 0
	int count = 0
	for(IntWritable value: values) {
		sum += value.get()
		count++
	}
	average.set(sum/(double)count)
	context.write(key, average)
}

3)Spark Core(RDD)计算模型

data = sc.textFile(...).split("\t")
data.map(lambda x: (x[0], [int(x[1], 1])) \
	.reduceByKey(lambda x, y: [x[0] + y[0], x[1] + y[1]]) \
	.map(lambda x: [x[0], x[1][0] / x[1][1]]) \
	.collect()

4)Spark SQL(DataFrame)计算模型

sqlContext.table("people") \
		  .groupBy("name") \
		  .agg("name", avg("age")) \
		  .collect()

(6)SQL语句转换成DataFrame

events = sqlContext.load("/data/events", "parquet")
training_data = events.where("city = 'New York' and year = 2015")
					  .select("timestamp")
					  .collect()

(7)每个Spark Application以loading data开始,以saving data结束。

发布了219 篇原创文章 · 获赞 603 · 访问量 129万+

猜你喜欢

转载自blog.csdn.net/gongxifacai_believe/article/details/97616903