Using Spark SQL and Spark Streaming together

It’s been 2 years since I wrote the first tutorial on how to set up a local docker environment for running Spark Streaming jobs with Kafka. This post is the follow-up to the previous one, but a little more advanced and up to date. It shows a basic working example of a Spark application that uses Spark SQL to process a data stream from Kafka. I’ll also show how to run the Spark application and set up a local development environment with all the components (ZooKeeper, Kafka) using docker and docker-compose.

TL;DR Check out this repository if you just want to see the code of the complete example.

Note: I assume that you already have docker and docker-compose installed on your machine. See the previous post for instructions on how to set up docker with docker-machine on Mac OS, or consult the docker documentation.

The examples below are written for Spark 2.2, which is the latest version at the moment of writing.

Setting up Project with sbt

Spark requires packaging all of the project’s dependencies alongside the application, so we will build a fat jar that contains the app and all of its dependencies together. In this tutorial we’ll use sbt with the sbt-assembly plugin to build a fat jar with our demo app.

To add sbt-assembly to the project, create a project/plugins.sbt file:

project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")

Now let’s define the build configuration in the build.sbt file:

build.sbt
name := "kafka-spark-demo"

scalaVersion := "2.11.11"

val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion excludeAll(
    ExclusionRule(organization = "org.spark-project.spark", name = "unused"),
    ExclusionRule(organization = "org.apache.spark", name = "spark-streaming"),
    ExclusionRule(organization = "org.apache.hadoop")
  )
)

target in assembly := file("build")

assemblyJarName in assembly := s"${name.value}.jar"

A few things are going on here. First, we define the versions of Scala and Spark.

Next, we define the dependencies. spark-core, spark-sql and spark-streaming are marked as provided because they are already included in the Spark distribution. A few exclusion rules are also specified for spark-streaming-kafka-0-10 in order to exclude transitive dependencies that lead to assembly merge conflicts.

Finally, we override the fat jar file name and the target directory where the fat jar is saved.
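
If you add more dependencies later and run into other assembly merge conflicts, an alternative to exclusion rules is to define a merge strategy. Here is a minimal sketch, assuming the same sbt-assembly 0.14.x settings syntax used above:

assemblyMergeStrategy in assembly := {
  // Drop conflicting manifest and metadata files
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  // Otherwise keep the first copy of a conflicting file
  case _ => MergeStrategy.first
}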

Spark Streaming Application

As an example, we’ll write a simple application that processes JSON data from Kafka using Spark SQL. The app will count the number of occurrences of each action in a stream of JSON events like this one:

{"action":"update","timestamp":"2017-10-05T23:02:51Z"}

Now let’s jump into the code; I’ll walk through the steps for making Spark Streaming and Spark SQL work together.

src/main/scala/com/antlypls/blog/KafkaSparkDemo.scala
package com.antlypls.blog

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaSparkDemo {
  def main(args: Array[String]): Unit = {
    // Configurations for kafka consumer
    val kafkaBrokers = sys.env.get("KAFKA_BROKERS")
    val kafkaGroupId = sys.env.get("KAFKA_GROUP_ID")
    val kafkaTopic = sys.env.get("KAFKA_TOPIC")

    // Verify that all settings are set
    require(kafkaBrokers.isDefined, "KAFKA_BROKERS has not been set")
    require(kafkaGroupId.isDefined, "KAFKA_GROUP_ID has not been set")
    require(kafkaTopic.isDefined, "KAFKA_TOPIC has not been set")

    // Create Spark Session
    val spark = SparkSession
      .builder()
      .appName("KafkaSparkDemo")
      .getOrCreate()

    import spark.implicits._

    // Create Streaming Context and Kafka Direct Stream with provided settings and 10 seconds batches
    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> kafkaBrokers.get,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> kafkaGroupId.get,
      "auto.offset.reset" -> "latest"
    )

    val topics = Array(kafkaTopic.get)
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Define a schema for JSON data
    val schema = new StructType()
      .add("action", StringType)
      .add("timestamp", TimestampType)

    // Process batches:
    // Parse JSON and create Data Frame
    // Execute computation on that Data Frame and print result
    stream.foreachRDD { (rdd, time) =>
      val data = rdd.map(record => record.value)
      val json = spark.read.schema(schema).json(data)
      val result = json.groupBy($"action").agg(count("*").alias("count"))
      result.show
    }

    // Start Stream
    ssc.start()
    ssc.awaitTermination()
  }
}
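
As a side note, since each batch ends up as a regular DataFrame, the aggregation inside foreachRDD could just as well be expressed in plain SQL by registering a temporary view. A small sketch (the view name events is arbitrary):

// Inside foreachRDD, instead of the groupBy/agg call:
json.createOrReplaceTempView("events")
spark.sql("SELECT action, COUNT(*) AS count FROM events GROUP BY action").show()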

The application requires the following parameters to be defined via environment variables:

  • KAFKA_BROKERS — a list of Kafka brokers used for initial discovery, in the form host1:port1,host2:port2,...;
  • KAFKA_GROUP_ID — a unique string that identifies the consumer group;
  • KAFKA_TOPIC — the name of the topic to consume data from.

Now you can build the fat jar by running sbt assembly from the project’s root. After that you should find kafka-spark-demo.jar in the build directory.

Containers setup with docker-compose

In order to run our example we need three things:

  • a container with Java where we’ll run our app;
  • a container with Kafka;
  • and a container with ZooKeeper, which is required by Kafka.

So let’s define all three components with docker-compose.

docker-compose.yml
version: '3'

services:
  zookeeper:
    image: antlypls/zookeeper

  kafka:
    image: antlypls/kafka
    depends_on:
      - zookeeper
    environment:
      KAFKA_CREATE_TOPICS: "events:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

  java:
    image: openjdk:jre
    command: bash
    ports:
      - "4040:4040"
    volumes:
      - ./build:/build
    working_dir: /build
    depends_on:
      - zookeeper
      - kafka

Please note that the zookeeper image exposes port 2181 and the kafka image exposes port 9092, so those ports will be available inside the containers.

The kafka image also provides an easy way to create topics at startup via the KAFKA_CREATE_TOPICS environment variable. In this example an events topic with 1 partition and a replication factor of 1 will be created. The KAFKA_ZOOKEEPER_CONNECT variable defines the connection string for ZooKeeper. For more details about these containers, see their source code (ZooKeeper, Kafka).

Finally, docker-compose.yml defines a java service based on the openjdk:jre image. We’ll use this container to run our Spark application, which is why the build directory with the fat jar is mounted into the container. The Spark Web UI port 4040 is forwarded to the host machine as well, so you’ll be able to open the Web UI at runtime and see various stats and information about the Spark execution.

Now we’re missing only one part: the Spark distribution itself. For this demo, download it into the build directory, for example with wget:

wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0-bin-hadoop2.7.tgz

Then unpack it with tar -xzf spark-2.2.0-bin-hadoop2.7.tgz. Now you should have a spark-2.2.0-bin-hadoop2.7 directory inside build. At this point we don’t need the distribution archive anymore, so you can delete it.

Running Demo Application

And now we’re finally all set to run the whole thing.

From the project’s root, run docker-compose run --rm --service-ports java. The --rm flag makes docker-compose delete the corresponding container after the run, and the --service-ports flag publishes the service’s ports to the host.

Now you should be logged in to the java container’s shell with the working directory set to /build.

Here we’ll just run the demo app in local mode; I don’t want to dive deep into how to run Spark applications on a cluster with docker, as that deserves a long blog post of its own.

In the java container terminal run the following:

KAFKA_BROKERS=kafka:9092 \
KAFKA_GROUP_ID=spark-streaming-demo \
KAFKA_TOPIC=events \
spark-2.2.0-bin-hadoop2.7/bin/spark-submit \
  --master local[*] \
  --class com.antlypls.blog.KafkaSparkDemo kafka-spark-demo.jar

You should see a lot of debug output from Spark, and in between all that debug noise you should see:

+------+-----+
|action|count|
+------+-----+
+------+-----+

All batches are empty so far, since we haven’t sent any data to the Kafka topic. So let’s write some data into the topic by running this command:

docker exec -it $(docker-compose ps -q kafka) kafka-console-producer.sh --broker-list localhost:9092 --topic events

And paste messages like these into the terminal:

{"action":"create","timestamp":"2017-10-05T23:01:17Z"}
{"action":"update","timestamp":"2017-10-05T23:01:19Z"}
{"action":"update","timestamp":"2017-10-05T23:02:51Z"}

After that you should see output like:

+------+-----+
|action|count|
+------+-----+
|create|    1|
|update|    2|
+------+-----+

Final notes

It might be hard to see the actual app’s output because of the huge amount of debug information. In order to set the console log level to ERROR, put the following log4j.properties file into the spark-2.2.0-bin-hadoop2.7/conf directory:

spark-2.2.0-bin-hadoop2.7/conf/log4j.properties
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.threshold=ERROR
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
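
If you only care about silencing Spark’s own output, a one-line alternative is to set the log level programmatically right after the SparkSession is created, instead of editing the config file:

// In KafkaSparkDemo.main, right after getOrCreate():
spark.sparkContext.setLogLevel("ERROR")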

docker-compose doesn’t stop or delete linked containers when the run command exits. So after you exit the java service session, you might want to run docker-compose stop and docker-compose rm to stop and delete the zookeeper and kafka containers.

It’s also possible to implement the same functionality using Structured Streaming, but that is going to have to wait for another post.
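
As a rough preview only (a minimal sketch, assuming the spark-sql-kafka-0-10 artifact is added to the dependencies and reusing the schema and topic from above), the equivalent counting logic in Structured Streaming would look something like this:

import org.apache.spark.sql.functions.{col, count, from_json}

// Read the events topic as a streaming DataFrame and parse the JSON value
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("subscribe", "events")
  .load()
  .select(from_json(col("value").cast("string"), schema).as("event"))

// Running count per action, printed to the console after every trigger
events.groupBy(col("event.action").as("action"))
  .agg(count("*").alias("count"))
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()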

Check out this post if you want to learn more about JSON data processing with Spark.

And that’s basically all you need to know to start developing Spark Streaming applications and to run and test them locally with docker.

I hope this post was helpful!
