Spark SQL provides built-in support for a variety of data formats, including JSON. Each new release of Spark contains enhancements that make using the DataFrames API with JSON data more convenient. At the same time, there are a number of tricky aspects that might lead to unexpected results. In this post I’ll show how to use Spark SQL to deal with JSON data.
The examples below show functionality for Spark 1.6, which is the latest version at the time of writing.
JSON is a simple, human-readable and easy-to-use format. But its simplicity can lead to problems, since it’s schema-less. Especially when you have to deal with unreliable third-party data sources, such services may return crazy JSON responses containing integer numbers as strings, or encode nulls in different ways, like an empty string "" or other placeholder values.
You can read and parse JSON into a DataFrame directly from a file:
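A minimal sketch of what this looks like (the file name is illustrative; `sc` and `sqlContext` are the values predefined in a Spark 1.6 shell):

```scala
// In the Spark 1.6 shell, sqlContext is already available.
// Each line of the file must contain one complete JSON object.
val df = sqlContext.read.json("events.json") // hypothetical path
df.show()
```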
Please note that Spark expects each line to be a separate JSON object, so it will fail if you try to load a pretty-printed JSON file.
You can also read JSON data from an RDD[String] object, like:
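A sketch using a hand-built RDD (the sample record matches the columns shown later in the post; the variable names are mine):

```scala
// Build an RDD[String] where each element is one JSON object
val events = sc.parallelize(
  """{"action":"create","timestamp":"2016-01-07T00:01:17Z"}""" :: Nil)

val df = sqlContext.read.json(events)
```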
The latter option is also useful for reading JSON messages from Kafka with Spark Streaming.
If you are just playing around with DataFrames, you can use the show method to print a DataFrame to the console.
```
scala> df.show
+------+--------------------+
|action|           timestamp|
+------+--------------------+
|create|2016-01-07T00:01:17Z|
+------+--------------------+
```
Schema inference and explicit definition
sqlContext.read.json(events) will not load the data, since DataFrames are evaluated lazily. But it will trigger schema inference: Spark will go over the RDD to determine a schema that fits the data.
In the shell you can print the schema using printSchema:

```
scala> df.printSchema
root
 |-- action: string (nullable = true)
 |-- timestamp: string (nullable = true)
```
As you saw in the last example, Spark inferred the type of both columns as string. It is possible to provide the schema explicitly to avoid that extra scan:
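One way to write such an explicit schema (a sketch; the field names follow the earlier example, the rest is my assumption):

```scala
import org.apache.spark.sql.types._

val events = sc.parallelize(
  """{"action":"create","timestamp":"2016-01-07T00:01:17Z"}""" :: Nil)

val schema = StructType(
  StructField("action", StringType, nullable = true) ::
  StructField("timestamp", TimestampType, nullable = true) :: Nil)

// No inference pass: the RDD is only scanned when an action runs
val df = sqlContext.read.schema(schema).json(events)
```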
As you might have noticed, the type of the timestamp column is explicitly forced to be a TimestampType. It’s important to understand that this type coercion is performed by the JSON parser, and it has nothing to do with the DataFrame’s type casting functionality.
The type coercions implemented in the parser are somewhat limited and in some cases unobvious. The following example demonstrates this:
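A sketch of the kind of input that trips the parser up (my own sample rows: one with a numeric timestamp, one with the same value quoted as a string, read against a LongType schema):

```scala
import org.apache.spark.sql.types._

val mixed = sc.parallelize(
  """{"action":"create","timestamp":1452121277}""" ::
  """{"action":"update","timestamp":"1452121277"}""" :: Nil)

val schema = StructType(
  StructField("action", StringType) ::
  StructField("timestamp", LongType) :: Nil)

val df = sqlContext.read.schema(schema).json(mixed)
// The parser does not coerce the quoted "1452121277" to a long
df.show()
```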
Frankly, that is not a result one would expect. Look at the second row in the result set: as you can see, there is no conversion from string to integer.
But here is one more big problem: if you try to set a type for which the parser doesn’t have a conversion, it won’t simply discard the value and set that field to null; instead, it will consider the entire row incorrect and set all of its fields to null.
The good news is that you can read all values as strings. If you can’t be sure of the quality of your data, the best option is to explicitly provide a schema forcing StringType for all untrusted fields to avoid the extra RDD scan, and then cast those columns to the desired type:
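A sketch of the string-first approach (the column names follow the earlier examples; the cast target is my choice):

```scala
import org.apache.spark.sql.types._
import sqlContext.implicits._

val raw = sc.parallelize(
  """{"action":"create","timestamp":"1452121277"}""" :: Nil)

// Read everything as strings, so the parser cannot null out rows...
val stringSchema = StructType(
  StructField("action", StringType) ::
  StructField("timestamp", StringType) :: Nil)

// ...then cast the untrusted column with Catalyst, not the parser
val df = sqlContext.read.schema(stringSchema).json(raw)
  .select($"action", $"timestamp".cast(LongType).as("timestamp"))
```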
Now that’s more like a sane result.
Spark’s Catalyst optimizer has very powerful type casting functionality; let’s see how we can parse the UNIX timestamps from the previous example:
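One way to sketch this (casting the string to a long and then to a timestamp; in Spark 1.6, Catalyst interprets the long as seconds):

```scala
import org.apache.spark.sql.types._
import sqlContext.implicits._

val raw = sc.parallelize(
  """{"action":"create","timestamp":"1452121277"}""" :: Nil)

val df = sqlContext.read.json(raw)
  .select($"action",
          // string -> long -> timestamp, seconds-based in 1.6
          $"timestamp".cast(LongType).cast(TimestampType).as("timestamp"))
```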
Spark allows parsing integer timestamps as a timestamp type, but right now (as of Spark 1.6) there is a difference in behavior: the parser treats an integer value as a number of milliseconds, while Catalyst’s cast treats it as a number of seconds. This behavior is about to change in Spark 2.0 (see SPARK-12744).
Handling nested objects
Often in API responses the useful data might be wrapped in several layers of nested objects:
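For instance, a payload shaped like this (the nesting is my own illustrative example):

```scala
val nested = sc.parallelize(
  """{"payload": {"event": {"action": "create", "timestamp": "2016-01-07T00:01:17Z"}}}""" :: Nil)

val df = sqlContext.read.json(nested)
// The inferred schema mirrors the nesting: payload.event.action, payload.event.timestamp
df.printSchema()
```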
Star (*) expansion makes it easier to unnest such objects, for example:
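A sketch of star expansion on a nested payload (the structure is my assumption, not the post’s original sample):

```scala
val nested = sc.parallelize(
  """{"payload": {"event": {"action": "create", "timestamp": "2016-01-07T00:01:17Z"}}}""" :: Nil)

val df = sqlContext.read.json(nested)

// Expand all fields of the inner struct into top-level columns
val flat = df.select("payload.event.*")
flat.show()
```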
If you need more control over column names, you can always use the as method to rename columns, e.g.:
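A sketch of renaming while unnesting (the nested structure and new column names are my own):

```scala
import sqlContext.implicits._

val nested = sc.parallelize(
  """{"payload": {"event": {"action": "create", "timestamp": "2016-01-07T00:01:17Z"}}}""" :: Nil)

// Pull out nested fields and give each a top-level name
val df = sqlContext.read.json(nested).select(
  $"payload.event.action".as("action"),
  $"payload.event.timestamp".as("ts"))
```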
That was quite a few tricks and things to keep in mind when dealing with JSON data. In conclusion I’d like to state the obvious: do not disregard unit tests for data input and data transformations, especially when you have no control over the data source. They will also make you more confident when upgrading to a newer version of Spark, since parsing and casting behavior might change in the future.