Search and analytics on streaming data with Kafka, Solr, Cassandra, Spark

In this blog post we will see how to set up a simple search and analytics pipeline on streaming data in Scala.

  • For sample time series data, we will use the Twitter stream.
  • For data pipelining, we will use Kafka.
  • For search, we will use Solr, with Banana as a UI query interface for the Solr data.
  • For analytics, we will store the data in Cassandra and see an example of using Spark to run analytics queries. We will use Zeppelin as a UI query interface.

Full code for this post is available at https://github.com/saumitras/twitter-cass-kafka-solr-spark-demo

Dependencies

Create a new project and add the following dependencies to build.sbt. Note that Kafka pulls in a few conflicting dependencies, so exclude them:

libraryDependencies ++= Seq(
  "org.twitter4j" % "twitter4j-core" % "4.0.4",
  "org.twitter4j" % "twitter4j-stream" % "4.0.4",
  "com.typesafe.akka" % "akka-actor_2.11" % "2.4.17",
  "org.apache.kafka" % "kafka_2.11" % "0.10.0.0" withSources() exclude("org.slf4j","slf4j-log4j12") exclude("javax.jms", "jms") exclude("com.sun.jdmk", "jmxtools") exclude("com.sun.jmx", "jmxri"),
  "org.apache.avro" % "avro" % "1.7.7" withSources(),
  "org.apache.solr" % "solr-solrj" % "6.4.1" withSources(),
  "com.typesafe.scala-logging" %% "scala-logging" % "3.1.0",
  "ch.qos.logback" % "logback-classic" % "1.1.2",
  "com.datastax.cassandra" % "cassandra-driver-core"  % "3.0.2",
  "org.apache.cassandra" % "cassandra-clientutil"  % "3.0.2",
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "org.apache.spark" %% "spark-sql" % "2.1.0",
  "org.apache.spark" %% "spark-hive" % "2.1.0",
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0"
)
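Several of the artifacts above are pinned to Scala 2.11 (akka-actor_2.11, kafka_2.11), and the %% entries resolve against the project's Scala version, so the build presumably needs a 2.11.x scalaVersion. A minimal sketch of the remaining build.sbt settings, where the project name and the exact patch version are assumptions rather than values taken from the original project:

```scala
// build.sbt (sketch): name and patch version are placeholders
name := "twitter-stream-demo"

// Must be a 2.11.x release so that %% resolves to the same _2.11 artifacts
scalaVersion := "2.11.8"
```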

Setting up twitter stream

To stream data from Twitter you need access keys and tokens. Go to https://apps.twitter.com and create a new app to get them. After creating the app, click on "Keys and access token" and copy the following:

  • Consumer Key (API Key)
  • Consumer Secret (API Secret)
  • Access Token
  • Access Token Secret

We will use twitter4j. Build a configuration using the keys and tokens:

val cb = new ConfigurationBuilder()
cb.setDebugEnabled(true)
cb.setOAuthConsumerKey("p5vABCjRWWSXNBkypnb8ZnSzk")   //replace this with your own keys
cb.setOAuthConsumerSecret("wCVFIpwWxEyOcM9lrHa9TYExbNsLGvEUgJucePPjcTx83bD1Gt") //replace this with your own keys
cb.setOAuthAccessToken("487652626-kDOFZLu8bDjFyCKUOCDa7FtHsr22WC3PMH4iuNtn")  //replace this with your own keys
cb.setOAuthAccessTokenSecret("4W3LaQTAgGoW5SsHUAgp6gK9b5AKgl8hRcFnNYgvPTylU")  //replace this with your own keys
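Since these four values are secrets, one option is to read them from the environment instead of hard-coding them. The following is only a sketch: the TwitterCredentials type and the environment variable names are hypothetical and not part of twitter4j or the original project.

```scala
// Hypothetical holder for the four values copied from the Twitter app page
final case class TwitterCredentials(
  consumerKey: String,
  consumerSecret: String,
  accessToken: String,
  accessTokenSecret: String
)

object TwitterCredentials {
  // Environment variable names are assumptions; pick whatever fits your deployment
  val VarNames = Seq(
    "TWITTER_CONSUMER_KEY",
    "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_TOKEN_SECRET"
  )

  // Returns Left with the missing variable names, or Right with the credentials
  def fromEnv(env: Map[String, String] = sys.env): Either[String, TwitterCredentials] = {
    val missing = VarNames.filterNot(name => env.get(name).exists(_.nonEmpty))
    if (missing.nonEmpty) Left("Missing environment variables: " + missing.mkString(", "))
    else {
      val Seq(ck, cs, at, ats) = VarNames.map(env)
      Right(TwitterCredentials(ck, cs, at, ats))
    }
  }
}
```

The resulting values can then be passed to cb.setOAuthConsumerKey(creds.consumerKey) and the other three setters.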

You can now open a stream and listen for tweets with specific keywords or hashtags:

val stream = new TwitterStreamFactory(cb.build()).getInstance()

val listener = new StatusListener {

  override def onTrackLimitationNotice(i: Int): Unit = logger.warn(s"Track limited $i tweets")

  override def onStallWarning(stallWarning: StallWarning): Unit = logger.error("Stream stalled")

  override def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = logger.warn(s"Status ${statusDeletionNotice.getStatusId} deleted")

  override def onScrubGeo(l: Long, l1: Long): Unit = logger.warn(s"Geo info scrubbed. userId:$l, upToStatusId:$l1")

  override def onException(e: Exception): Unit = logger.error("Exception occurred. " + e.getMessage)

  override def onStatus(status: Status): Unit = {

    logger.info("Msg: " + status.getText)

  }

}

val keywords = List("#scala", "#kafka", "#cassandra", "#solr", "#bigdata", "#apachespark", "#streamingdata")

stream.addListener(listener)
val fq = new FilterQuery()
fq.track(keywords.mkString(","))
stream.filter(fq)

StatusListener provides a number of callbacks for handling different scenarios. onStatus is the one that receives each tweet and its metadata. stream.filter(fq) starts the stream.

If you run this, you should start seeing the tweets:

Msg: RT @botbigdata: How to build a data science team https://t.co/xJWWgueGAV #bigdata
Msg: RT @ATEKAssetScan: Why Velocity Of Innovation Is A Data Friction Problem https://t.co/Eo1pTNCEv9 #BigData #IoT #IIoT #InternetOfThings #Art…
Msg: Making the Most of Big Data https://t.co/X52AZ5n5nT #BigData
Msg: RT @botbigdata: Create editable Microsoft Office charts from R https://t.co/LnSDU0iSMq #bigdata
Msg: RT @YarmolukDan: How #Twitter Users Can Generate Better Ideas https://t.co/b0O9iEULHG #DataScience #DataScientist #BigData #IoT… 
Msg: RT @botbigdata: VIDEO: Installing TOR on an Ubuntu Virtual Machine https://t.co/Q3FPhY8CGm #bigdata

Let's define a type and extract the tweet metadata:

case class Tweet(id:String, username:String, userId:Long, userScreenName:String,
                   userDesc:String, userProfileImgUrl:String, favCount:Long, retweetCount:Long,
                   lang:String, place:String, message:String, isSensitive:Boolean,
                   isTruncated:Boolean, isFavorited:Boolean, isRetweeted:Boolean,
                   isRetweet:Boolean, createdAt:Long)

override def onStatus(status: Status): Unit = {

  val retweetCount = if(status.getRetweetedStatus == null) 0 else status.getRetweetedStatus.getRetweetCount
  val userDesc = if(status.getUser.getDescription == null) "null" else status.getUser.getDescription
  val userProfileImgUrl = if(status.getUser.getProfileImageURL == null) "null" else status.getUser.getProfileImageURL
  val lang = if(status.getLang == null) "null" else status.getLang
  val place = if(status.getPlace == null) "null" else status.getPlace.getFullName

  val tweet = Tweet(
    id = status.getId.toString,
    username = status.getUser.getName,
    userId = status.getUser.getId,
    userScreenName = status.getUser.getScreenName,
    userDesc = userDesc,
    userProfileImgUrl = userProfileImgUrl,
    createdAt = status.getCreatedAt.getTime,
    favCount = status.getFavoriteCount,
    retweetCount = retweetCount,
    lang = lang,
    place = place,
    message = status.getText,
    isSensitive = status.isPossiblySensitive,
    isTruncated = status.isTruncated,
    isFavorited = status.isFavorited,
    isRetweeted = status.isRetweeted,
    isRetweet = status.isRetweet
  )
  logger.info("Msg: " + tweet.message)

}
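The repeated null checks in onStatus can also be written with Option, which turns a null into None. A small sketch of that idea follows; the helper names are hypothetical, and the "null" placeholder string is kept only to match the code above.

```scala
// Wraps a possibly-null Java value and falls back to the same "null" placeholder string used above
def orPlaceholder(value: String, placeholder: String = "null"): String =
  Option(value).getOrElse(placeholder)

// Same idea for nested nullable objects: check the outer value before reading the inner field
def nestedOrPlaceholder[A <: AnyRef](outer: A, placeholder: String = "null")(field: A => String): String =
  Option(outer).flatMap(o => Option(field(o))).getOrElse(placeholder)
```

With these helpers, the assignments would read val lang = orPlaceholder(status.getLang) and val place = nestedOrPlaceholder(status.getPlace)(_.getFullName).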

Next we will send these tweets to kafka.

Zookeeper setup

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications. In our example, both Kafka and Solr will need zookeeper for their state and config management, so you need to first start zookeeper.

  • Download it from http://apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
  • Extract it and go to the conf directory
  • Make a copy of zoo_sample.cfg named zoo.cfg
  • From the ZooKeeper root directory, run it using bin/zkServer.sh start
  • Verify that it started successfully by running the bin/zkServer.sh status command.
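Besides zkServer.sh status, ZooKeeper answers the four-letter command "ruok" with "imok" on its client port, which can be checked from code. The helper below is a sketch with a hypothetical name; it assumes the default client port 2181, and newer ZooKeeper releases may require the command to be whitelisted first.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.io.Source
import scala.util.Try

// Sends ZooKeeper's "ruok" four-letter command and checks for the "imok" reply.
// Returns false on any connection error or unexpected answer.
def zkIsOk(host: String = "localhost", port: Int = 2181, timeoutMs: Int = 2000): Boolean =
  Try {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      socket.setSoTimeout(timeoutMs)
      val out = socket.getOutputStream
      out.write("ruok".getBytes("US-ASCII"))
      out.flush()
      // The server closes the connection after replying, so reading to EOF returns the whole answer
      Source.fromInputStream(socket.getInputStream, "US-ASCII").mkString.trim == "imok"
    } finally socket.close()
  }.getOrElse(false)
```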

Putting data in Kafka

Here are the steps to send data to Kafka:

  • Start the Kafka server and broker(s)
  • Create a topic in Kafka to which the data will be sent
  • Define an Avro schema for the tweets
  • Create a Kafka producer which will serialize tweets using the Avro schema and send them to Kafka

Download Kafka from the Apache Kafka downloads page.

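Start the server. The commands below address Kafka under the ZooKeeper chroot path /kafka, so zookeeper.connect in config/server.properties needs to point to localhost:2181/kafka: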

bin/kafka-server-start.sh config/server.properties

Create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic tweet1

You can check whether the topic was created successfully:

bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka

Avro schema

Avro is a data serialization system. It has a JSON-like data model, and data can be represented either as JSON or in a compact binary form. It comes with a rich schema description language for describing data. Let's define an Avro schema for our Tweet type:

{
  "type": "record",
  "namespace": "tweet",
  "name": "tweet",
  "fields":[
    { "name": "id", "type":"string" },
    { "name": "username", "type":"string" },
    { "name": "userId", "type":"long" },
    { "name": "userScreenName", "type":"string" },
    { "name": "userDesc", "type":"string" },
    { "name": "userProfileImgUrl", "type":"string" },
    { "name": "favCount", "type":"int" },
    { "name": "retweetCount", "type":"int" },
    { "name": "lang", "type":"string" },
    { "name": "place", "type":"string" },
    { "name": "message", "type":"string" },
    { "name": "isSensitive", "type":"boolean" },
    { "name": "isTruncated", "type":"boolean" },
    { "name": "isFavorited", "type":"boolean" },
    { "name": "isRetweeted", "type":"boolean" },
    { "name": "isRetweet", "type":"boolean" },
    { "name": "createdAt", "type":"long" }
  ]
}

Kafka itself treats messages as opaque byte arrays, so many other formats would work too, but Avro is a popular choice for streaming data. You can read more about it here: https://www.confluent.io/blog/avro-kafka-data/

Next create a producer

val props = new Properties()
props.put("bootstrap.servers", brokerList)
props.put("client.id", "KafkaTweetProducer")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

val producer = new KafkaProducer[String, Array[Byte]](props)

Create a Schema using the avro schema definition

val schema = new Parser().parse(Source.fromURL(getClass.getResource("/tweet.avsc")).mkString)

Serialize the tweet and send it using the producer

def writeToKafka(tweet: Tweet) = {

  val row = new GenericData.Record(schema)
  row.put("id", tweet.id)
  row.put("username", tweet.username)
  row.put("userId", tweet.userId)
  row.put("userScreenName", tweet.userScreenName)
  row.put("userDesc", tweet.userDesc)
  row.put("userProfileImgUrl", tweet.userProfileImgUrl)
  row.put("favCount", tweet.favCount)
  row.put("retweetCount", tweet.retweetCount)
  row.put("lang", tweet.lang)
  row.put("place", tweet.place)
  row.put("message", tweet.message)
  row.put("isSensitive", tweet.isSensitive)
  row.put("isTruncated", tweet.isTruncated)
  row.put("isFavorited", tweet.isFavorited)
  row.put("isRetweeted", tweet.isRetweeted)
  row.put("isRetweet", tweet.isRetweet)
  row.put("createdAt", tweet.createdAt)

  val writer = new SpecificDatumWriter[GenericRecord](schema)
  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get().binaryEncoder(out, null)
  writer.write(row, encoder)
  encoder.flush()

  logger.info("Pushing to kafka. TweetId= " + tweet.id)

  val data = new ProducerRecord[String, Array[Byte]](topic, out.toByteArray)
  producer.send(data)

}

We will create an ActorSystem and put all this inside a KafkaTweetProducer actor. We will then send a message to KafkaTweetProducer whenever a new tweet is received.

val zkHostKafka = "localhost:2181/kafka"
val kafkaBrokers = "localhost:9092"
val topic = "tweet1"

val system = ActorSystem("TwitterAnalysis")

val kafkaProducer = system.actorOf(Props(new KafkaTweetProducer(kafkaBrokers, topic)), name = "kafka_tweet_producer")

// keywords is the list of hashtags to track, defined earlier
val twitterStream = new TwitterWatcher(cb, keywords, kafkaProducer)
twitterStream.startTracking()

class TwitterWatcher(cb:ConfigurationBuilder, keywords:List[String], destination:ActorRef) extends Logging {

 override def onStatus(status: Status): Unit = {
    ...

    val tweet = Tweet(
      ...

    )

    destination ! tweet
  }

}

class KafkaTweetProducer(brokerList:String, topic:String) extends Actor with Logging {

  override def receive: Receive = {
    case t:Tweet =>
      writeToKafka(t)

    ...
  }
}

To test whether this data is being written to Kafka properly, you can use the command-line console consumer and watch the topic tweet1:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tweet1 --from-beginning

Next we will consume this data in solr and cassandra

Putting data in solr

Here are the steps for writing data to Solr:

  • Define a solr schema(config-set) corresponding to tweet type
  • Upload the schema to zookeeper
  • Create a collection in solr using this config set
  • Create a solr consumer which will read from tweet1 topic from kafka
  • Deserialize the data read from kafka and create solr documents from it
  • Send documents to solr

Here's what the schema definition will look like:

 <field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" /> 
 <field name="username" type="string" indexed="true" stored="true" required="true" multiValued="false" /> 
 <field name="userId" type="tlong" indexed="true" stored="true" required="true" multiValued="false" /> 
 <field name="userScreenName" type="string" indexed="true" stored="true" required="true" multiValued="false" /> 
 <field name="userDesc" type="string" indexed="true" stored="true" required="true" multiValued="false" /> 
 <field name="userProfileImgUrl" type="string" indexed="true" stored="true" required="true" multiValued="false" /> 
 <field name="favCount" type="tlong" indexed="true" stored="true" required="true" multiValued="false" /> 
 <field name="retweetCount" type="tlong" indexed="true" stored="true" required="true" multiValued="false" /> 
 <field name="lang" type="string" indexed="true" stored="true" required="true" multiValued="false" /> 
 <field name="place" type="string" indexed="true" stored="true" required="true" multiValued="false" /> 
 <field name="message" type="text_en" indexed="true" stored="true" required="true" multiValued="false" /> 
 <field name="isSensitive" type="boolean" indexed="true" stored="true" required="true" multiValued="false" /> 
 <field name="isTruncated" type="boolean" indexed="true" stored="true" required="true" multiValued="false" /> 
 <field name="isFavorited" type="boolean" indexed="true" stored="true" required="true" multiValued="false" /> 
 <field name="isRetweeted" type="boolean" indexed="true" stored="true" required="true" multiValued="false" /> 
 <field name="isRetweet" type="boolean" indexed="true" stored="true" required="true" multiValued="false" /> 
 <field name="createdAt" type="tdate" indexed="true" stored="true" required="true" multiValued="false" /> 

Upload the configset to ZooKeeper:

./server/scripts/cloud-scripts/zkcli.sh -cmd upconfig -zkhost localhost:2181 -confdir tweet-schema -confname tweet-schema

Create the collection

http://localhost:8983/solr/admin/collections?action=create&name=tweet&collection.configName=tweet-schema&numShards=1

Next, create a SolrWriter actor which will receive a Tweet message from a KafkaTweetConsumer (which we will define next), convert it to a SolrInputDocument and send it to Solr

class SolrWriter(zkHost: String, collection: String, commitAfterBatch: Boolean) extends Actor with Logging {

  val client = new CloudSolrClient.Builder().withZkHost(zkHost).build()
  client.setDefaultCollection(collection)

  ...

  var batch = List[SolrInputDocument]()

  val MAX_BATCH_SIZE = 100

  override def receive: Receive = {
    case doc: Tweet =>

      val solrDoc = new SolrInputDocument()
      solrDoc.setField("id", doc.id)
      solrDoc.setField("username", doc.username)
      ...

      batch = solrDoc :: batch

      if (batch.size > MAX_BATCH_SIZE) indexBatch()

    case FlushBuffer =>
      indexBatch()

    case _ =>
      logger.warn("Unknown message")

  }


  def indexBatch(): Boolean = {
    try {
      logger.info("Flushing batch")
      client.add(batch.asJavaCollection)
      batch = List[SolrInputDocument]()
      if (commitAfterBatch) client.commit()
      true
    } catch {
      case ex: Exception =>
        logger.error("Failed to index solr batch. Exception is " + ex.getMessage)
        ex.printStackTrace()
        batch = List[SolrInputDocument]()
        false
    }
  }

  ...
}
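The batching logic of SolrWriter can be looked at in isolation from Akka and Solr. The sketch below uses a hypothetical Batcher class whose flush callback stands in for client.add. Unlike the actor above, it flushes as soon as the batch reaches the maximum size (>= rather than >) and keeps documents in arrival order.

```scala
// Collects items and hands them to the flush callback once maxBatchSize is reached
final class Batcher[A](maxBatchSize: Int)(flush: Seq[A] => Unit) {
  private var buffer = Vector.empty[A]

  def add(item: A): Unit = {
    buffer :+= item
    if (buffer.size >= maxBatchSize) flushNow()
  }

  // Plays the role of the FlushBuffer message: send whatever is pending, even a partial batch
  def flushNow(): Unit = if (buffer.nonEmpty) {
    val toSend = buffer
    buffer = Vector.empty
    flush(toSend)
  }

  def pending: Int = buffer.size
}
```

A partial batch stays in the buffer until flushNow() is called, which is why the actor also needs the FlushBuffer message, sent for example on a timer.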

Now we need to define a Kafka consumer which will read data from Kafka and send it to SolrWriter

Kafka Consumer

The consumer will read data from Kafka, deserialize it using the Avro schema, convert it to the Tweet type and forward the message to a destination actor. We will keep the consumer generic so that any destination actor (Solr or Cassandra) can be passed to it.

class KafkaTweetConsumer(zkHost:String, groupId:String, topic:String, destination:ActorRef) extends Actor with Logging {
  ...

  def read() = try {
    ...
    destination ! tweet   //destination will be either solr or cassandra
    ...
  }
}

Create consumer and avro schema object

private val props = new Properties()

props.put("group.id", groupId)
props.put("zookeeper.connect", zkHost)
props.put("auto.offset.reset", "smallest")
props.put("consumer.timeout.ms", "120000")
props.put("auto.commit.interval.ms", "10000")

private val consumerConfig = new ConsumerConfig(props)
private val consumerConnector = Consumer.create(consumerConfig)
private val filterSpec = new Whitelist(topic)

val schemaString = Source.fromURL(getClass.getResource("/tweet.avsc")).mkString
val schema = new Schema.Parser().parse(schemaString)

Convert binary data to Tweet type using avro

private def getTweet(message: Array[Byte]): Tweet = {

    val reader = new SpecificDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(message, null)
    val record = reader.read(null, decoder)

    val tweet = Tweet(
      id = record.get("id").toString,
      username = record.get("username").toString,
      ...
    )
    tweet
  }
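To see the encode and decode steps working together, here is a small round trip through Avro's binary format. It is a sketch only: it uses a cut-down, hypothetical two-field schema instead of tweet.avsc, and the generic GenericDatumWriter and GenericDatumReader classes, which are Avro's usual counterparts for GenericRecord.

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

// Cut-down, hypothetical schema so the example stays short
val miniSchema: Schema = new Schema.Parser().parse(
  """{"type":"record","name":"miniTweet","fields":[
    |  {"name":"id","type":"string"},
    |  {"name":"createdAt","type":"long"}
    |]}""".stripMargin)

// Record -> bytes, the same steps as in writeToKafka
def encode(record: GenericRecord): Array[Byte] = {
  val writer = new GenericDatumWriter[GenericRecord](miniSchema)
  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get().binaryEncoder(out, null)
  writer.write(record, encoder)
  encoder.flush()
  out.toByteArray
}

// Bytes -> record, the same steps as in getTweet
def decode(bytes: Array[Byte]): GenericRecord = {
  val reader = new GenericDatumReader[GenericRecord](miniSchema)
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  reader.read(null, decoder)
}

val original = new GenericData.Record(miniSchema)
original.put("id", "923302396612182016")
original.put("createdAt", 1508987220000L)

val decoded = decode(encode(original))
// Avro returns strings as org.apache.avro.util.Utf8, hence the toString
val id: String = decoded.get("id").toString
val createdAt: Long = decoded.get("createdAt").asInstanceOf[Long]
```

The binary payload carries no field names or schema, so the reader must use the same schema as the writer, or a compatible one.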

Start consuming from kafka and send messages to destination, Solr in this specific case.

val streams = consumerConnector.createMessageStreamsByFilter(filterSpec, 1,new DefaultDecoder(), new DefaultDecoder())(0)

lazy val iterator = streams.iterator()

while (iterator.hasNext()) {
  val tweet = getTweet(iterator.next().message())
  //logger.info("Consuming tweet: " + tweet.id)
  destination ! tweet
}
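The code above uses Kafka's older ZooKeeper-based consumer. As an alternative, the same Kafka release also ships the broker-based KafkaConsumer. The sketch below shows roughly how the loop could look with it; the function names are hypothetical and this is not what the original project does.

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

// Builds settings for the broker-based consumer; it connects to brokers, not ZooKeeper
def newConsumerProps(brokers: String, groupId: String): Properties = {
  val props = new Properties()
  props.put("bootstrap.servers", brokers)
  props.put("group.id", groupId)
  props.put("auto.offset.reset", "earliest") // this API's name for what the old consumer calls "smallest"
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props
}

// Polls forever and hands each raw Avro payload to the supplied handler
def consumeForever(brokers: String, groupId: String, topic: String)(handle: Array[Byte] => Unit): Unit = {
  val consumer = new KafkaConsumer[String, Array[Byte]](newConsumerProps(brokers, groupId))
  try {
    consumer.subscribe(Collections.singletonList(topic))
    while (true) {
      val records = consumer.poll(1000) // timeout in milliseconds
      records.asScala.foreach(record => handle(record.value()))
    }
  } finally consumer.close()
}
```

Inside the actor it could be used as consumeForever(kafkaBrokers, groupId, topic)(bytes => destination ! getTweet(bytes)).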

You should now start seeing data in solr:

http://localhost:8983/solr/tweet/select?q=*:*&wt=json&rows=1
{
   "responseHeader":{
      "zkConnected":true,
      "status":0,
      "QTime":1,
      "params":{
         "q":"*:*",
         "rows":"1",
         "wt":"json"
      }
   },
   "response":{
      "numFound":42,
      "start":0,
      "docs":[
         {
            "id":"923302396612182016",
            "username":"Tawanna Kessler",
            "userId":898322458742337536,
            "userScreenName":"tawanna_kessler",
            "userDesc":"null",
            "userProfileImgUrl":"http://pbs.twimg.com/profile_images/898323854417940484/lke3BSjt_normal.jpg",
            "favCount":0,
            "retweetCount":183,
            "lang":"en",
            "place":"null",
            "message":"RT @craigbrownphd: Two upcoming webinars: Two new Microsoft webinars are taking place over the next week that may… https://t.co/SAb9CMmVXY…",
            "isSensitive":false,
            "isTruncated":false,
            "isFavorited":false,
            "isRetweeted":false,
            "isRetweet":true,
            "createdAt":"2017-10-26T03:07:00Z",
            "_version_":1582267022370144256
         }
      ]
   }
}

Querying solr data with banana

Banana is a data visualization tool that uses Solr for data analysis and display. Here's how to set it up for our tweet data. We will run it in the same container as Solr:

Download banana and put it in solr's webapp directory

cd SOLR_HOME/server/solr-webapp/webapp/
git clone https://github.com/lucidworks/banana --depth 1

To save dashboards and settings, Banana expects a collection named banana-int. Let's go ahead and create it. The configset for that collection can be found in banana/resources/banana-int-solr-5.0/.

Upload banana config to zookeeper

$SOLR_HOME/server/scripts/cloud-scripts/zkcli.sh -cmd upconfig -zkhost localhost:2181 -confdir banana-int-solr-5.0/conf/ -confname banana

Create the collection

http://localhost:8983/solr/admin/collections?action=create&name=banana-int&collection.configName=banana&numShards=1

Navigate to the Banana UI at http://localhost:8983/solr/banana/src/index.html and change the collection in settings to point to the tweet collection.

Here's what it will look like for our tweets data:

Banana UI

Next we will create a cassandra consumer.

Putting data in cassandra

  • Download cassandra from http://archive.apache.org/dist/cassandra/3.0.12/apache-cassandra-3.0.12-bin.tar.gz and uncompress it
  • Run bin/cassandra to start it

We need to first create a keyspace and table for storing tweets

CREATE KEYSPACE twitter WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

CREATE TABLE twitter.tweet (
  topic text,
  id text,
  username text,
  userId text,
  userScreenName text,
  userDesc text,
  userProfileImgUrl text,
  favCount bigint,
  retweetCount bigint,
  lang text,
  place text,
  message text,
  isSensitive boolean,
  isTruncated boolean,
  isFavorited boolean,
  isRetweeted boolean,
  isRetweet boolean,
  createdAt timestamp,
  creationDate timestamp,

  PRIMARY KEY ((topic, creationDate), username, id)
);

Then we will create a CassWriter actor, similar to the Solr one, which will accept a Tweet message and write it to Cassandra.

Connect to cluster.

lazy val cluster = Cluster.builder().addContactPoint(seeds).build()
lazy val session = cluster.connect(keyspace)

Since we will be running the same insert query repeatedly with different parameters, we will use a prepared statement to improve performance:

lazy val prepStmt = session.prepare(s"INSERT INTO $cf (" +
      "topic, id, username, userId, userScreenName, userDesc, userProfileImgUrl, favCount," +
      "retweetCount, lang, place, message, isSensitive, isTruncated, isFavorited, isRetweeted," +
      "isRetweet, createdAt, creationDate" +
      ") values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")

Take Tweet, create a BoundStatement by setting values for all fields and write it to cassandra

def writeToCass(t:Tweet) = {

  try {

    val boundStmt = prepStmt.bind()
      .setString("topic", topic)
      .setString("id",t.id)
      .setString("username", t.username)
      .setString("userId", t.userId.toString)
      .setString("userScreenName",t.userScreenName)
      .setString("userDesc",t.userDesc)
      .setString("userProfileImgUrl",t.userProfileImgUrl)
      .setLong("favCount",t.favCount)
      .setLong("retweetCount",t.retweetCount)
      .setString("lang",t.lang)
      .setString("place",t.place)
      .setString("message",t.message)
      .setBool("isSensitive",t.isSensitive)
      .setBool("isTruncated",t.isTruncated)
      .setBool("isFavorited",t.isFavorited)
      .setBool("isRetweeted",t.isRetweeted)
      .setBool("isRetweet",t.isRetweet)
      .setTimestamp("createdAt", new Date(t.createdAt))
      .setTimestamp("creationDate", new Date(t.createdAt))

    session.execute(boundStmt)

  } catch {
    case ex: Exception =>
      logger.error("C* insert exception. Message: " + ex.getMessage)
  }

}

We will create a new instance of this actor

val cassWriter = system.actorOf(Props(new CassWriter(cassSeeds, cassKeyspace, cassCf, topic)), name = "cass_writer")

And then create a new KafkaTweetConsumer whose destination will be this cassWriter actor

val cassConsumer = system.actorOf(Props(
    new KafkaTweetConsumer(zkHostKafka, "tweet-cass-consumer", topic, cassWriter)), name = "cass_consumer")

You should start seeing data in cassandra

cqlsh> select creationdate, userscreenname, lang, message from twitter.tweet limit 1;

 creationdate             | userscreenname | lang | message
--------------------------+----------------+------+--------------------------------------------------------------------------------------------------------------------------------------------
 2017-10-25 21:56:30+0000 |   alevergara78 |   en | RT @HomesAtMetacoda: data in motion >> Online learning: #MachineLearning’s secret for #bigdata via\n@SASsoftware https://t.co/eGbAumJzEt… 

Next we will set up Spark and use it to query the Cassandra data.

Query cassandra data with spark

We will use the DataStax Spark Cassandra connector: https://github.com/datastax/spark-cassandra-connector. Download the connector jar that matches your Spark and Scala versions and place it in the lib directory of your project.

The first thing we need is a Spark context:

val CASS_SEEDS = "127.0.0.1"
val SPARK_MASTER = "spark://sam-ub:7077"

val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", CASS_SEEDS)
  .setJars(Seq("lib/spark-cassandra-connector-assembly-2.0.0.jar"))
  .setMaster(SPARK_MASTER)
  .setAppName("cass_query")

lazy val sc = new SparkContext(conf)

Then you can query the data and apply different aggregations. This query will be picked up as a Spark job and executed on your Spark cluster:

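import com.datastax.spark.connector._   // adds cassandraTable to SparkContext

// The table is twitter.tweet and rows are stored under the Kafka topic name "tweet1"
val data = sc.cassandraTable("twitter", "tweet")
             .select("topic", "creationdate", "retweetcount", "id", "isretweet")
             .where("topic = 'tweet1' and creationdate = '2017-10-25 20:15:05+0000'")
             .groupBy(_.getLong("retweetcount"))
             .map(r => (r._1, r._2.size))
             .collect()

// data holds (retweetCount, numberOfTweets) pairs; summing the group sizes gives the row count
logger.info("Count of rows = " + data.map(_._2).sum)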

If the job is successful, you will see the result:

Count of rows = 38
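To make the groupBy and map steps of the query concrete, the same transformation can be tried on an ordinary Scala collection. The values below are made up for illustration and stand in for the retweetcount column of the selected rows.

```scala
// Hypothetical retweet counts, one per selected row
val retweetCounts: Seq[Long] = Seq(0L, 5L, 0L, 12L, 5L, 0L)

// Group identical values together, then replace each group by its size
val tweetsPerRetweetCount: Map[Long, Int] =
  retweetCounts.groupBy(identity).map { case (count, rows) => (count, rows.size) }

// Summing the group sizes gives back the total number of rows
val totalRows: Int = tweetsPerRetweetCount.values.sum
```

On an RDD the same operations run distributed across the Spark executors, and collect() brings the resulting pairs back to the driver.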

Visualizing cassandra data with zeppelin

Zeppelin is a web-based notebook that can be used for interactive data analytics on cassandra data using spark.

Download the binary from https://zeppelin.apache.org/download.html and uncompress it. Zeppelin's default port is 8080, which conflicts with the Spark master web UI port, so change it (the zeppelin.server.port property) in conf/zeppelin-site.xml.

Create a new notebook and select the Spark interpreter.

Create a view of our tweet table from Cassandra:

%spark.sql

create temporary view mytweets
using org.apache.spark.sql.cassandra
options (keyspace "twitter", table "tweet")

We can now run aggregations or other analytics queries on this view:

%spark.sql

select lang, count(*) as occur from mytweets where lang != 'und' group by lang order by occur desc limit 10

Here's what the output of the above query looks like:

Zeppelin UI

Conclusion

I hope you got the idea of how to get started with creating a search and analytics pipeline.