Saumitra's blog


Search and Analytics on Streaming Data With Kafka, Solr, Cassandra, Spark

In this blog post we will see how to set up a simple search and analytics pipeline on streaming data in Scala.

  • For sample time-series data, we will use the Twitter stream.
  • For data pipelining, we will use Kafka.
  • For search, we will use Solr, with Banana as a UI query interface for the Solr data.
  • For analytics, we will store data in Cassandra. We will see an example of using Spark to run analytics queries, with Zeppelin as a UI query interface.

Full code for this post is available at https://github.com/saumitras/twitter-analysis

Dependencies

Create a new project and add the following dependencies in build.sbt. Note that Kafka pulls in a few conflicting transitive dependencies, so exclude them:

libraryDependencies ++= Seq(
  "org.twitter4j" % "twitter4j-core" % "4.0.4",
  "org.twitter4j" % "twitter4j-stream" % "4.0.4",
  "com.typesafe.akka" % "akka-actor_2.11" % "2.4.17",
  "org.apache.kafka" % "kafka_2.11" % "0.10.0.0" withSources() exclude("org.slf4j","slf4j-log4j12") exclude("javax.jms", "jms") exclude("com.sun.jdmk", "jmxtools") exclude("com.sun.jmx", "jmxri"),
  "org.apache.avro" % "avro" % "1.7.7" withSources(),
  "org.apache.solr" % "solr-solrj" % "6.4.1" withSources(),
  "com.typesafe.scala-logging" %% "scala-logging" % "3.1.0",
  "ch.qos.logback" % "logback-classic" % "1.1.2",
  "com.datastax.cassandra" % "cassandra-driver-core"  % "3.0.2",
  "org.apache.cassandra" % "cassandra-clientutil"  % "3.0.2",
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "org.apache.spark" %% "spark-sql" % "2.1.0",
  "org.apache.spark" %% "spark-hive" % "2.1.0",
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0"
)

Setting up the Twitter stream

For streaming data from Twitter you need access keys and tokens. Go to https://apps.twitter.com and create a new app to get these. After creating the app, click on “Keys and Access Tokens” and copy the following:

  • Consumer Key (API Key)
  • Consumer Secret (API Secret)
  • Access Token
  • Access Token Secret

We will use twitter4j. Build a configuration using the keys and tokens:

val cb = new ConfigurationBuilder()
cb.setDebugEnabled(true)
cb.setOAuthConsumerKey("p5vABCjRWWSXNBkypnb8ZnSzk")   //replace this with your own keys
cb.setOAuthConsumerSecret("wCVFIpwWxEyOcM9lrHa9TYExbNsLGvEUgJucePPjcTx83bD1Gt") //replace this with your own keys
cb.setOAuthAccessToken("487652626-kDOFZLu8bDjFyCKUOCDa7FtHsr22WC3PMH4iuNtn")  //replace this with your own keys
cb.setOAuthAccessTokenSecret("4W3LaQTAgGoW5SsHUAgp6gK9b5AKgl8hRcFnNYgvPTylU")  //replace this with your own keys

You can now open a stream and listen for tweets containing specific keywords or hashtags:

val stream = new TwitterStreamFactory(cb.build()).getInstance()

val listener = new StatusListener {

  override def onTrackLimitationNotice(i: Int): Unit = logger.warn(s"Track limited $i tweets")

  override def onStallWarning(stallWarning: StallWarning): Unit = logger.error("Stream stalled")

  override def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = logger.warn(s"Status ${statusDeletionNotice.getStatusId} deleted")

  override def onScrubGeo(l: Long, l1: Long): Unit = logger.warn(s"Geo info scrubbed. userId:$l, upToStatusId:$l1")

  override def onException(e: Exception): Unit = logger.error("Exception occurred. " + e.getMessage)

  override def onStatus(status: Status): Unit = {

    logger.info("Msg: " + status.getText)

  }

}

val keywords = List("#scala", "#kafka", "#cassandra", "#solr", "#bigdata", "#apachespark", "#streamingdata")

stream.addListener(listener)
val fq = new FilterQuery()
fq.track(keywords.mkString(","))
stream.filter(fq)

StatusListener provides a few callbacks to handle different scenarios. onStatus is the one which receives the tweet and its metadata. stream.filter(fq) starts the stream.
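
If you need to stop tracking later, the same TwitterStream instance can be shut down cleanly:

stream.cleanUp()    // stop consuming the current stream
stream.shutdown()   // shut down twitter4j's internal dispatcher threads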

If you run this, you should start seeing the tweets:

Msg: RT @botbigdata: How to build a data science team https://t.co/xJWWgueGAV #bigdata
Msg: RT @ATEKAssetScan: Why Velocity Of Innovation Is A Data Friction Problem https://t.co/Eo1pTNCEv9 #BigData #IoT #IIoT #InternetOfThings #Art…
Msg: Making the Most of Big Data https://t.co/X52AZ5n5nT #BigData
Msg: RT @botbigdata: Create editable Microsoft Office charts from R https://t.co/LnSDU0iSMq #bigdata
Msg: RT @YarmolukDan: How #Twitter Users Can Generate Better Ideas https://t.co/b0O9iEULHG #DataScience #DataScientist #BigData #IoT… 
Msg: RT @botbigdata: VIDEO: Installing TOR on an Ubuntu Virtual Machine https://t.co/Q3FPhY8CGm #bigdata

Let’s define a type and extract out the tweet metadata:

case class Tweet(id:String, username:String, userId:Long, userScreenName:String,
                   userDesc:String, userProfileImgUrl:String, favCount:Long, retweetCount:Long,
                   lang:String, place:String, message:String, isSensitive:Boolean,
                   isTruncated:Boolean, isFavorited:Boolean, isRetweeted:Boolean,
                   isRetweet:Boolean, createdAt:Long)
override def onStatus(status: Status): Unit = {

  val retweetCount = if(status.getRetweetedStatus == null) 0 else status.getRetweetedStatus.getRetweetCount
  val userDesc = if(status.getUser.getDescription == null) "null" else status.getUser.getDescription
  val userProfileImgUrl = if(status.getUser.getProfileImageURL == null) "null" else status.getUser.getProfileImageURL
  val lang = if(status.getLang == null) "null" else status.getLang
  val place = if(status.getPlace == null) "null" else status.getPlace.getFullName

  val tweet = Tweet(
    id = status.getId.toString,
    username = status.getUser.getName,
    userId = status.getUser.getId,
    userScreenName = status.getUser.getScreenName,
    userDesc = userDesc,
    userProfileImgUrl = userProfileImgUrl,
    createdAt = status.getCreatedAt.getTime,
    favCount = status.getFavoriteCount,
    retweetCount = retweetCount,
    lang = lang,
    place = place,
    message = status.getText,
    isSensitive = status.isPossiblySensitive,
    isTruncated = status.isTruncated,
    isFavorited = status.isFavorited,
    isRetweeted = status.isRetweeted,
    isRetweet = status.isRetweet
  )
  logger.info("Msg: " + tweet.message)

  }

Next we will send these tweets to kafka.

Zookeeper setup

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications. In our example, both Kafka and Solr will need zookeeper for their state and config management, so you need to first start zookeeper.

  • Download it from http://apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
  • Extract it and go inside conf directory
  • Make a copy of zoo_sample.cfg as zoo.cfg
  • Run it using bin/zkServer.sh start
  • Verify it started successfully by running the bin/zkServer.sh status command.

Putting data in Kafka

Here are the steps to send data to Kafka:

  • Start the Kafka server and broker(s)
  • Create a topic in Kafka to which the data will be sent
  • Define an Avro schema for the tweets
  • Create a Kafka producer which will serialize tweets using the Avro schema and send them to Kafka

Download kafka from here.

Start server

bin/kafka-server-start.sh config/server.properties

Create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic tweet1

You can verify that the topic was created successfully:

bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka

Avro schema

Avro is a data serialization system. It has a JSON-like data model, but the data can be represented either as JSON or in a compact binary form. It comes with a sophisticated schema description language. Let’s define the Avro schema for our Tweet type:

{
  "type": "record",
  "namespace": "tweet",
  "name": "tweet",
  "fields":[
    { "name": "id", "type":"string" },
    { "name": "username", "type":"string" },
    { "name": "userId", "type":"long" },
    { "name": "userScreenName", "type":"string" },
    { "name": "userDesc", "type":"string" },
    { "name": "userProfileImgUrl", "type":"string" },
    { "name": "favCount", "type":"int" },
    { "name": "retweetCount", "type":"int" },
    { "name": "lang", "type":"string" },
    { "name": "place", "type":"string" },
    { "name": "message", "type":"string" },
    { "name": "isSensitive", "type":"boolean" },
    { "name": "isTruncated", "type":"boolean" },
    { "name": "isFavorited", "type":"boolean" },
    { "name": "isRetweeted", "type":"boolean" },
    { "name": "isRetweet", "type":"boolean" },
    { "name": "createdAt", "type":"long" }
  ]
}

Kafka supports a lot of other formats too, but Avro is the preferred format for streaming data. You can read more about it here: https://www.confluent.io/blog/avro-kafka-data/

Next create a producer

val props = new Properties()
props.put("bootstrap.servers", brokerList)
props.put("client.id", "KafkaTweetProducer")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

val producer = new KafkaProducer[String, Array[Byte]](props)

Create a Schema object from the Avro schema definition:

val schema = new Parser().parse(Source.fromURL(getClass.getResource("/tweet.avsc")).mkString)

Serialize the tweet and send it to the producer:

def writeToKafka(tweet: Tweet) = {

  val row = new GenericData.Record(schema)
  row.put("id", tweet.id)
  row.put("username", tweet.username)
  row.put("userId", tweet.userId)
  row.put("userScreenName", tweet.userScreenName)
  row.put("userDesc", tweet.userDesc)
  row.put("userProfileImgUrl", tweet.userProfileImgUrl)
  row.put("favCount", tweet.favCount)
  row.put("retweetCount", tweet.retweetCount)
  row.put("lang", tweet.lang)
  row.put("place", tweet.place)
  row.put("message", tweet.message)
  row.put("isSensitive", tweet.isSensitive)
  row.put("isTruncated", tweet.isTruncated)
  row.put("isFavorited", tweet.isFavorited)
  row.put("isRetweeted", tweet.isRetweeted)
  row.put("isRetweet", tweet.isRetweet)
  row.put("createdAt", tweet.createdAt)

  val writer = new SpecificDatumWriter[GenericRecord](schema)
  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get().binaryEncoder(out, null)
  writer.write(row, encoder)
  encoder.flush()

  logger.info("Pushing to kafka. TweetId= " + tweet.id)

  val data = new ProducerRecord[String, Array[Byte]](topic, out.toByteArray)
  producer.send(data)

}
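
producer.send(data) above is fire-and-forget. If you want to log delivery failures, you can pass a Callback from the Kafka client API to the same call (a quick sketch):

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

producer.send(data, new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null)
      logger.error("Failed to push tweet to kafka: " + exception.getMessage)
  }
})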

We will create an ActorSystem and put all this inside a KafkaTweetProducer actor. We will then send a message to KafkaTweetProducer whenever a new tweet is received.

val zkHostKafka = "localhost:2181/kafka"
val kafkaBrokers = "localhost:9092"
val topic = "tweet1"

val system = ActorSystem("TwitterAnalysis")

val kafkaProducer = system.actorOf(Props(new KafkaTweetProducer(kafkaBrokers, topic)), name = "kafka_tweet_producer")

val twitterStream = new TwitterWatcher(cb, keywords, kafkaProducer)
twitterStream.startTracking()
class TwitterWatcher(cb:ConfigurationBuilder, keywords:List[String], destination:ActorRef) extends StatusListener with Logging {

 override def onStatus(status: Status): Unit = {
    ...

    val tweet = Tweet(
      ...

    )

    destination ! tweet
  }

}
class KafkaTweetProducer(brokerList:String, topic:String) extends Actor with Logging {

  override def receive: Receive = {
    case t:Tweet =>
      writeToKafka(t)

    ...
  }
}

To test whether this data is getting written to Kafka properly or not, you can use the command-line console consumer and watch the topic tweet1 (the messages are Avro-encoded binary, so the output will not be fully human-readable):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tweet1 --from-beginning

Next we will consume this data into Solr and Cassandra.

Putting data in Solr

Here are the steps for writing data to Solr:

  • Define a Solr schema (configset) corresponding to the Tweet type
  • Upload the schema to ZooKeeper
  • Create a collection in Solr using this configset
  • Create a Solr consumer which will read from the tweet1 topic in Kafka
  • Deserialize the data read from Kafka and create Solr documents from it
  • Send the documents to Solr

Here’s what the schema definition will look like:

 <field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" />
 <field name="username" type="string" indexed="true" stored="true" required="true" multiValued="false" />
 <field name="userId" type="tlong" indexed="true" stored="true" required="true" multiValued="false" />
 <field name="userScreenName" type="string" indexed="true" stored="true" required="true" multiValued="false" />
 <field name="userDesc" type="string" indexed="true" stored="true" required="true" multiValued="false" />
 <field name="userProfileImgUrl" type="string" indexed="true" stored="true" required="true" multiValued="false" />
 <field name="favCount" type="tlong" indexed="true" stored="true" required="true" multiValued="false" />
 <field name="retweetCount" type="tlong" indexed="true" stored="true" required="true" multiValued="false" />
 <field name="lang" type="string" indexed="true" stored="true" required="true" multiValued="false" />
 <field name="place" type="string" indexed="true" stored="true" required="true" multiValued="false" />
 <field name="message" type="text_en" indexed="true" stored="true" required="true" multiValued="false" />
 <field name="isSensitive" type="boolean" indexed="true" stored="true" required="true" multiValued="false" />
 <field name="isTruncated" type="boolean" indexed="true" stored="true" required="true" multiValued="false" />
 <field name="isFavorited" type="boolean" indexed="true" stored="true" required="true" multiValued="false" />
 <field name="isRetweeted" type="boolean" indexed="true" stored="true" required="true" multiValued="false" />
 <field name="isRetweet" type="boolean" indexed="true" stored="true" required="true" multiValued="false" />
 <field name="createdAt" type="tdate" indexed="true" stored="true" required="true" multiValued="false" />

Upload the configset to ZooKeeper:

./server/scripts/cloud-scripts/zkcli.sh -cmd upconfig -zkhost localhost:2181 -confdir tweet-schema -confname tweet-schema

Create the collection

http://localhost:8983/solr/admin/collections?action=create&name=tweet&collection.configName=tweet-schema&numShards=1

Next create a SolrWriter actor which will receive a Tweet message from the KafkaTweetConsumer (which we will define next), convert it to a SolrInputDocument, and send it to Solr:

class SolrWriter(zkHost: String, collection: String, commitAfterBatch: Boolean) extends Actor with Logging {

  val client = new CloudSolrClient.Builder().withZkHost(zkHost).build()
  client.setDefaultCollection(collection)

  ...

  var batch = List[SolrInputDocument]()

  val MAX_BATCH_SIZE = 100

  override def receive: Receive = {
    case doc: Tweet =>

      val solrDoc = new SolrInputDocument()
      solrDoc.setField("id", doc.id)
      solrDoc.setField("username", doc.username)
      ...

      batch = solrDoc :: batch

      if (batch.size > MAX_BATCH_SIZE) indexBatch()

    case FlushBuffer =>
      indexBatch()

    case _ =>
      logger.warn("Unknown message")

  }


  def indexBatch(): Boolean = {
    try {
      logger.info("Flushing batch")
      client.add(batch.asJavaCollection)
      batch = List[SolrInputDocument]()
      if (commitAfterBatch) client.commit()
      true
    } catch {
      case ex: Exception =>
        logger.error(s"Failed to indexing solr batch. Exception is " + ex.getMessage)
        ex.printStackTrace()
        batch = List[SolrInputDocument]()
        false
    }
  }

  ...
}
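
The FlushBuffer message exists so that a partially filled batch still gets indexed when tweets arrive slowly. One way to wire this up, as a sketch (assuming FlushBuffer is a simple case object and that "localhost:2181" is Solr's ZooKeeper), is to create the SolrWriter actor and schedule a periodic flush with Akka's scheduler:

import scala.concurrent.duration._

case object FlushBuffer

val solrWriter = system.actorOf(Props(
    new SolrWriter("localhost:2181", "tweet", commitAfterBatch = true)), name = "solr_writer")

// send FlushBuffer to the writer every 10 seconds
import system.dispatcher
system.scheduler.schedule(10.seconds, 10.seconds, solrWriter, FlushBuffer)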

Now we need to define a Kafka consumer which will read data from Kafka and send it to SolrWriter.

Kafka Consumer

The consumer will read data from Kafka, deserialize it using the Avro schema, convert it to the Tweet type, and forward the message to a destination actor. We will keep the consumer generic so that any destination actor (Solr or Cassandra) can be passed to it.

class KafkaTweetConsumer(zkHost:String, groupId:String, topic:String, destination:ActorRef) extends Actor with Logging {
  ...

  def read() = try {
    ...
    destination ! tweet   //destination will be either solr or cassandra
    ...
  }
}

Create the consumer and the Avro schema object:

private val props = new Properties()

props.put("group.id", groupId)
props.put("zookeeper.connect", zkHost)
props.put("auto.offset.reset", "smallest")
props.put("consumer.timeout.ms", "120000")
props.put("auto.commit.interval.ms", "10000")

private val consumerConfig = new ConsumerConfig(props)
private val consumerConnector = Consumer.create(consumerConfig)
private val filterSpec = new Whitelist(topic)

val schemaString = Source.fromURL(getClass.getResource("/tweet.avsc")).mkString
val schema = new Schema.Parser().parse(schemaString)

Convert the binary data to the Tweet type using Avro:

private def getTweet(message: Array[Byte]): Tweet = {

    val reader = new SpecificDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(message, null)
    val record = reader.read(null, decoder)

    val tweet = Tweet(
      id = record.get("id").toString,
      username = record.get("username").toString,
      ...
    )
    tweet
  }

Start consuming from Kafka and send messages to the destination, Solr in this case.

val streams = consumerConnector.createMessageStreamsByFilter(filterSpec, 1,new DefaultDecoder(), new DefaultDecoder())(0)

lazy val iterator = streams.iterator()

while (iterator.hasNext()) {
  val tweet = getTweet(iterator.next().message())
  //logger.info("Consuming tweet: " + tweet.id)
  destination ! tweet
}
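
To tie it together for Solr, create a KafkaTweetConsumer whose destination is the solrWriter actor, mirroring the Cassandra wiring shown later (the consumer group id here is just an illustrative name):

val solrConsumer = system.actorOf(Props(
    new KafkaTweetConsumer(zkHostKafka, "tweet-solr-consumer", topic, solrWriter)), name = "solr_consumer")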

You should now start seeing data in Solr:

http://localhost:8983/solr/tweet/select?q=*:*&wt=json&rows=1
{
   "responseHeader":{
      "zkConnected":true,
      "status":0,
      "QTime":1,
      "params":{
         "q":"*:*",
         "rows":"1",
         "wt":"json"
      }
   },
   "response":{
      "numFound":42,
      "start":0,
      "docs":[
         {
            "id":"923302396612182016",
            "username":"Tawanna Kessler",
            "userId":898322458742337536,
            "userScreenName":"tawanna_kessler",
            "userDesc":"null",
            "userProfileImgUrl":"http://pbs.twimg.com/profile_images/898323854417940484/lke3BSjt_normal.jpg",
            "favCount":0,
            "retweetCount":183,
            "lang":"en",
            "place":"null",
            "message":"RT @craigbrownphd: Two upcoming webinars: Two new Microsoft webinars are taking place over the next week that may… https://t.co/SAb9CMmVXY…",
            "isSensitive":false,
            "isTruncated":false,
            "isFavorited":false,
            "isRetweeted":false,
            "isRetweet":true,
            "createdAt":"2017-10-26T03:07:00Z",
            "_version_":1582267022370144256
         }
      ]
   }
}

Querying Solr data with Banana

Banana is a data visualization tool that uses Solr for data analysis and display. It can run in the same container as Solr. Here’s how to set it up for our tweet data:

Download Banana and put it in Solr’s webapp directory:

cd SOLR_HOME/server/solr-webapp/webapp/
git clone https://github.com/lucidworks/banana --depth 1

To save dashboards and settings, Banana expects a collection named banana-int. Let’s go ahead and create it. The configset for that collection can be found in banana/resources/banana-int-solr-5.0/.

Upload the Banana config to ZooKeeper:

$SOLR_HOME/server/scripts/cloud-scripts/zkcli.sh -cmd upconfig -zkhost localhost:2181 -confdir banana-int-solr-5.0/conf/ -confname banana

Create the collection

http://localhost:8983/solr/admin/collections?action=create&name=banana-int&collection.configName=banana&numShards=1

Navigate to the Banana UI at http://localhost:8983/solr/banana/src/index.html and change the collection in the settings to point to the tweet collection.

Here’s what it will look like for our tweets data:

Next we will create a cassandra consumer.

Putting data in Cassandra

We first need to create a keyspace and a table for storing tweets:

CREATE KEYSPACE twitter WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

CREATE TABLE twitter.tweet (
  topic text,
  id text,
  username text,
  userId text,
  userScreenName text,
  userDesc text,
  userProfileImgUrl text,
  favCount bigint,
  retweetCount bigint,
  lang text,
  place text,
  message text,
  isSensitive boolean,
  isTruncated boolean,
  isFavorited boolean,
  isRetweeted boolean,
  isRetweet boolean,
  createdAt timestamp,
  creationDate timestamp,

  PRIMARY KEY ((topic, creationDate), username, id)
)

Then we will create a CassWriter actor, similar to the Solr one, which will accept a Tweet message and write it to Cassandra.

Connect to the cluster:

lazy val cluster = Cluster.builder().addContactPoint(seeds).build()
lazy val session = cluster.connect(keyspace)

Since we will be running the same insert query repeatedly with different parameters, we will use a prepared statement to improve performance:

lazy val prepStmt = session.prepare(s"INSERT INTO $cf (" +
      "topic, id, username, userId, userScreenName, userDesc, userProfileImgUrl, favCount," +
      "retweetCount, lang, place, message, isSensitive, isTruncated, isFavorited, isRetweeted," +
      "isRetweet, createdAt, creationDate" +
      ") values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")

Take a Tweet, create a BoundStatement by setting values for all fields, and write it to Cassandra:

def writeToCass(t:Tweet) = {

  try {

    val boundStmt = prepStmt.bind()
      .setString("topic", topic)
      .setString("id",t.id)
      .setString("username", t.username)
      .setString("userId", t.userId.toString)
      .setString("userScreenName",t.userScreenName)
      .setString("userDesc",t.userDesc)
      .setString("userProfileImgUrl",t.userProfileImgUrl)
      .setLong("favCount",t.favCount)
      .setLong("retweetCount",t.retweetCount)
      .setString("lang",t.lang)
      .setString("place",t.place)
      .setString("message",t.message)
      .setBool("isSensitive",t.isSensitive)
      .setBool("isTruncated",t.isTruncated)
      .setBool("isFavorited",t.isFavorited)
      .setBool("isRetweeted",t.isRetweeted)
      .setBool("isRetweet",t.isRetweet)
      .setTimestamp("createdAt", new Date(t.createdAt))
      .setTimestamp("creationDate", new Date(t.createdAt))

    session.execute(boundStmt)

  } catch {
    case ex: Exception =>
      logger.error("C* insert exception. Message: " + ex.getMessage)
  }

}
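
Here is a minimal sketch of the actor shell around writeToCass, assuming the same message protocol as SolrWriter (the cluster, session, prepared statement and writeToCass above live inside this class):

class CassWriter(seeds:String, keyspace:String, cf:String, topic:String) extends Actor with Logging {

  // lazy val cluster, session and prepStmt as defined above

  override def receive: Receive = {
    case t: Tweet =>
      writeToCass(t)

    case _ =>
      logger.warn("Unknown message")
  }
}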

We will create a new instance of this actor

val cassWriter = system.actorOf(Props(new CassWriter(cassSeeds, cassKeyspace, cassCf, topic)), name = "cass_writer")

And then create a new KafkaTweetConsumer whose destination will be this cassWriter actor

val cassConsumer = system.actorOf(Props(
    new KafkaTweetConsumer(zkHostKafka, "tweet-cass-consumer", topic, cassWriter)), name = "cass_consumer")

You should start seeing data in cassandra

cqlsh> select creationdate, userscreenname, lang, message from twitter.tweet limit 1;

 creationdate             | userscreenname | lang | message
--------------------------+----------------+------+--------------------------------------------------------------------------------------------------------------------------------------------
 2017-10-25 21:56:30+0000 |   alevergara78 |   en | RT @HomesAtMetacoda: data in motion >> Online learning: #MachineLearnings secret for #bigdata via\n@SASsoftware https://t.co/eGbAumJzEt

Next we will set up Spark and use it to query the Cassandra data.

Querying Cassandra data with Spark

We will use the DataStax Spark Cassandra Connector: https://github.com/datastax/spark-cassandra-connector. Download the jar for the correct connector version and place it in the lib directory of your project.

The first thing we need is a SparkContext:

val CASS_SEEDS = "127.0.0.1"
val SPARK_MASTER = "spark://sam-ub:7077"

val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", CASS_SEEDS)
  .setJars(Seq("lib/spark-cassandra-connector-assembly-2.0.0.jar"))
  .setMaster(SPARK_MASTER)
  .setAppName("cass_query")

lazy val sc = new SparkContext(conf)

Then you can query and apply different aggregations. This query will be submitted as a Spark job and executed on your Spark cluster:

import com.datastax.spark.connector._

val data = sc.cassandraTable("twitter", "tweet")
             .select("topic", "creationdate", "retweetcount", "id", "isretweet")
             .where("topic = 'tweet1' and creationdate = '2017-10-25 20:15:05+0000'")
             .groupBy(_.getLong("retweetcount"))
             .map(r => (r._1, r._2.size))
             .collect()

logger.info("Count of rows = " + data.map(_._2).sum)

If the job is successful, you will see the result:

Count of rows = 38

Visualizing Cassandra data with Zeppelin

Zeppelin is a web-based notebook that can be used for interactive data analytics on cassandra data using spark.

Download the binary from https://zeppelin.apache.org/download.html and uncompress it. The default port it uses is 8080, which conflicts with the Spark master web UI port, so change the port in conf/zeppelin-site.xml.

Create a new notebook and select the spark interpreter.

Create a view of our tweet table from Cassandra:

%spark.sql

create temporary view mytweets
using org.apache.spark.sql.cassandra
options (keyspace "twitter", table "tweet")

We can now run aggregations or other analytics queries on this view:

%spark.sql

select lang, count(*) as occur from mytweets where lang != 'und' group by lang order by occur desc limit 10

Here’s what the output of the above query will look like:

Conclusion

I hope this gave you an idea of how to get started with building a search and analytics pipeline on streaming data.