Saumitra's blog

Search / Analytics / Distributed Systems / Machine Learning / DSLs

Deploying Kafka Dependent Scala Microservices With Docker

Summary

In this post we will see how to use docker-compose and sbt-docker to deploy Scala microservices. We will create two microservices: one that fetches tweets using the Twitter streaming API and puts them into Kafka, and one that reads tweets from Kafka and counts the hashtags in each tweet. We will then use the sbt-docker plugin to create separate Docker images for these services. Finally, we will use docker-compose to define the environment for the services and run them.

Full code for this post is available at https://github.com/saumitras/kafka-twitter-docker

Our application will consist of the following services:

  • Kafka
    • Zookeeper (kafka service depends on zookeeper)
  • Tweet Producer (depends on kafka service)
  • Tweet Consumer (depends on kafka service)

Here are the high-level steps:

  • Create a Kafka producer service to fetch tweets using the Twitter streaming API
  • Create a Kafka consumer service to read tweets from Kafka and count the tags
  • Use sbt-docker to package the services and create an image for each
  • Create a docker-compose.yml file to define the services, and use docker-compose to start a multi-container environment
  • Run the services
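The multi-project layout assumed throughout this post looks roughly like the listing below. The exact source file paths are my guess from the build definition shown later; see the linked repo for the real structure.

```text
kafka-twitter-docker/
├── build.sbt
├── docker-compose.yml
├── project/
│   └── plugins.sbt
├── scripts/
│   └── start.sh
├── tweet-producer/
│   └── src/main/scala/TweetProducer.scala
└── tweet-consumer/
    └── src/main/scala/TweetConsumer.scala
```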

Setup twitter stream

For streaming data from Twitter you need access keys and tokens. Go to https://apps.twitter.com and create a new app to get these. After creating the app, click on “Keys and access token” and copy the following:

  • Consumer Key (API Key)
  • Consumer Secret (API Secret)
  • Access Token
  • Access Token Secret
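Hard-coding the keys, as in the snippet below, is fine for a demo (the post's keys are placeholders you must replace), but a safer sketch reads them from the environment. The variable names here are my own choice, not from the post:

```scala
// Hypothetical alternative to hard-coding credentials: read them from
// environment variables and fail fast if one is missing.
def twitterCredential(name: String): String =
  sys.env.getOrElse(name, sys.error(s"Missing environment variable: $name"))

// Usage inside startTweetStream():
//   cb.setOAuthConsumerKey(twitterCredential("TWITTER_CONSUMER_KEY"))
//   cb.setOAuthConsumerSecret(twitterCredential("TWITTER_CONSUMER_SECRET"))
//   cb.setOAuthAccessToken(twitterCredential("TWITTER_ACCESS_TOKEN"))
//   cb.setOAuthAccessTokenSecret(twitterCredential("TWITTER_ACCESS_TOKEN_SECRET"))
```

This also keeps secrets out of version control and lets docker-compose inject them via its `environment` section.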
val KEYWORDS = List("#scala", "#kafka", "#cassandra", "#solr", "#apachespark", "#fastdata", "#bigdata")

def startTweetStream() = {
  val cb = new ConfigurationBuilder()
  cb.setDebugEnabled(true)
  cb.setOAuthConsumerKey("p5vABCjRWWSXNBkypnb8ZnSzk")  //replace this with your own keys
  cb.setOAuthConsumerSecret("wCVFIpwWxEyOcM9lrHa9TYExbNsLGvEUgJucePPjcTx83bD1Gt")  //replace this with your own keys
  cb.setOAuthAccessToken("487652626-kDOFZLu8bDjFyCKUOCDa7FtHsr22WC3PMH4iuNtn")  //replace this with your own keys
  cb.setOAuthAccessTokenSecret("4W3LaQTAgGoW5SsHUAgp6gK9b5AKgl8hRcFnNYgvPTylU")  //replace this with your own keys

  val stream = new TwitterStreamFactory(cb.build()).getInstance()

  val listener = new StatusListener {

    override def onTrackLimitationNotice(i: Int) = println(s"Track limited $i tweets")
    override def onStallWarning(stallWarning: StallWarning) = println("Stream stalled")
    override def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) = println(s"Status ${statusDeletionNotice.getStatusId} deleted")
    override def onScrubGeo(l: Long, l1: Long) = println(s"Geo info scrubbed. userId:$l, upToStatusId:$l1")
    override def onException(e: Exception) = println("Exception occurred. " + e.getMessage)

    override def onStatus(status: Status): Unit = {
      val tweet = status.getText
      println("[Producer] " + tweet)

    }

  }

  stream.addListener(listener)
  val fq = new FilterQuery()
  fq.track(KEYWORDS.mkString(","))
  stream.filter(fq)

}

You will now start seeing tweets printed on your console. In the next step, instead of printing them, we will send them to Kafka under the tweets topic.

Create kafka producer service

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import twitter4j._
import twitter4j.conf.ConfigurationBuilder

object TweetProducer extends App {

  val BROKER_LIST = "kafka:9092"  // hostname of the kafka service inside the docker-compose network
  val KEYWORDS = List("#scala", "#kafka", "#cassandra", "#solr", "#apachespark", "#fastdata", "#bigdata")
  val TOPIC = "tweets"

  val props = new Properties()
  props.put("bootstrap.servers", BROKER_LIST)
  props.put("client.id", "KafkaTweetProducer")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)

  def startTweetStream() = {
      ...

      override def onStatus(status: Status): Unit = {
        val tweet = status.getText
        val data = new ProducerRecord[String, String](TOPIC, tweet)
        producer.send(data)
      }
      ...

  }

  startTweetStream()

}

This completes our tweet-producer service. Next we will define a consumer service to read data from the tweets topic and count the hashtags in each tweet.

Create kafka consumer service

import java.util.Properties

import kafka.serializer.DefaultDecoder
import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}

object TweetConsumer extends App {

  val ZK_HOST = "zookeeper:2181"
  val TOPIC = "tweets"

  private val props = new Properties()
  props.put("group.id", "tweet-consumer")
  props.put("zookeeper.connect", ZK_HOST)
  props.put("auto.offset.reset", "smallest")
  props.put("consumer.timeout.ms", "120000")
  props.put("zookeeper.connection.timeout.ms","20000")
  props.put("auto.commit.interval.ms", "10000")

  private val consumerConfig = new ConsumerConfig(props)
  private val consumerConnector = Consumer.create(consumerConfig)
  private val filterSpec = new Whitelist(TOPIC)

  def read() = try {
    val streams = consumerConnector.createMessageStreamsByFilter(filterSpec, 1,
      new DefaultDecoder(), new DefaultDecoder())(0)

    lazy val iterator = streams.iterator()

    while (iterator.hasNext()) {
      val tweet = iterator.next().message().map(_.toChar).mkString
      val numTags = tweet.count(_ == '#')
      println(s"[Consumer] [TagCount=$numTags] $tweet")
    }

  } catch {
    case ex: Exception =>
      ex.printStackTrace()
  }

  read()

}
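The consumer above counts '#' characters directly, which also counts stray '#'s that are not part of any hashtag. A slightly more robust variation (my own, not from the post) extracts hashtag tokens with a regex:

```scala
// Count proper hashtag tokens (a '#' followed by one or more word
// characters) instead of raw '#' characters.
val HashTagPattern = """#\w+""".r

def countTags(tweet: String): Int =
  HashTagPattern.findAllIn(tweet).size
```

For example, `countTags("Learn #scala and #kafka")` returns 2, while a lone '#' contributes nothing.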

This completes our tweet-consumer service. Next we need to package our services as standalone jars and create a separate Docker image for each. We will use the sbt-docker plugin for this. But before we do that, if you want to use sbt-docker as a non-root user (which is the case most of the time), you need to configure Docker accordingly.

Running docker commands as non root user

The Docker daemon always runs as the root user and binds to a Unix socket instead of a TCP port. By default that Unix socket is owned by root, so you can only access it with sudo. Since we want to package our application as a non-root user, we need to make sure that sbt-docker can access the socket without root privileges.

If you create a Unix group called docker and add users to it, the Docker daemon will make the Unix socket read/writable by the docker group when the daemon starts.

To add your user (who has root privileges) to the docker group, run the following commands:

sudo usermod -aG docker $USER
newgrp docker

Verify that you can use Docker as a non-root user by running docker ps.

Sbt-docker plugin

Add sbt-docker as a dependency in project/plugins.sbt:

addSbtPlugin("se.marcuslonnberg" % "sbt-docker" % "1.5.0")

We start by defining the Docker settings in build.sbt:

def dockerSettings(debugPort: Option[Int] = None) = Seq(

  dockerfile in docker := {
    val artifactSource: File = assembly.value
    val artifactTargetPath = s"/project/${artifactSource.name}"
    val scriptSourceDir = baseDirectory.value / "../scripts"
    val projectDir = "/project/"

    new Dockerfile {
      from("saumitras01/java:1.8.0_111")
      add(artifactSource, artifactTargetPath)
      copy(scriptSourceDir, projectDir)
      entryPoint(s"/project/start.sh")
      cmd(projectDir, s"${name.value}", s"${version.value}")
    }
  },
  imageNames in docker := Seq(
    ImageName(s"saumitras01/${name.value}:latest")
  )
)

lazy val producer = (project in file("tweet-producer"))
  .enablePlugins(sbtdocker.DockerPlugin)
  .settings(
    libraryDependencies ++= Seq(
      "org.twitter4j" % "twitter4j-core" % "4.0.4",
      "org.twitter4j" % "twitter4j-stream" % "4.0.4",
      "org.apache.kafka" % "kafka_2.11" % "0.10.0.0" withSources() exclude("org.slf4j","slf4j-log4j12") exclude("javax.jms", "jms") exclude("com.sun.jdmk", "jmxtools") exclude("com.sun.jmx", "jmxri")
    ),
    dockerSettings()
  )

lazy val consumer = (project in file("tweet-consumer"))
  .enablePlugins(sbtdocker.DockerPlugin)
  .settings(
    libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.0.0" withSources() exclude("org.slf4j","slf4j-log4j12") exclude("javax.jms", "jms") exclude("com.sun.jdmk", "jmxtools") exclude("com.sun.jmx", "jmxri"),
    dockerSettings()
  )

Here’s what it means:

  • Package the application using sbt-assembly to get a standalone jar for the producer and consumer projects, placed at artifactTargetPath
  • Create a new Docker image using saumitras01/java:1.8.0_111 as the base image
    • Copy the jar into the image’s /project dir
    • Copy the scripts (which start the services) from the local ../scripts dir to the image’s /project/ dir
    • Execute /project/start.sh as soon as a container starts. This script makes the container wait until Kafka is up, and then starts the consumer/producer service
  • Name the images saumitras01/tweet-producer-app and saumitras01/tweet-consumer-app, and create a latest tag for each of them
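The actual start.sh lives in the repo’s scripts directory; a minimal sketch of what such a wait-then-start script might look like is below. The jar naming and the use of nc are my assumptions, based on the CMD arguments above (directory, project name, version):

```shell
#!/bin/sh
# Sketch of /project/start.sh: block until Kafka's port accepts
# connections, then launch the service jar. With the Dockerfile above,
# $1 is the project dir, $2 the service name, $3 the version.

wait_for() {
  host="$1"; port="$2"
  until nc -z "$host" "$port" 2>/dev/null; do
    echo "Waiting for $host:$port ..."
    sleep 2
  done
}

# wait_for kafka 9092
# exec java -jar "$1$2-assembly-$3.jar"
```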

We then use these settings while building both the producer and consumer projects. You are now ready to create the images for your services. Start sbt and run the docker task:

sam@sam-ub:kafkatwitterdocker$ sbt
[info] Loading project definition from /home/sam/projects/kafkatwitterdocker/project
[info] Set current project to KafkaTwitterDockerExample (in build file:/home/sam/projects/kafkatwitterdocker/)
> docker

You will see output like the following for both the tweet-producer and tweet-consumer projects:

[info] Step 1 : FROM saumitras01/java:1.8.0_111
[info]  ---> d53c800d525c
[info] Step 2 : ADD 0/tweet-consumer-app-assembly-1.0.jar /project/tweet-consumer-app-assembly-1.0.jar
[info]  ---> db7146cacb9d
[info] Removing intermediate container 81881e37217f
[info] Step 3 : COPY 1/scripts /project/
[info]  ---> d2969bcdbfc1
[info] Removing intermediate container f7ab67074a2e
[info] Step 4 : ENTRYPOINT /project/start.sh
[info]  ---> Running in 34eb4d7d282c
[info]  ---> bee7490cbfb0
[info] Removing intermediate container 34eb4d7d282c
[info] Step 5 : CMD /project/ tweet-consumer-app 1.0
[info]  ---> Running in 7d16d4e7ef45
[info]  ---> fd8af3ffe7c1
[info] Removing intermediate container 7d16d4e7ef45
[info] Successfully built fd8af3ffe7c1
[info] Tagging image fd8af3ffe7c1 with name: saumitras01/tweet-consumer-app:latest
[success] Total time: 44 s, completed 4 Nov, 2017 11:33:14 PM

Once it’s done, run the docker images command and you will see your images listed:

sam@sam-ub:kafkatwitterdocker$ docker images | grep saumitras01

saumitras01/tweet-consumer-app   latest              fd8af3ffe7c1        2 days ago           659.9 MB
saumitras01/tweet-producer-app   latest              4b87ea6e633c        2 days ago           660.3 MB
saumitras01/java                 1.8.0_111           d53c800d525c        2 days ago           643.2 MB

Docker compose

Now that we have all the images available, we need to create a docker-compose config file to define all the services, their dependencies, and their ports.

Before we can start a producer or consumer, we need to make sure that Kafka is running, and Kafka in turn depends on the Zookeeper service.

Define zookeeper service config

zookeeper:
  image: saumitras01/zookeeper:v3.4.9
  ports:
    - "2181:2181"

This will pull the image from the saumitras01/zookeeper repo, start a container, and expose its port 2181 as zookeeper:2181.

Next we define the config for the kafka service:

kafka:
  image: saumitras01/kafka:latest
  ports:
    - "9092:9092"
  depends_on:
    - zookeeper
  environment:
    KAFKA_ADVERTISED_HOST_NAME: kafka
    KAFKA_ADVERTISED_PORT: "9092"
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock

This will pull the latest tag from the saumitras01/kafka repo and start Kafka on port 9092, advertised to clients as kafka:9092.

Next we define the consumer and producer services. Here’s the complete content of docker-compose.yml to start one producer and one consumer:

version: '2'

services:
  zookeeper:
    image: saumitras01/zookeeper:v3.4.9
    ports:
      - "2181:2181"

  kafka:
    image: saumitras01/kafka:latest
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: "9092"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

  tweet-producer:
    image: saumitras01/tweet-producer-app:latest
    ports:
      - "8080:8080"
    tty: true
    depends_on:
      - kafka

  tweet-consumer:
    image: saumitras01/tweet-consumer-app:latest
    ports:
      - "8081:8081"
    tty: true
    depends_on:
      - kafka

If you just want to start one of the services on a given machine, remove the other services’ config from your docker-compose.yml definition (you can also start a subset explicitly, e.g. docker-compose up zookeeper kafka tweet-producer).

We are now ready to run our application. Run the command:

docker-compose up

Your services will start and you should begin seeing output from them:

Creating kafkatwitterdocker_zookeeper_1 ...
Creating kafkatwitterdocker_zookeeper_1 ... done
Creating kafkatwitterdocker_kafka_1 ...
Creating kafkatwitterdocker_kafka_1 ... done
Creating kafkatwitterdocker_tweet-consumer_1 ...
Creating kafkatwitterdocker_tweet-producer_1 ...
Creating kafkatwitterdocker_tweet-producer_1
Creating kafkatwitterdocker_tweet-consumer_1 ... done
Attaching to kafkatwitterdocker_zookeeper_1, kafkatwitterdocker_kafka_1, kafkatwitterdocker_tweet-producer_1, kafkatwitterdocker_tweet-consumer_1

....

....

tweet-producer_1  | [Producer] RT @loopbak: Outlier Detection with Unsupervised Decision Trees: https://t.co/weUoj3vTcy #abdsc #BigData #DataScience… https://t.co/sYYcb2Z…
tweet-consumer_1  | [Consumer] [TagCount=3] RT @loopbak: Outlier Detection with Unsupervised Decision Trees: https://t.co/weUoj3vTcy #abdsc #BigData #DataScience… https://t.co/sYYcb2Z…
tweet-producer_1  | [Producer] RT @HPI_DE: Von #DataMining bis #DataAnalytics – wie man #BigData sinnvoll auswertet erfahren Sie im kostenlosen #MOOC @openHPI… 
tweet-consumer_1  | [Consumer] [TagCount=4] RT @HPI_DE: Von #DataMining bis #DataAnalytics – wie man #BigData sinnvoll auswertet erfahren Sie im kostenlosen #MOOC @openHPI… 
tweet-producer_1  | [Producer] RT @AcadayLabs: The future #blockchain infrastructure Gavin Wood & Jutta Steiner at @disruptberlin https://t.co/PTmjNpg5D0… 
tweet-consumer_1  | [Consumer] [TagCount=1] RT @AcadayLabs: The future #blockchain infrastructure Gavin Wood &amp; Jutta Steiner at @disruptberlin https://t.co/PTmjNpg5D0… 
tweet-producer_1  | [Producer] RT @rajneeshchhabra: 3 Essentials Of Every Digital Journey https://t.co/hi5hJZXmz9 #MachineLearning #IoT #IIoT #AI #ML #DL #BigData #Artifi…
tweet-consumer_1  | [Consumer] [TagCount=8] RT @rajneeshchhabra: 3 Essentials Of Every Digital Journey https://t.co/hi5hJZXmz9 #MachineLearning #IoT #IIoT #AI #ML #DL #BigData #Artifi…

If you run docker ps, you will see the containers started for your services:

$ docker ps
CONTAINER ID        IMAGE                                   COMMAND                  CREATED             STATUS              PORTS                                                NAMES
ca5d617191fe        saumitras01/tweet-producer-app:latest   "/project/start.sh /p"   11 seconds ago      Up 8 seconds        0.0.0.0:8080->8080/tcp                               kafkatwitterdocker_tweet-producer_1
9717ceba31f0        saumitras01/tweet-consumer-app:latest   "/project/start.sh /p"   11 seconds ago      Up 8 seconds        0.0.0.0:8081->8081/tcp                               kafkatwitterdocker_tweet-consumer_1
e47522435b83        saumitras01/kafka:latest                "start-kafka.sh"         13 seconds ago      Up 11 seconds       0.0.0.0:9092->9092/tcp                               kafkatwitterdocker_kafka_1
3810212cb549        saumitras01/zookeeper:v3.4.9            "/bin/sh -c '/usr/sbi"   14 seconds ago      Up 12 seconds       22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafkatwitterdocker_zookeeper_1

There are a lot of other things to consider when going into production with Docker, but I hope this post gave you an idea of what the workflow looks like.