This article covers converting between JSON strings and DataFrames in Spark (2.0+).
JSON string to DataFrame
Spark provides an interface for parsing JSON strings into a DataFrame. If you do not specify a schema for the resulting DataFrame, Spark first scans the given JSON strings and then infers the schema:
- Columns whose values are all null are typed as String
- Integers default to Long
- Floating-point numbers default to Double
val json1 = """{"a":null, "b": 23.1, "c": 1}"""
val json2 = """{"a":null, "b": "hello", "d": 1.2}"""
val ds = spark.createDataset(Seq(json1, json2))
val df = spark.read.json(ds)
df.show
df.printSchema
+----+-----+----+----+
| a| b| c| d|
+----+-----+----+----+
|null| 23.1| 1|null|
|null|hello|null| 1.2|
+----+-----+----+----+
root
|-- a: string (nullable = true)
|-- b: string (nullable = true)
|-- c: long (nullable = true)
|-- d: double (nullable = true)
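Besides spark.read.json, a JSON string column inside an existing DataFrame can be parsed with the from_json function. A minimal sketch (the column name payload is illustrative):

```scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// hypothetical DataFrame holding raw JSON strings in a column named "payload"
val raw = spark.createDataset(Seq("""{"b": 23.1, "c": 1}""")).toDF("payload")

val schema = StructType(List(
  StructField("b", DoubleType, true),
  StructField("c", LongType, true)
))

// from_json parses each string into a struct column; unparsable rows become null
val parsed = raw.select(from_json(col("payload"), schema).as("data"))
parsed.select("data.b", "data.c").show
```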
If a schema is specified, the DataFrame is generated according to it:
- Columns not present in the schema are ignored
- The schema can be specified in two ways, as a StructType or as a DDL string; the mapping between the two is listed later
- If a value cannot be cast to the type in the schema: a nullable column becomes null; a non-nullable column becomes the type's default value (e.g. 0.0 for Double); if no conversion is possible, an exception is thrown
import org.apache.spark.sql.types._

val schema = StructType(List(
  StructField("a", ByteType, true),
  StructField("b", FloatType, false),
  StructField("c", ShortType, true)
))
// or, as a DDL string: val schema = "a byte, b float, c short"
val df = spark.read.schema(schema).json(ds)
df.show
df.printSchema
+----+----+----+
| a| b| c|
+----+----+----+
|null|23.1| 1|
|null| 0|null|
+----+----+----+
root
|-- a: byte (nullable = true)
|-- b: float (nullable = true)
|-- c: short (nullable = true)
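The two schema forms are interchangeable; in recent Spark versions (2.3+) a DDL string can be converted into the equivalent StructType with StructType.fromDDL. A sketch:

```scala
import org.apache.spark.sql.types._

// build the same schema from a DDL string…
val ddlSchema = StructType.fromDDL("a byte, b float, c short")

// …and by hand; fields parsed from DDL are nullable by default
val manualSchema = StructType(List(
  StructField("a", ByteType, true),
  StructField("b", FloatType, true),
  StructField("c", ShortType, true)
))

assert(ddlSchema == manualSchema)
```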
JSON parsing options
primitivesAsString
(default false): treat all primitive values as string type
prefersDecimal
(default false): infer floating-point values as decimal; values that do not fit a decimal are inferred as double.
allowComments
(default false): ignore Java/C++-style comments in JSON records
allowUnquotedFieldNames
(default false): allow unquoted field names
allowSingleQuotes
(default true): allow single quotes in addition to double quotes
allowNumericLeadingZeros
(default false): allow extra leading zeros in numbers (e.g. 0012)
allowBackslashEscapingAnyCharacter
(default false): allow the backslash escaping mechanism to accept any character
allowUnquotedControlChars
(default false): allow JSON strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and newline characters).
mode
(default PERMISSIVE): mode for handling corrupt records during parsing.
PERMISSIVE
: when a corrupt record is encountered, sets all other fields to null and puts the malformed string into the field configured by columnNameOfCorruptRecord. If a schema is specified, add a string-typed field named columnNameOfCorruptRecord to it; if the schema does not contain that field, corrupt records are dropped during parsing. If no schema is specified (inference mode), a columnNameOfCorruptRecord field is implicitly added to the output schema.
DROPMALFORMED
: drop the entire corrupt record
FAILFAST
: throw an exception when a corrupt record is encountered
columnNameOfCorruptRecord
(default: the value of spark.sql.columnNameOfCorruptRecord): the name of the field added in PERMISSIVE mode; overrides spark.sql.columnNameOfCorruptRecord
dateFormat
(default yyyy-MM-dd): custom date format, following java.text.SimpleDateFormat; covers the date part only (no time of day)
timestampFormat
(default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): custom timestamp format, following java.text.SimpleDateFormat; may include the time part (down to microseconds)
multiLine
(default false): parse one record, which may span multiple lines, per file
The options above can be set with the option method:
val stringDF = spark.read.option("primitivesAsString", "true").json(ds)
stringDF.show
stringDF.printSchema
+----+-----+----+----+
| a| b| c| d|
+----+-----+----+----+
|null| 23.1| 1|null|
|null|hello|null| 1.2|
+----+-----+----+----+
root
|-- a: string (nullable = true)
|-- b: string (nullable = true)
|-- c: string (nullable = true)
|-- d: string (nullable = true)
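For example, PERMISSIVE mode combined with columnNameOfCorruptRecord keeps malformed records for inspection. A sketch (the field name _corrupt is arbitrary):

```scala
val mixed = spark.createDataset(Seq(
  """{"a": 1}""",
  """{"a": """ // malformed JSON
))

val df = spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt")
  .schema("a long, _corrupt string") // the schema must contain the corrupt-record field
  .json(mixed)

// the malformed row should show a = null and the raw string in _corrupt
df.show(false)
```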
Binary columns are automatically represented using base64 encoding.
'Man' (ASCII) base64-encodes to "TWFu":
import org.apache.spark.sql.functions.{base64, col}

val byteArr = Array('M'.toByte, 'a'.toByte, 'n'.toByte)
val binaryDs = spark.createDataset(Seq(byteArr))
val dsWithB64 = binaryDs.withColumn("b64", base64(col("value")))
dsWithB64.show(false)
dsWithB64.printSchema
+----------+----+
|value |b64 |
+----------+----+
|[4D 61 6E]|TWFu|
+----------+----+
root
|-- value: binary (nullable = true)
|-- b64: string (nullable = true)
//=================================================
dsWithB64.toJSON.show(false)
+-----------------------------+
|value |
+-----------------------------+
|{"value":"TWFu","b64":"TWFu"}|
+-----------------------------+
//=================================================
val json = """{"value":"TWFu"}"""
val jsonDs = spark.createDataset(Seq(json))
val binaryDF = spark.read.schema("value binary").json(jsonDs)
binaryDF.show
binaryDF.printSchema
+----------+
| value|
+----------+
|[4D 61 6E]|
+----------+
root
|-- value: binary (nullable = true)
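In the other direction, a base64-encoded string column can be decoded back to binary with the unbase64 function:

```scala
import org.apache.spark.sql.functions.{col, unbase64}

val b64DF = spark.createDataset(Seq("TWFu")).toDF("b64")

// unbase64 returns a binary column, displayed by show() as [4D 61 6E]
val decoded = b64DF.withColumn("value", unbase64(col("b64")))
decoded.printSchema
```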
Schema examples:
All the basic types supported by Spark SQL:
val json = """{"stringc":"abc", "shortc":1, "integerc":null, "longc":3, "floatc":4.5, "doublec":6.7, "decimalc":8.90, "booleanc":true, "bytec":23, "binaryc":"TWFu", "datec":"2010-01-01", "timestampc":"2012-12-12 11:22:22.123123"}"""
val ds = spark.createDataset(Seq(json))
val schema = "stringc string, shortc short, integerc int, longc long, floatc float, doublec double, decimalc decimal(10, 3), booleanc boolean, bytec byte, binaryc binary, datec date, timestampc timestamp"
val df = spark.read.schema(schema).json(ds)
df.show(false)
df.printSchema
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
|stringc|shortc|integerc|longc|floatc|doublec|decimalc|booleanc|bytec|binaryc |datec |timestampc |
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
|abc |1 |null |3 |4.5 |6.7 |8.900 |true |23 |[4D 61 6E]|2010-01-01|2012-12-12 11:22:22.123|
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
root
|-- stringc: string (nullable = true)
|-- shortc: short (nullable = true)
|-- integerc: integer (nullable = true)
|-- longc: long (nullable = true)
|-- floatc: float (nullable = true)
|-- doublec: double (nullable = true)
|-- decimalc: decimal(10,3) (nullable = true)
|-- booleanc: boolean (nullable = true)
|-- bytec: byte (nullable = true)
|-- binaryc: binary (nullable = true)
|-- datec: date (nullable = true)
|-- timestampc: timestamp (nullable = true)
Complex types:
val json = """
{
"arrayc" : [ 1, 2, 3 ],
"structc" : {
"strc" : "efg",
"decimalc" : 1.1
},
"mapc" : {
"key1" : 1.2,
"key2" : 1.1
}
}
"""
val ds = spark.createDataset(Seq(json))
val schema = "arrayc array<short>, structc struct<strc:string, decimalc:decimal>, mapc map<string, float>"
val df = spark.read.schema(schema).json(ds)
df.show(false)
df.printSchema
+---------+--------+--------------------------+
|arrayc |structc |mapc |
+---------+--------+--------------------------+
|[1, 2, 3]|[efg, 1]|[key1 -> 1.2, key2 -> 1.1]|
+---------+--------+--------------------------+
root
|-- arrayc: array (nullable = true)
| |-- element: short (containsNull = true)
|-- structc: struct (nullable = true)
| |-- strc: string (nullable = true)
| |-- decimalc: decimal(10,0) (nullable = true)
|-- mapc: map (nullable = true)
| |-- key: string
| |-- value: float (valueContainsNull = true)
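The reverse direction, DataFrame to JSON string, also works for complex types: toJSON serializes whole rows into a Dataset[String], while the to_json function serializes a single struct column. A sketch:

```scala
import org.apache.spark.sql.functions.{array, col, lit, struct, to_json}

val df = spark.range(1).select(
  array(lit(1), lit(2)).as("arrayc"),
  struct(lit("efg").as("strc")).as("structc")
)

// whole-row serialization: one JSON document per row
df.toJSON.show(false)

// per-column serialization of a struct with to_json
df.select(to_json(col("structc")).as("js")).show(false)
```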
Spark SQL data types
Basic types:
DataType | simpleString | typeName | sql | defaultSize | catalogString | json |
---|---|---|---|---|---|---|
StringType | string | string | STRING | 20 | string | "string" |
ShortType | smallint | short | SMALLINT | 2 | smallint | "short" |
IntegerType | int | integer | INT | 4 | int | "integer" |
LongType | bigint | long | BIGINT | 8 | bigint | "long" |
FloatType | float | float | FLOAT | 4 | float | "float" |
DoubleType | double | double | DOUBLE | 8 | double | "double" |
DecimalType(10,3) | decimal(10,3) | decimal(10,3) | DECIMAL(10,3) | 8 | decimal(10,3) | "decimal(10,3)" |
BooleanType | boolean | boolean | BOOLEAN | 1 | boolean | "boolean" |
ByteType | tinyint | byte | TINYINT | 1 | tinyint | "byte" |
BinaryType | binary | binary | BINARY | 100 | binary | "binary" |
DateType | date | date | DATE | 4 | date | "date" |
TimestampType | timestamp | timestamp | TIMESTAMP | 8 | timestamp | "timestamp" |
The three complex types:
DataType | simpleString | typeName | sql | defaultSize | catalogString | json |
---|---|---|---|---|---|---|
ArrayType(IntegerType, true) | array&lt;int&gt; | array | ARRAY&lt;INT&gt; | 4 | array&lt;int&gt; | {"type":"array","elementType":"integer","containsNull":true} |
MapType(StringType, LongType, true) | map&lt;string,bigint&gt; | map | MAP&lt;STRING, BIGINT&gt; | 28 | map&lt;string,bigint&gt; | {"type":"map","keyType":"string","valueType":"long","valueContainsNull":true} |
StructType(StructField("sf", DoubleType)::Nil) | struct&lt;sf:double&gt; | struct | STRUCT&lt;`sf`: DOUBLE&gt; | 8 | struct&lt;sf:double&gt; | {"type":"struct","fields":[{"name":"sf","type":"double","nullable":true,"metadata":{}}]} |
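The table values can be checked directly against the type objects, which expose these strings as methods, e.g.:

```scala
import org.apache.spark.sql.types._

println(LongType.simpleString)   // bigint
println(LongType.typeName)       // long
println(LongType.sql)            // BIGINT
println(LongType.defaultSize)    // 8
println(LongType.catalogString)  // bigint
println(LongType.json)           // "long"

println(ArrayType(IntegerType, true).json)
```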