Deploying Kafka dependent Scala microservices with Docker and SBT
Summary
In this post we will see how to use docker-compose and sbt-docker to deploy Scala microservices. We will create microservices to (1) get tweets using the Twitter streaming API and put them in Kafka and (2) read from Kafka and count the hashtags in each tweet. We will then see how to use the sbt-docker plugin to create separate docker images for our services. Finally, we will use docker-compose to define the environment for our services and run them.
Full code for this post is available at https://github.com/saumitras/kafka-twitter-docker
Our application will consist of the following services:
- Kafka
- Zookeeper (kafka service depends on zookeeper)
- Tweet Producer (depends on kafka service)
- Tweet consumer (depends on kafka service)
Here are the high-level steps:
- Create a kafka producer service to get tweets using twitter streaming api
- Create a kafka consumer service to read tweets from kafka and get count of tags
- Use sbt-docker to package and create images for these services
- Create a docker-compose.yml file to define the services and use docker-compose to start a multi-container environment
- Run the services
Setup twitter stream
To stream data from Twitter you need access keys and tokens. Go to https://apps.twitter.com and create a new app to get these. After creating the app, click on "Keys and access token" and copy the following:
- Consumer Key (API Key)
- Consumer Secret (API Secret)
- Access Token
- Access Token Secret
import twitter4j._
import twitter4j.conf.ConfigurationBuilder

val KEYWORDS = List("#scala", "#kafka", "#cassandra", "#solr", "#apachespark", "#fastdata", "#bigdata")

def startTweetStream() = {
  val cb = new ConfigurationBuilder()
  cb.setDebugEnabled(true)
  cb.setOAuthConsumerKey("p5vABCjRWWSXNBkypnb8ZnSzk") //replace this with your own keys
  cb.setOAuthConsumerSecret("wCVFIpwWxEyOcM9lrHa9TYExbNsLGvEUgJucePPjcTx83bD1Gt") //replace this with your own keys
  cb.setOAuthAccessToken("487652626-kDOFZLu8bDjFyCKUOCDa7FtHsr22WC3PMH4iuNtn") //replace this with your own keys
  cb.setOAuthAccessTokenSecret("4W3LaQTAgGoW5SsHUAgp6gK9b5AKgl8hRcFnNYgvPTylU") //replace this with your own keys
  val stream = new TwitterStreamFactory(cb.build()).getInstance()
  // Only onStatus carries tweets; the other callbacks just log stream events
  val listener = new StatusListener {
    override def onTrackLimitationNotice(i: Int) = println(s"Track limited $i tweets")
    override def onStallWarning(stallWarning: StallWarning) = println("Stream stalled")
    override def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) = println(s"Status ${statusDeletionNotice.getStatusId} deleted")
    override def onScrubGeo(l: Long, l1: Long) = println(s"Geo info scrubbed. userId:$l, upToStatusId:$l1")
    override def onException(e: Exception) = println("Exception occurred. " + e.getMessage)
    override def onStatus(status: Status): Unit = {
      val tweet = status.getText
      println("[Producer] " + tweet)
    }
  }
  stream.addListener(listener)
  // Track tweets that match any of the keywords
  val fq = new FilterQuery()
  fq.track(KEYWORDS.mkString(","))
  stream.filter(fq)
}
You will now start seeing tweets printed on your console. In the next step, instead of printing them, we will send them to kafka under the tweets topic.
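Since the four values above are secrets, one option is to read them from the environment rather than committing them in source. The following is a minimal sketch of that idea, not part of the original project: the TwitterCredentials type, the TwitterCredentialsLoader object, and the environment variable names are all assumptions made for illustration.

```scala
// Hypothetical holder for the four values copied from the Twitter app page.
final case class TwitterCredentials(
  consumerKey: String,
  consumerSecret: String,
  accessToken: String,
  accessTokenSecret: String
)

object TwitterCredentialsLoader {
  // Environment variable names are assumptions chosen for this sketch.
  val ConsumerKeyVar = "TWITTER_CONSUMER_KEY"
  val ConsumerSecretVar = "TWITTER_CONSUMER_SECRET"
  val AccessTokenVar = "TWITTER_ACCESS_TOKEN"
  val AccessTokenSecretVar = "TWITTER_ACCESS_TOKEN_SECRET"

  // Returns Right(credentials) when all four variables are set and non-blank,
  // otherwise Left with the names of the missing variables.
  def fromEnv(env: Map[String, String] = sys.env): Either[List[String], TwitterCredentials] = {
    val names = List(ConsumerKeyVar, ConsumerSecretVar, AccessTokenVar, AccessTokenSecretVar)
    val missing = names.filterNot(n => env.get(n).exists(_.trim.nonEmpty))
    if (missing.nonEmpty) Left(missing)
    else Right(TwitterCredentials(
      env(ConsumerKeyVar), env(ConsumerSecretVar), env(AccessTokenVar), env(AccessTokenSecretVar)))
  }
}
```

With something like this in place, the ConfigurationBuilder calls could take creds.consumerKey and the other fields instead of string literals, and the application could exit early with a clear message when a variable is missing.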
Create kafka producer service
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import twitter4j._
import twitter4j.conf.ConfigurationBuilder

object TweetProducer extends App {
  // localhost works when running on the host; inside the docker-compose network the broker is reachable as kafka:9092
  val BROKER_LIST = "localhost:9092"
  val KEYWORDS = List("#scala", "#kafka", "#cassandra", "#solr", "#apachespark", "#fastdata", "#bigdata")
  val TOPIC = "tweets"

  val props = new Properties()
  props.put("bootstrap.servers", BROKER_LIST)
  props.put("client.id", "KafkaTweetProducer")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  // Values are sent as Strings, so the value serializer must match KafkaProducer[String, String]
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)

  def startTweetStream() = {
    ...
    override def onStatus(status: Status): Unit = {
      val tweet = status.getText
      // Publish the tweet text to the tweets topic instead of printing it
      val data = new ProducerRecord[String, String](TOPIC, tweet)
      producer.send(data)
    }
    ...
  }

  startTweetStream()
}
This completes our tweet-producer service. Next we will define a consumer service to read data from the tweets topic and get the hashtag count of each tweet.
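The broker address differs between a local run (localhost:9092) and a container on the docker-compose network (kafka:9092), so it can help to make it configurable. Here is a minimal sketch of building the producer properties with the broker list taken from an environment variable. The ProducerSettings object and the KAFKA_BROKERS variable name are assumptions for illustration and do not appear in the original project.

```scala
import java.util.Properties

object ProducerSettings {
  // KAFKA_BROKERS is a hypothetical variable name chosen for this sketch.
  val BrokerEnvVar = "KAFKA_BROKERS"
  val DefaultBrokers = "localhost:9092"

  // Falls back to the local default when the variable is unset or blank.
  def brokerList(env: Map[String, String] = sys.env): String =
    env.get(BrokerEnvVar).map(_.trim).filter(_.nonEmpty).getOrElse(DefaultBrokers)

  // Builds the same properties as the producer service, with String keys and values.
  def build(env: Map[String, String] = sys.env): Properties = {
    val stringSerializer = "org.apache.kafka.common.serialization.StringSerializer"
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokerList(env))
    props.setProperty("client.id", "KafkaTweetProducer")
    props.setProperty("key.serializer", stringSerializer)
    props.setProperty("value.serializer", stringSerializer)
    props
  }
}
```

The producer could then be created with new KafkaProducer[String, String](ProducerSettings.build()), and the compose file could set the variable to kafka:9092 for the containerized service.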
Create kafka consumer service
import java.nio.charset.StandardCharsets
import java.util.Properties
import kafka.serializer.DefaultDecoder
import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}

object TweetConsumer extends App {
  val ZK_HOST = "zookeeper:2181"
  val TOPIC = "tweets"

  private val props = new Properties()
  props.put("group.id", "tweet-consumer")
  props.put("zookeeper.connect", ZK_HOST)
  props.put("auto.offset.reset", "smallest")
  props.put("consumer.timeout.ms", "120000")
  props.put("zookeeper.connection.timeout.ms","20000")
  props.put("auto.commit.interval.ms", "10000")

  private val consumerConfig = new ConsumerConfig(props)
  private val consumerConnector = Consumer.create(consumerConfig)
  private val filterSpec = new Whitelist(TOPIC)

  def read() = try {
    // One stream over the whitelisted topic; keys and values arrive as raw bytes
    val streams = consumerConnector.createMessageStreamsByFilter(filterSpec, 1,
      new DefaultDecoder(), new DefaultDecoder())(0)
    val iterator = streams.iterator()
    while (iterator.hasNext()) {
      // Decode the payload as UTF-8; converting byte by byte garbles non-ASCII characters
      val tweet = new String(iterator.next().message(), StandardCharsets.UTF_8)
      val numTags = tweet.count(_ == '#')
      println(s"[Consumer] [TagCount=$numTags] $tweet")
    }
  } catch {
    case ex: Exception =>
      ex.printStackTrace()
  }

  read()
}
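The decoding and counting step can be tried in isolation, without Kafka. The sketch below decodes a byte payload as UTF-8 and counts hashtag tokens with a regular expression. The TweetText object is a hypothetical helper, and the regex is a deliberate simplification that only matches a '#' followed by ASCII letters, digits, or underscores, so a bare '#' (as in "C# rocks") is not counted, unlike the character count used in the consumer.

```scala
import java.nio.charset.StandardCharsets

object TweetText {
  // Kafka delivers the message value as raw bytes; tweets are UTF-8 text.
  def decode(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.UTF_8)

  // '#' followed by at least one ASCII word character (simplified hashtag rule).
  private val Hashtag = "#\\w+".r

  def countHashtags(tweet: String): Int =
    Hashtag.findAllIn(tweet).length
}
```

In the consumer loop, TweetText.decode(iterator.next().message()) would take the place of the inline conversion, and TweetText.countHashtags(tweet) the place of the character count, if the stricter rule is wanted.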
This completes our tweet-consumer service. Next we need to package our services as standalone jars and create separate docker images for both. We will use the sbt-docker plugin for this. But before we do that, if you want to use sbt-docker as a non-root user (which is the case most of the time), you need to configure docker.
Running docker commands as non root user
The docker daemon always runs as the root user and binds to a Unix socket instead of a TCP port. By default, that Unix socket is owned by the user root, so you can only access it with sudo. Since we want to package our application as a non-root user, we need to make sure that sbt-docker can access the socket without root.
If you create a Unix group called docker and add users to it, the docker daemon makes the Unix socket readable and writable by the docker group when it starts.
To add your user (who has root privileges) to the docker group, run the following commands (the standard steps from Docker's post-installation instructions), then log out and back in so the new group membership takes effect:
sudo groupadd docker
sudo usermod -aG docker $USER
Verify that you can use docker as a non-root user by running docker ps.
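Because sbt-docker runs inside the JVM, the same access can also be checked from Scala. This is a minimal sketch under the assumption that the daemon uses the default socket path /var/run/docker.sock; the DockerSocketCheck object is a hypothetical helper, not part of sbt-docker.

```scala
import java.nio.file.{Files, Paths}

object DockerSocketCheck {
  // Default socket location on most Linux installs (an assumption for this sketch).
  val DefaultSocket = "/var/run/docker.sock"

  // True when the path exists and the current user can both read and write it.
  def canAccess(path: String = DefaultSocket): Boolean = {
    val p = Paths.get(path)
    Files.exists(p) && Files.isReadable(p) && Files.isWritable(p)
  }
}
```

If DockerSocketCheck.canAccess() returns false for your user, the docker task in sbt will most likely fail with a permission error until the group change has taken effect.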
Sbt-docker plugin
Add sbt-docker as a dependency in project/plugins.sbt:
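A minimal sketch of such a plugins.sbt is shown here. The plugin coordinates are the published ones for sbt-docker and for sbt-assembly, which provides the assembly task used in build.sbt. The version numbers are assumptions and should be matched to your sbt version.

```scala
// project/plugins.sbt
// Version numbers are assumptions; check each plugin's release notes for your sbt version.
addSbtPlugin("se.marcuslonnberg" % "sbt-docker" % "1.4.1")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")
```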
We start by defining docker settings in build.sbt:
def dockerSettings(debugPort: Option[Int] = None) = Seq(
  dockerfile in docker := {
    // Fat jar produced by sbt-assembly
    val artifactSource: File = assembly.value
    val artifactTargetPath = s"/project/${artifactSource.name}"
    val scriptSourceDir = baseDirectory.value / "../scripts"
    val projectDir = "/project/"
    new Dockerfile {
      from("saumitras01/java:1.8.0_111")
      add(artifactSource, artifactTargetPath)
      copy(scriptSourceDir, projectDir)
      entryPoint("/project/start.sh")
      cmd(projectDir, s"${name.value}", s"${version.value}")
    }
  },
  imageNames in docker := Seq(
    ImageName(s"saumitras01/${name.value}:latest")
  )
)

lazy val producer = (project in file("tweet-producer"))
  .enablePlugins(sbtdocker.DockerPlugin)
  .settings(
    libraryDependencies ++= Seq(
      "org.twitter4j" % "twitter4j-core" % "4.0.4",
      "org.twitter4j" % "twitter4j-stream" % "4.0.4",
      "org.apache.kafka" % "kafka_2.11" % "0.10.0.0" withSources() exclude("org.slf4j","slf4j-log4j12") exclude("javax.jms", "jms") exclude("com.sun.jdmk", "jmxtools") exclude("com.sun.jmx", "jmxri")
    ),
    dockerSettings()
  )

lazy val consumer = (project in file("tweet-consumer"))
  .enablePlugins(sbtdocker.DockerPlugin)
  .settings(
    libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.0.0" withSources() exclude("org.slf4j","slf4j-log4j12") exclude("javax.jms", "jms") exclude("com.sun.jdmk", "jmxtools") exclude("com.sun.jmx", "jmxri"),
    dockerSettings()
  )
Here's what it means:
- Package the application using sbt-assembly to get a standalone jar for the producer and consumer projects (artifactSource)
- Create a new docker image using saumitras01/java:1.8.0_111 as the base image
- Copy the jar into the /project dir of the image (artifactTargetPath)
- Copy the scripts (to start the services) from your local ../scripts dir into the image's /project/ dir
- Execute /project/start.sh as soon as the container is started for each service. This script will cause your container to wait until kafka is up. It will then start the consumer/producer service
- Create the images as saumitras01/tweet-producer-app and saumitras01/tweet-consumer-app and create a latest tag for each of them.
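The "wait until kafka is up" step can be pictured as a retry loop that tries to open a TCP connection to the broker. The sketch below shows that idea in Scala. It is an illustration only and not the contents of the project's start.sh; the WaitForPort object, its parameters, and the retry policy are assumptions.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.annotation.tailrec
import scala.util.Try

object WaitForPort {
  // True if a TCP connection to host:port succeeds within timeoutMs.
  def isOpen(host: String, port: Int, timeoutMs: Int = 1000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: java.io.IOException => false
    } finally {
      Try(socket.close())
    }
  }

  // Tries up to `attempts` times, sleeping delayMs between failed attempts.
  @tailrec
  def waitFor(host: String, port: Int, attempts: Int, delayMs: Long): Boolean =
    if (attempts <= 0) false
    else if (isOpen(host, port)) true
    else {
      if (attempts > 1) Thread.sleep(delayMs)
      waitFor(host, port, attempts - 1, delayMs)
    }
}
```

A service could call WaitForPort.waitFor("kafka", 9092, 30, 2000L) before creating its producer or consumer. An open port only shows that the broker is accepting connections, so a retry around the first Kafka call may still be useful.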
We then use these settings while building both the producer and consumer projects. You are now ready to create the images for your services. Run sbt docker:
sam@sam-ub:kafkatwitterdocker$ sbt
[info] Loading project definition from /home/sam/projects/kafkatwitterdocker/project
[info] Set current project to KafkaTwitterDockerExample (in build file:/home/sam/projects/kafkatwitterdocker/)
> docker
You will see something like the following for both the tweet-producer and tweet-consumer projects:
[info] Step 1 : FROM saumitras01/java:1.8.0_111
[info] ---> d53c800d525c
[info] Step 2 : ADD 0/tweet-consumer-app-assembly-1.0.jar /project/tweet-consumer-app-assembly-1.0.jar
[info] ---> db7146cacb9d
[info] Removing intermediate container 81881e37217f
[info] Step 3 : COPY 1/scripts /project/
[info] ---> d2969bcdbfc1
[info] Removing intermediate container f7ab67074a2e
[info] Step 4 : ENTRYPOINT /project/start.sh
[info] ---> Running in 34eb4d7d282c
[info] ---> bee7490cbfb0
[info] Removing intermediate container 34eb4d7d282c
[info] Step 5 : CMD /project/ tweet-consumer-app 1.0
[info] ---> Running in 7d16d4e7ef45
[info] ---> fd8af3ffe7c1
[info] Removing intermediate container 7d16d4e7ef45
[info] Successfully built fd8af3ffe7c1
[info] Tagging image fd8af3ffe7c1 with name: saumitras01/tweet-consumer-app:latest
[success] Total time: 44 s, completed 4 Nov, 2017 11:33:14 PM
If you run the docker images command, you will see your images listed:
sam@sam-ub:kafkatwitterdocker$ docker images | grep saumitras01
saumitras01/tweet-consumer-app latest fd8af3ffe7c1 2 days ago 659.9 MB
saumitras01/tweet-producer-app latest 4b87ea6e633c 2 days ago 660.3 MB
saumitras01/java 1.8.0_111 d53c800d525c 2 days ago 643.2 MB
Docker compose
Now that we have all the images available, we need to create a docker compose config file to define all the services, dependencies and ports.
Before we can start a producer or consumer, we need to make sure that kafka is running. Kafka depends on zookeeper service.
Define the zookeeper service config:
zookeeper:
  image: saumitras01/zookeeper:v3.4.9
  ports:
    - "2181:2181"
This will pull the saumitras01/zookeeper image, start a container, and expose its port 2181 as zookeeper:2181.
Next we define the config for the kafka service:
kafka:
  image: saumitras01/kafka:latest
  ports:
    - "9092:9092"
  depends_on:
    - zookeeper
  environment:
    KAFKA_ADVERTISED_HOST_NAME: kafka
    KAFKA_ADVERTISED_PORT: "9092"
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock
This will pull the latest tag of the saumitras01/kafka image and start kafka on port 9092, reachable as kafka:9092.
Next we define the consumer and producer services. Here's the complete content of docker-compose.yml to start 1 producer and 1 consumer:
version: '2'
services:
  zookeeper:
    image: saumitras01/zookeeper:v3.4.9
    ports:
      - "2181:2181"
  kafka:
    image: saumitras01/kafka:latest
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: "9092"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  tweet-producer:
    image: saumitras01/tweet-producer-app:latest
    ports:
      - "8080:8080"
    tty: true
    depends_on:
      - kafka
  tweet-consumer:
    image: saumitras01/tweet-consumer-app:latest
    ports:
      - "8081:8081"
    tty: true
    depends_on:
      - kafka
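The depends_on entries determine the order in which the containers are started. The sketch below models only that ordering for the four services in this file, as a way to reason about the dependency graph; it is not how docker-compose itself is implemented, and the StartOrder object is a hypothetical helper. Note that depends_on controls start order, not readiness, so kafka may still be initializing when the producer and consumer containers start.

```scala
object StartOrder {
  // Returns service names ordered so that each service comes after everything it depends on.
  def resolve(dependsOn: Map[String, List[String]]): List[String] = {
    @scala.annotation.tailrec
    def loop(remaining: List[String], started: List[String]): List[String] =
      if (remaining.isEmpty) started
      else {
        // A service is ready once all of its dependencies have been started.
        val (ready, blocked) =
          remaining.partition(s => dependsOn.getOrElse(s, Nil).forall(d => started.contains(d)))
        require(ready.nonEmpty, s"cyclic or unknown dependency among: ${blocked.mkString(", ")}")
        loop(blocked, started ++ ready)
      }
    // Sorting the names makes the result deterministic for services at the same level.
    loop(dependsOn.keys.toList.sorted, Nil)
  }
}

object ComposeServices {
  // Mirrors the depends_on entries of the compose file.
  val dependsOn: Map[String, List[String]] = Map(
    "zookeeper" -> Nil,
    "kafka" -> List("zookeeper"),
    "tweet-producer" -> List("kafka"),
    "tweet-consumer" -> List("kafka")
  )
}
```

Calling StartOrder.resolve(ComposeServices.dependsOn) yields zookeeper first, then kafka, then the two tweet services.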
If you just want to start one of the services on a given machine, remove the others' config from your docker-compose.yml definition.
We are now ready to run our application. Run the command:
docker-compose up
Your services will start and you should start seeing the output from your services:
Creating kafkatwitterdocker_zookeeper_1 ...
Creating kafkatwitterdocker_zookeeper_1 ... done
Creating kafkatwitterdocker_kafka_1 ...
Creating kafkatwitterdocker_kafka_1 ... done
Creating kafkatwitterdocker_tweet-consumer_1 ...
Creating kafkatwitterdocker_tweet-producer_1 ...
Creating kafkatwitterdocker_tweet-producer_1
Creating kafkatwitterdocker_tweet-consumer_1 ... done
Attaching to kafkatwitterdocker_zookeeper_1, kafkatwitterdocker_kafka_1, kafkatwitterdocker_tweet-producer_1, kafkatwitterdocker_tweet-consumer_1
....
....
tweet-producer_1 | [Producer] RT @loopbak: Outlier Detection with Unsupervised Decision Trees: https://t.co/weUoj3vTcy #abdsc #BigData #DataScience… https://t.co/sYYcb2Z…
tweet-consumer_1 | [Consumer] [TagCount=3] RT @loopbak: Outlier Detection with Unsupervised Decision Trees: https://t.co/weUoj3vTcy #abdsc #BigData #DataScience… https://t.co/sYYcb2Z…
tweet-producer_1 | [Producer] RT @HPI_DE: Von #DataMining bis #DataAnalytics – wie man #BigData sinnvoll auswertet erfahren Sie im kostenlosen #MOOC @openHPI…
tweet-consumer_1 | [Consumer] [TagCount=4] RT @HPI_DE: Von #DataMining bis #DataAnalytics – wie man #BigData sinnvoll auswertet erfahren Sie im kostenlosen #MOOC @openHPI…
tweet-producer_1 | [Producer] RT @AcadayLabs: The future #blockchain infrastructure Gavin Wood & Jutta Steiner at @disruptberlin https://t.co/PTmjNpg5D0…
tweet-consumer_1 | [Consumer] [TagCount=1] RT @AcadayLabs: The future #blockchain infrastructure Gavin Wood & Jutta Steiner at @disruptberlin https://t.co/PTmjNpg5D0…
tweet-producer_1 | [Producer] RT @rajneeshchhabra: 3 Essentials Of Every Digital Journey https://t.co/hi5hJZXmz9 #MachineLearning #IoT #IIoT #AI #ML #DL #BigData #Artifi…
tweet-consumer_1 | [Consumer] [TagCount=8] RT @rajneeshchhabra: 3 Essentials Of Every Digital Journey https://t.co/hi5hJZXmz9 #MachineLearning #IoT #IIoT #AI #ML #DL #BigData #Artifi…
If you run docker ps, you will see the different containers started for your services:
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
ca5d617191fe saumitras01/tweet-producer-app:latest "/project/start.sh /p" 11 seconds ago Up 8 seconds 0.0.0.0:8080->8080/tcp kafkatwitterdocker_tweet-producer_1
9717ceba31f0 saumitras01/tweet-consumer-app:latest "/project/start.sh /p" 11 seconds ago Up 8 seconds 0.0.0.0:8081->8081/tcp kafkatwitterdocker_tweet-consumer_1
e47522435b83 saumitras01/kafka:latest "start-kafka.sh" 13 seconds ago Up 11 seconds 0.0.0.0:9092->9092/tcp kafkatwitterdocker_kafka_1
3810212cb549 saumitras01/zookeeper:v3.4.9 "/bin/sh -c '/usr/sbi" 14 seconds ago Up 12 seconds 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp kafkatwitterdocker_zookeeper_1
There are a lot of other things to consider when going into production with docker, but I hope this post gave you an idea of what the workflow looks like.