
Processing JSON data with Spark SQL

Spark SQL provides built-in support for a variety of data formats, including JSON. Each new release of Spark contains enhancements that make using the DataFrames API with JSON data more convenient. At the same time, there are a number of tricky aspects that might lead to unexpected results. In this post I'll show how to use Spark SQL to deal with JSON data.

The examples below use Spark 1.6, which is the latest version at the moment of writing.

JSON is a very simple, human-readable and easy-to-use format. But its simplicity can lead to problems, since it's schema-less. This is especially true when you have to deal with unreliable third-party data sources: such services may return crazy JSON responses containing integer numbers as strings, or encode nulls in different ways, like null, "" or even "null".

Loading data

You can read and parse JSON into a DataFrame directly from a file:

val df = sqlContext.read.json("s3a://some-bucket/some-file.json")

Please note that Spark expects each line to be a separate JSON object, so it will fail if you try to load a pretty-printed JSON file.
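
To illustrate (using an RDD of lines for brevity; reading from an RDD is shown just below), feeding a pretty-printed document to the reader means every physical line is parsed as its own JSON document and fails. On Spark 1.6 you typically end up with only a _corrupt_record column instead of the expected fields:

// a minimal sketch: each line of this pretty-printed document is parsed separately
val pretty = sc.parallelize(
  """{""" ::
  """  "action": "create",""" ::
  """  "timestamp": 1452121277""" ::
  """}""" :: Nil)

sqlContext.read.json(pretty).printSchema
// root
//  |-- _corrupt_record: string (nullable = true)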

You can also read JSON data from an RDD[String] object, like:

// construct RDD[String]
val events = sc.parallelize(
  """{"action":"create","timestamp":"2016-01-07T00:01:17Z"}""" :: Nil)

// read it
val df = sqlContext.read.json(events)

The latter option is also useful for reading JSON messages with Spark Streaming. Check out this post for an example of how to process JSON data from Kafka using Spark Streaming.
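
For illustration, here is a minimal sketch of that pattern; the socket source and host/port are stand-ins for a Kafka stream, and the names are hypothetical. Each micro-batch arrives as an RDD[String], so the same reader applies:

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))

// a stream of JSON strings; a Kafka direct stream would be used in practice
val jsonLines = ssc.socketTextStream("localhost", 9999)

jsonLines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // turn this micro-batch of JSON strings into a DataFrame
    val batchDf = sqlContext.read.json(rdd)
    batchDf.show()
  }
}

ssc.start()
ssc.awaitTermination()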

If you are just playing around with DataFrames, you can use the show method to print the DataFrame to the console.

scala> df.show
+------+--------------------+
|action|           timestamp|
+------+--------------------+
|create|2016-01-07T00:01:17Z|
+------+--------------------+

Schema inference and explicit definition

Simply running sqlContext.read.json(events) will not load the data, since DataFrames are evaluated lazily. But it will trigger schema inference: Spark will go over the RDD to determine a schema that fits the data.

In the shell you can print the schema using the printSchema method:

scala> df.printSchema
root
 |-- action: string (nullable = true)
 |-- timestamp: string (nullable = true)

As you can see in the last example, Spark inferred the type of both columns as string.
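
As a side note, if inference over a large dataset is too expensive but you still want it, the JSON data source also accepts (as far as I know; verify this for your Spark version) a samplingRatio option that restricts the inference pass to a fraction of the records. A minimal sketch:

// infer the schema from roughly 10% of the records
// (cheaper scan, but rare fields or types may be missed)
val sampledDf = sqlContext.read
  .option("samplingRatio", "0.1")
  .json("s3a://some-bucket/some-file.json")

sampledDf.printSchema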

It is also possible to provide the schema explicitly and avoid the inference scan entirely:

// StructType and the *Type singletons come from org.apache.spark.sql.types
import org.apache.spark.sql.types._

val schema = (new StructType).add("action", StringType).add("timestamp", TimestampType)

val df = sqlContext.read.schema(schema).json(events)

df.show

// +------+--------------------+
// |action|           timestamp|
// +------+--------------------+
// |create|2016-01-07 01:01:...|
// +------+--------------------+

As you might have noticed, the type of the timestamp column is explicitly forced to be TimestampType. It's important to understand that this type coercion is performed by the JSON parser, and it has nothing to do with the DataFrame's type casting functionality. The type coercions implemented in the parser are somewhat limited and in some cases non-obvious. The following example demonstrates this:

val events = sc.parallelize(
  """{"action":"create","timestamp":1452121277}""" ::
  """{"action":"create","timestamp":"1452121277"}""" ::
  """{"action":"create","timestamp":""}""" ::
  """{"action":"create","timestamp":null}""" ::
  """{"action":"create","timestamp":"null"}""" ::
  Nil
)

val schema = (new StructType).add("action", StringType).add("timestamp", LongType)

sqlContext.read.schema(schema).json(events).show

// +------+----------+
// |action| timestamp|
// +------+----------+
// |create|1452121277|
// |  null|      null|
// |create|      null|
// |create|      null|
// |  null|      null|
// +------+----------+

Frankly, that is not the result one might expect. Look at the 2nd row in the result set: as you can see, there is no conversion from string to integer. But there is an even bigger problem: if you try to set a type for which the parser has no conversion, it won't simply discard the value and set that field to null; instead, it will consider the entire row incorrect and set all of its fields to null. The good news is that you can read all values as strings.

Type casting

If you can't be sure of the quality of your data, the best option is to explicitly provide a schema that forces StringType for all untrusted fields (which also avoids the extra RDD scan), and then cast those columns to the desired type:

val schema = (new StructType).add("action", StringType).add("timestamp", StringType)

sqlContext.read.schema(schema).json(events).select($"action", $"timestamp".cast(LongType)).show

// +------+----------+
// |action| timestamp|
// +------+----------+
// |create|1452121277|
// |create|1452121277|
// |create|      null|
// |create|      null|
// |create|      null|
// +------+----------+

Now that’s more like a sane result.

Spark's Catalyst optimizer has very powerful type casting functionality; let's see how we can parse the UNIX timestamps from the previous example:

val schema = (new StructType).add("action", StringType).add("timestamp", StringType)

sqlContext.read.schema(schema).json(events)
  .select($"action", $"timestamp".cast(LongType).cast(TimestampType))
  .show

// +------+--------------------+
// |action|           timestamp|
// +------+--------------------+
// |create|2016-01-07 00:01:...|
// |create|2016-01-07 00:01:...|
// |create|                null|
// |create|                null|
// |create|                null|
// +------+--------------------+

Spark allows parsing integer timestamps as a timestamp type, but right now (as of Spark 1.6) there is a difference in behavior: the JSON parser treats an integer value as a number of milliseconds, while Catalyst's cast treats it as a number of seconds. This behavior is about to change in Spark 2.0 (see SPARK-12744).
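
To make the discrepancy concrete, here is a small sketch that feeds the same integer timestamp through both paths; on Spark 1.6 the parser path should land near the Unix epoch (the value is read as milliseconds), while the cast path produces the expected 2016-01-07 date:

val ts = sc.parallelize("""{"action":"create","timestamp":1452121277}""" :: Nil)

// parser path: TimestampType in the schema, integer treated as milliseconds
val parserSchema = (new StructType).add("action", StringType).add("timestamp", TimestampType)
sqlContext.read.schema(parserSchema).json(ts).show

// cast path: read as long, then cast, integer treated as seconds
val castSchema = (new StructType).add("action", StringType).add("timestamp", LongType)
sqlContext.read.schema(castSchema).json(ts)
  .select($"action", $"timestamp".cast(TimestampType))
  .show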

Handling nested objects

Often in API responses, the useful data is wrapped in several layers of nested objects:

{
  "payload": {
    "event": {
      "action": "create",
      "timestamp": 1452121277
    }
  }
}

Star (*) expansion makes it easier to unnest such objects, for example:

val vals = sc.parallelize(
  """{"payload":{"event":{"action":"create","timestamp":1452121277}}}""" ::
  Nil
)

val schema = (new StructType)
  .add("payload", (new StructType)
    .add("event", (new StructType)
      .add("action", StringType)
      .add("timestamp", LongType)
    )
  )

sqlContext.read.schema(schema).json(vals).select($"payload.event.*").show

// +------+----------+
// |action| timestamp|
// +------+----------+
// |create|1452121277|
// +------+----------+

If you need more control over column names, you can always use the as method to rename columns, e.g.:

sqlContext.read.schema(schema).json(vals)
  .select(
    $"payload.event.action".as("event_action"),
    $"payload.event.timestamp".as("event_timestamp")
  ).show

// +------------+---------------+
// |event_action|event_timestamp|
// +------------+---------------+
// |      create|     1452121277|
// +------------+---------------+

Wrapping up

Those were quite a few tricks and things to keep in mind when dealing with JSON data. In conclusion, I'd like to state the obvious: do not disregard unit tests for data input and data transformations, especially when you have no control over the data source. Tests will also make you more confident when upgrading to a newer version of Spark, since parsing and casting behavior might change in the future.
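
For example, a minimal sketch of such a test (assuming ScalaTest and a local SparkContext; the class and test names are hypothetical) could pin down the read-as-string-then-cast behavior we relied on above:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.scalatest.FunSuite

class JsonParsingSuite extends FunSuite {

  test("untrusted timestamps are read as strings and cast to long") {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("json-test"))
    try {
      val sqlContext = new SQLContext(sc)
      val events = sc.parallelize(
        """{"action":"create","timestamp":"1452121277"}""" ::
        """{"action":"create","timestamp":"null"}""" :: Nil)

      val schema = (new StructType).add("action", StringType).add("timestamp", StringType)
      val rows = sqlContext.read.schema(schema).json(events)
        .selectExpr("action", "cast(timestamp as long) as timestamp")
        .collect()

      // a well-formed numeric string is cast, garbage becomes null
      assert(rows(0).getLong(1) == 1452121277L)
      assert(rows(1).isNullAt(1))
    } finally {
      sc.stop()
    }
  }
}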
