It’s been two years since I wrote the first tutorial on how to set up a local docker environment for running Spark Streaming jobs with Kafka. This post is a follow-up to the previous one, but a bit more advanced and up to date. It shows a basic working example of a Spark application that uses Spark SQL to process a data stream from Kafka. I’ll also show how to run the Spark application and set up a local development environment with all components (ZooKeeper, Kafka) using docker and docker-compose.
TL;DR Check out this repository if you just want to see the code of the complete example.
Note: I assume that you already have docker and docker-compose installed on your machine. See the previous post for instructions on how to set up docker with docker-machine on Mac OS, or consult the docker documentation.
The examples below show functionality for Spark 2.2, which is the latest version at the time of writing.
Setting up Project with sbt
Spark requires packaging all of a project’s dependencies alongside the application, so we will build a fat jar that contains the app and all its dependencies together. In this tutorial we’ll use sbt with the sbt-assembly plugin to build a fat jar with our demo app.
To add sbt-assembly to the project, create a project/plugins.sbt file:
project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")
Now let’s define the build configuration in a build.sbt file:
build.sbt
:= "kafka-spark-demo"
name
:= "2.11.11"
scalaVersion
val sparkVersion = "2.2.0"
++= Seq(
libraryDependencies "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
"org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion excludeAll(
ExclusionRule(organization = "org.spark-project.spark", name = "unused"),
ExclusionRule(organization = "org.apache.spark", name = "spark-streaming"),
ExclusionRule(organization = "org.apache.hadoop")
)
)
:= file("build")
target in assembly
:= s"${name.value}.jar" assemblyJarName in assembly
A few things are going on here. First, we define the versions of Scala and Spark. Next, we define the dependencies: spark-core, spark-sql and spark-streaming are marked as provided because they are already included in the Spark distribution. Also, a few exclusion rules are specified for spark-streaming-kafka-0-10 in order to exclude transitive dependencies that lead to assembly merge conflicts. Finally, we override the fat jar file name and the target directory where the fat jar is saved.
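The exclusion rules above are enough for this demo, but if a different set of dependencies still produces merge conflicts, sbt-assembly also lets you spell out a merge strategy explicitly. A minimal sketch (not required for this project, and the cases shown are only an illustration) could look like this:
// Sketch: discard duplicate META-INF entries and fall back to the default
// strategy for everything else; adjust the cases to match your actual conflicts.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}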
Spark Streaming Application
As an example, we’ll write a simple application that processes JSON data from Kafka using Spark SQL. The app will compute the number of different actions in a stream of JSON events like this:
{"action":"update","timestamp":"2017-10-05T23:02:51Z"}
Now let’s jump into the code. I will walk through the steps for making Spark Streaming and Spark SQL work together.
src/main/scala/com/antlypls/blog/KafkaSparkDemo.scala
package com.antlypls.blog
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
object KafkaSparkDemo {
  def main(args: Array[String]): Unit = {
    // Configurations for kafka consumer
    val kafkaBrokers = sys.env.get("KAFKA_BROKERS")
    val kafkaGroupId = sys.env.get("KAFKA_GROUP_ID")
    val kafkaTopic = sys.env.get("KAFKA_TOPIC")

    // Verify that all settings are set
    require(kafkaBrokers.isDefined, "KAFKA_BROKERS has not been set")
    require(kafkaGroupId.isDefined, "KAFKA_GROUP_ID has not been set")
    require(kafkaTopic.isDefined, "KAFKA_TOPIC has not been set")

    // Create Spark Session
    val spark = SparkSession
      .builder()
      .appName("KafkaSparkDemo")
      .getOrCreate()

    import spark.implicits._

    // Create Streaming Context and Kafka Direct Stream with provided settings and 10 seconds batches
    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> kafkaBrokers.get,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> kafkaGroupId.get,
      "auto.offset.reset" -> "latest"
    )

    val topics = Array(kafkaTopic.get)
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Define a schema for JSON data
    val schema = new StructType()
      .add("action", StringType)
      .add("timestamp", TimestampType)

    // Process batches:
    // Parse JSON and create Data Frame
    // Execute computation on that Data Frame and print result
    stream.foreachRDD { (rdd, time) =>
      val data = rdd.map(record => record.value)
      val json = spark.read.schema(schema).json(data)
      val result = json.groupBy($"action").agg(count("*").alias("count"))
      result.show
    }

    // Start Stream
    ssc.start()
    ssc.awaitTermination()
  }
}
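The snippet above computes counts independently for each 10-second batch. If you wanted counts over a longer sliding window instead, one possible variation of the processing block (just a sketch, registered before ssc.start() like the original; the 60-second window and 10-second slide are arbitrary choices for illustration) is to extract the values first and apply the DStream window transformation:
// Sketch: count actions seen over the last 60 seconds, refreshed every 10 seconds.
// Assumes the same stream, spark and schema values defined above.
stream
  .map(record => record.value)       // extract the JSON payload
  .window(Seconds(60), Seconds(10))  // 60-second window, sliding every 10 seconds
  .foreachRDD { rdd =>
    val json = spark.read.schema(schema).json(rdd)
    json.groupBy($"action").agg(count("*").alias("count")).show()
  }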
The application requires the following parameters to be defined via environment variables:
- KAFKA_BROKERS — a list of Kafka brokers used for initial discovery, in the form host1:port1,host2:port2,...;
- KAFKA_GROUP_ID — a unique string that identifies the consumer group;
- KAFKA_TOPIC — the name of the topic to consume data from.
Now you can build the fat jar by running sbt assembly from the project’s root. After that, you should find kafka-spark-demo.jar in the build directory.
Containers setup with docker-compose
In order to run our example we need three things:
- a container with Java where we’ll run our app;
- a container with Kafka;
- and a container with ZooKeeper, required by Kafka.
So let’s define all three components with docker-compose.
docker-compose.yml
version: '3'

services:
  zookeeper:
    image: antlypls/zookeeper

  kafka:
    image: antlypls/kafka
    depends_on:
      - zookeeper
    environment:
      KAFKA_CREATE_TOPICS: "events:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

  java:
    image: openjdk:jre
    command: bash
    ports:
      - "4040:4040"
    volumes:
      - ./build:/build
    working_dir: /build
    depends_on:
      - zookeeper
      - kafka
Please note that the zookeeper image exposes port 2181 and the kafka image exposes port 9092, so those ports will be available inside the containers. Also, the kafka image provides an easy way to create a topic at startup time via the KAFKA_CREATE_TOPICS environment variable. In this example, an events topic with 1 partition and a replication factor of 1 will be created. The KAFKA_ZOOKEEPER_CONNECT variable defines a connection string for the ZooKeeper connection. For more details about these containers see their source code (ZooKeeper, Kafka).
In the end, docker-compose.yml defines a java service based on the openjdk:jre image. We’ll use this container to run our Spark application; that’s why the build directory with the fat jar is mounted into the container. The Spark Web UI port 4040 is forwarded to the host machine as well, so you’ll be able to open the Web UI at runtime and see various stats and information about the Spark execution.
Now we’re missing only one part: the Spark distribution itself. For this demo, download it into the build directory; for example, you can do it with wget:
wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0-bin-hadoop2.7.tgz
And then unpack it with tar -xzf spark-2.2.0-bin-hadoop2.7.tgz. Now you should have a spark-2.2.0-bin-hadoop2.7 directory inside build. At this point we don’t need the distribution archive anymore, so you can delete it.
Running Demo Application
And now we’re finally all set to run the whole thing.
Just run docker-compose run --rm --service-ports java from the project’s root. Here the --rm flag makes docker-compose delete the corresponding container after the run, and the --service-ports flag publishes the service’s ports to the host.
Now you should be logged in to the java container’s shell, with the working directory set to /build.
Here we’ll just run the demo app in local mode; I don’t want to dive deep into how to run Spark applications on a cluster with docker, as that deserves a long blog post of its own.
In the java container terminal, run the following:
KAFKA_BROKERS=kafka:9092 \
KAFKA_GROUP_ID=spark-streaming-demo \
KAFKA_TOPIC=events \
spark-2.2.0-bin-hadoop2.7/bin/spark-submit \
--master local[*] \
--class com.antlypls.blog.KafkaSparkDemo kafka-spark-demo.jar
You should see a lot of debug output from Spark, and in between all that debug noise you should see:
+------+-----+
|action|count|
+------+-----+
+------+-----+
All batches are empty so far, since we haven’t sent any data to the Kafka topic. So let’s write some data into the topic; run this command:
docker exec -it $(docker-compose ps -q kafka) kafka-console-producer.sh --broker-list localhost:9092 --topic events
And paste messages like these into the terminal:
{"action":"create","timestamp":"2017-10-05T23:01:17Z"}
{"action":"update","timestamp":"2017-10-05T23:01:19Z"}
{"action":"update","timestamp":"2017-10-05T23:02:51Z"}
After that you should see output like:
+------+-----+
|action|count|
+------+-----+
|create|    1|
|update|    2|
+------+-----+
Final notes
It might be hard to see the actual app output because of the huge amount of debug information. In order to set the log level to ERROR, put the following log4j.properties file in the spark-2.2.0-bin-hadoop2.7/conf directory:
spark-2.2.0-bin-hadoop2.7/conf/log4j.properties
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.threshold=ERROR
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
docker-compose doesn’t stop or delete linked containers when the run command exits. So after you exit the java service session, you might want to run docker-compose stop and docker-compose rm to stop and delete the zookeeper and kafka containers.
It’s also possible to implement the same functionality using Structured Streaming, but a full walkthrough will have to wait for another post.
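For the curious, here is a very rough sketch of what that might look like (this assumes the spark-sql-kafka-0-10 artifact is added to the build, and reuses the broker address and topic name from the docker-compose setup above; details may differ in a real implementation):
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, from_json}
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

object StructuredKafkaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("StructuredKafkaSketch").getOrCreate()
    import spark.implicits._

    val schema = new StructType()
      .add("action", StringType)
      .add("timestamp", TimestampType)

    // Read the Kafka topic as a streaming DataFrame and parse the JSON payload
    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("subscribe", "events")
      .load()
      .select(from_json(col("value").cast("string"), schema).as("event"))
      .select("event.*")

    // Count actions and print every updated result to the console
    val query = events
      .groupBy($"action")
      .agg(count("*").alias("count"))
      .writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}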
Check out this post if you want to learn more about JSON data processing with Spark.
And that’s basically all you need to know to start developing Spark Streaming applications and run/test them locally with docker.
I hope this post was helpful!