Search and analytics on streaming data with Kafka, Solr, Cassandra, Spark
In this blog post we will see how to set up a simple search and analytics pipeline on streaming data in Scala.
- For sample time series data, we will use the Twitter stream.
- For data pipelining, we will use Kafka.
- For search, we will use Solr, with Banana as a UI query interface for the Solr data.
- For analytics, we will store the data in Cassandra and use Spark to run analytics queries, with Zeppelin as a UI query interface.
Full code for this post is available at https://github.com/saumitras/twitter-cass-kafka-solr-spark-demo
Dependencies
Create a new project and add the following dependencies to build.sbt. Note that Kafka pulls in a few conflicting dependencies, so exclude them:
libraryDependencies ++= Seq(
"org.twitter4j" % "twitter4j-core" % "4.0.4",
"org.twitter4j" % "twitter4j-stream" % "4.0.4",
"com.typesafe.akka" % "akka-actor_2.11" % "2.4.17",
"org.apache.kafka" % "kafka_2.11" % "0.10.0.0" withSources() exclude("org.slf4j","slf4j-log4j12") exclude("javax.jms", "jms") exclude("com.sun.jdmk", "jmxtools") exclude("com.sun.jmx", "jmxri"),
"org.apache.avro" % "avro" % "1.7.7" withSources(),
"org.apache.solr" % "solr-solrj" % "6.4.1" withSources(),
"com.typesafe.scala-logging" %% "scala-logging" % "3.1.0",
"ch.qos.logback" % "logback-classic" % "1.1.2",
"com.datastax.cassandra" % "cassandra-driver-core" % "3.0.2",
"org.apache.cassandra" % "cassandra-clientutil" % "3.0.2",
"org.apache.spark" %% "spark-core" % "2.1.0",
"org.apache.spark" %% "spark-sql" % "2.1.0",
"org.apache.spark" %% "spark-hive" % "2.1.0",
"com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0"
)
Setting up the Twitter stream
To stream data from Twitter you need access keys and tokens. Go to https://apps.twitter.com and create a new app to get them. After creating the app, click on "Keys and access token" and copy the following:
- Consumer Key (API Key)
- Consumer Secret (API Secret)
- Access Token
- Access Token Secret
We will use twitter4j. Build a configuration using the keys and tokens:
val cb = new ConfigurationBuilder()
cb.setDebugEnabled(true)
cb.setOAuthConsumerKey("p5vABCjRWWSXNBkypnb8ZnSzk") //replace this with your own keys
cb.setOAuthConsumerSecret("wCVFIpwWxEyOcM9lrHa9TYExbNsLGvEUgJucePPjcTx83bD1Gt") //replace this with your own keys
cb.setOAuthAccessToken("487652626-kDOFZLu8bDjFyCKUOCDa7FtHsr22WC3PMH4iuNtn") //replace this with your own keys
cb.setOAuthAccessTokenSecret("4W3LaQTAgGoW5SsHUAgp6gK9b5AKgl8hRcFnNYgvPTylU") //replace this with your own keys
You can now open a stream and listen for tweets containing specific keywords or hashtags:
val stream = new TwitterStreamFactory(cb.build()).getInstance()
val listener = new StatusListener {
override def onTrackLimitationNotice(i: Int): Unit = logger.warn(s"Track limited $i tweets")
override def onStallWarning(stallWarning: StallWarning): Unit = logger.error("Stream stalled")
override def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = logger.warn(s"Status ${statusDeletionNotice.getStatusId} deleted")
override def onScrubGeo(l: Long, l1: Long): Unit = logger.warn(s"Geo info scrubbed. userId:$l, upToStatusId:$l1")
override def onException(e: Exception): Unit = logger.error("Exception occurred. " + e.getMessage)
override def onStatus(status: Status): Unit = {
logger.info("Msg: " + status.getText)
}
}
val keywords = List("#scala", "#kafka", "#cassandra", "#solr", "#bigdata", "#apachespark", "#streamingdata")
stream.addListener(listener)
val fq = new FilterQuery()
fq.track(keywords.mkString(","))
stream.filter(fq)
StatusListener provides a couple of callbacks to handle different scenarios. onStatus is the one which receives the tweet and its metadata. stream.filter(fq) starts the stream.
If you run this, you should start seeing the tweets:
Msg: RT @botbigdata: How to build a data science team https://t.co/xJWWgueGAV #bigdata
Msg: RT @ATEKAssetScan: Why Velocity Of Innovation Is A Data Friction Problem https://t.co/Eo1pTNCEv9 #BigData #IoT #IIoT #InternetOfThings #Art…
Msg: Making the Most of Big Data https://t.co/X52AZ5n5nT #BigData
Msg: RT @botbigdata: Create editable Microsoft Office charts from R https://t.co/LnSDU0iSMq #bigdata
Msg: RT @YarmolukDan: How #Twitter Users Can Generate Better Ideas https://t.co/b0O9iEULHG #DataScience #DataScientist #BigData #IoT…
Msg: RT @botbigdata: VIDEO: Installing TOR on an Ubuntu Virtual Machine https://t.co/Q3FPhY8CGm #bigdata
Let's define a type and extract the tweet metadata:
case class Tweet(id:String, username:String, userId:Long, userScreenName:String,
userDesc:String, userProfileImgUrl:String, favCount:Long, retweetCount:Long,
lang:String, place:String, message:String, isSensitive:Boolean,
isTruncated:Boolean, isFavorited:Boolean, isRetweeted:Boolean,
isRetweet:Boolean, createdAt:Long)
override def onStatus(status: Status): Unit = {
val retweetCount = if(status.getRetweetedStatus == null) 0 else status.getRetweetedStatus.getRetweetCount
val userDesc = if(status.getUser.getDescription == null) "null" else status.getUser.getDescription
val userProfileImgUrl = if(status.getUser.getProfileImageURL == null) "null" else status.getUser.getProfileImageURL
val lang = if(status.getLang == null) "null" else status.getLang
val place = if(status.getPlace == null) "null" else status.getPlace.getFullName
val tweet = Tweet(
id = status.getId.toString,
username = status.getUser.getName,
userId = status.getUser.getId,
userScreenName = status.getUser.getScreenName,
userDesc = userDesc,
userProfileImgUrl = userProfileImgUrl,
createdAt = status.getCreatedAt.getTime,
favCount = status.getFavoriteCount,
retweetCount = retweetCount,
lang = lang,
place = place,
message = status.getText,
isSensitive = status.isPossiblySensitive,
isTruncated = status.isTruncated,
isFavorited = status.isFavorited,
isRetweeted = status.isRetweeted,
isRetweet = status.isRetweet
)
logger.info("Msg: " + tweet.message)
}
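The null checks in onStatus above can also be written with Option, which is a common Scala idiom for values coming from Java APIs. The following is only a sketch: FakePlace is a hypothetical stand-in for twitter4j's Place so that the example runs without the library, and the helper names are made up for illustration.

```scala
// Hypothetical stand-in for twitter4j's Place, used only to keep this sketch self-contained.
final case class FakePlace(fullName: String)

object NullSafe {
  // Returns the value, or the default when the Java API handed back null.
  def text(value: String, default: String = "null"): String =
    Option(value).getOrElse(default)

  // Handles both a null place and a null name inside a non-null place.
  def placeName(place: FakePlace, default: String = "null"): String =
    Option(place).flatMap(p => Option(p.fullName)).getOrElse(default)

  def main(args: Array[String]): Unit = {
    println(text(null))                               // prints "null"
    println(text("en"))                               // prints "en"
    println(placeName(FakePlace("Paris, France")))    // prints "Paris, France"
  }
}
```

With the real Status object the same pattern would read Option(status.getLang).getOrElse("null"), which keeps the "null" placeholder string used in this post.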
Next we will send these tweets to kafka.
Zookeeper setup
ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications. In our example, both Kafka and Solr will need zookeeper for their state and config management, so you need to first start zookeeper.
- Download it from http://apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
- Extract it and go to the conf directory
- Make a copy of zoo_sample.cfg as zoo.cfg
- Run it using bin/zkServer.sh start (from the ZooKeeper root directory)
- Verify that it started successfully by running the bin/zkServer.sh status command.
Putting data in Kafka
Here are the steps to send data to Kafka:
- Start the Kafka server and broker(s)
- Create a topic in Kafka to which the data will be sent
- Define an Avro schema for the tweets
- Create a Kafka producer which will serialize tweets using the Avro schema and send them to Kafka
Download kafka from here.
Start server
Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic tweet1
You can see if topic is created successfully
Avro schema
Avro is a data serialization system. It has a JSON-like data model, but can be represented as either JSON or in a compact binary form. It comes with a very sophisticated schema description language that describes data. Let's define an Avro schema for our Tweet type:
{
"type": "record",
"namespace": "tweet",
"name": "tweet",
"fields":[
{ "name": "id", "type":"string" },
{ "name": "username", "type":"string" },
{ "name": "userId", "type":"long" },
{ "name": "userScreenName", "type":"string" },
{ "name": "userDesc", "type":"string" },
{ "name": "userProfileImgUrl", "type":"string" },
{ "name": "favCount", "type":"int" },
{ "name": "retweetCount", "type":"int" },
{ "name": "lang", "type":"string" },
{ "name": "place", "type":"string" },
{ "name": "message", "type":"string" },
{ "name": "isSensitive", "type":"boolean" },
{ "name": "isTruncated", "type":"boolean" },
{ "name": "isFavorited", "type":"boolean" },
{ "name": "isRetweeted", "type":"boolean" },
{ "name": "isRetweet", "type":"boolean" },
{ "name": "createdAt", "type":"long" }
]
}
Kafka can carry many other formats too, but Avro is a widely used choice for streaming data. You can read more about it here https://www.confluent.io/blog/avro-kafka-data/
Next, create a producer:
val props = new Properties()
props.put("bootstrap.servers", brokerList)
props.put("client.id", "KafkaTweetProducer")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
val producer = new KafkaProducer[String, Array[Byte]](props)
Create a Schema object from the Avro schema definition.
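A minimal sketch of building that Schema object with Avro's Schema.Parser follows. To keep it short, the schema string here is abbreviated to two of the fields and is inlined rather than loaded from a file; the object name TweetSchema is an assumption made for this example.

```scala
import org.apache.avro.Schema

object TweetSchema {
  // Abbreviated version of the tweet schema above (only two fields), inlined for the sketch.
  val schemaString: String =
    """{
      |  "type": "record",
      |  "namespace": "tweet",
      |  "name": "tweet",
      |  "fields": [
      |    { "name": "id", "type": "string" },
      |    { "name": "createdAt", "type": "long" }
      |  ]
      |}""".stripMargin

  // Parse the JSON definition into an Avro Schema object.
  val schema: Schema = new Schema.Parser().parse(schemaString)

  def main(args: Array[String]): Unit =
    println(schema.getFullName) // namespace and name joined: tweet.tweet
}
```

In the real project the full definition would more likely live in a resource file such as tweet.avsc and be read into a string before parsing, as the consumer section of this post does.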
Serialize the tweet and send it using the producer:
def writeToKafka(tweet: Tweet) = {
val row = new GenericData.Record(schema)
row.put("id", tweet.id)
row.put("username", tweet.username)
row.put("userId", tweet.userId)
row.put("userScreenName", tweet.userScreenName)
row.put("userDesc", tweet.userDesc)
row.put("userProfileImgUrl", tweet.userProfileImgUrl)
row.put("favCount", tweet.favCount)
row.put("retweetCount", tweet.retweetCount)
row.put("lang", tweet.lang)
row.put("place", tweet.place)
row.put("message", tweet.message)
row.put("isSensitive", tweet.isSensitive)
row.put("isTruncated", tweet.isTruncated)
row.put("isFavorited", tweet.isFavorited)
row.put("isRetweeted", tweet.isRetweeted)
row.put("isRetweet", tweet.isRetweet)
row.put("createdAt", tweet.createdAt)
val writer = new SpecificDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get().binaryEncoder(out, null)
writer.write(row, encoder)
encoder.flush()
logger.info("Pushing to kafka. TweetId= " + tweet.id)
val data = new ProducerRecord[String, Array[Byte]](topic, out.toByteArray)
producer.send(data)
}
We will create an ActorSystem and put all this inside a KafkaTweetProducer actor. We will then send a message to KafkaTweetProducer whenever a new tweet is received.
val zkHostKafka = "localhost:2181/kafka"
val kafkaBrokers = "localhost:9092"
val topic = "tweet1"
val system = ActorSystem("TwitterAnalysis")
val kafkaProducer = system.actorOf(Props(new KafkaTweetProducer(kafkaBrokers, topic)), name = "kafka_tweet_producer")
val twitterStream = new TwitterWatcher(cb, keywords, kafkaProducer) // keywords: the hashtag list defined earlier
twitterStream.startTracking()
class TwitterWatcher(cb:ConfigurationBuilder, keywords:List[String], destination:ActorRef) extends Logging {
override def onStatus(status: Status): Unit = {
...
val tweet = Tweet(
...
)
destination ! tweet
}
}
class KafkaTweetProducer(brokerList:String, topic:String) extends Actor with Logging {
override def receive: Receive = {
case t:Tweet =>
writeToKafka(t)
...
}
}
To test whether this data is being written to Kafka properly or not, you can use the command line console consumer and watch the topic tweet1:
Next we will consume this data in Solr and Cassandra.
Putting data in solr
Here are the steps for writing data to Solr:
- Define a Solr schema (config set) corresponding to the Tweet type
- Upload the schema to ZooKeeper
- Create a collection in Solr using this config set
- Create a Solr consumer which will read from the tweet1 topic in Kafka
- Deserialize the data read from Kafka and create Solr documents from it
- Send the documents to Solr
Here's what the schema definition will look like:
<field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" />
<field name="username" type="string" indexed="true" stored="true" required="true" multiValued="false" />
<field name="userId" type="tlong" indexed="true" stored="true" required="true" multiValued="false" />
<field name="userScreenName" type="string" indexed="true" stored="true" required="true" multiValued="false" />
<field name="userDesc" type="string" indexed="true" stored="true" required="true" multiValued="false" />
<field name="userProfileImgUrl" type="string" indexed="true" stored="true" required="true" multiValued="false" />
<field name="favCount" type="tlong" indexed="true" stored="true" required="true" multiValued="false" />
<field name="retweetCount" type="tlong" indexed="true" stored="true" required="true" multiValued="false" />
<field name="lang" type="string" indexed="true" stored="true" required="true" multiValued="false" />
<field name="place" type="string" indexed="true" stored="true" required="true" multiValued="false" />
<field name="message" type="text_en" indexed="true" stored="true" required="true" multiValued="false" />
<field name="isSensitive" type="boolean" indexed="true" stored="true" required="true" multiValued="false" />
<field name="isTruncated" type="boolean" indexed="true" stored="true" required="true" multiValued="false" />
<field name="isFavorited" type="boolean" indexed="true" stored="true" required="true" multiValued="false" />
<field name="isRetweeted" type="boolean" indexed="true" stored="true" required="true" multiValued="false" />
<field name="isRetweet" type="boolean" indexed="true" stored="true" required="true" multiValued="false" />
<field name="createdAt" type="tdate" indexed="true" stored="true" required="true" multiValued="false" />
Upload the config set to ZooKeeper:
./server/scripts/cloud-scripts/zkcli.sh -cmd upconfig -zkhost localhost:2181 -confdir tweet-schema -confname tweet-schema
Create the collection
http://localhost:8983/solr/admin/collections?action=create&name=tweet&collection.configName=tweet-schema&numShards=1
Next, create a SolrWriter actor which will receive a Tweet message from a KafkaTweetConsumer (which we will define next), convert it to a SolrInputDocument and send it to Solr:
class SolrWriter(zkHost: String, collection: String, commitAfterBatch: Boolean) extends Actor with Logging {
val client = new CloudSolrClient.Builder().withZkHost(zkHost).build()
client.setDefaultCollection(collection)
...
var batch = List[SolrInputDocument]()
val MAX_BATCH_SIZE = 100
override def receive: Receive = {
case doc: Tweet =>
val solrDoc = new SolrInputDocument()
solrDoc.setField("id", doc.id)
solrDoc.setField("username", doc.username)
...
batch = solrDoc :: batch
if (batch.size > MAX_BATCH_SIZE) indexBatch()
case FlushBuffer =>
indexBatch()
case _ =>
logger.warn("Unknown message")
}
def indexBatch(): Boolean = {
try {
logger.info("Flushing batch")
client.add(batch.asJavaCollection)
batch = List[SolrInputDocument]()
if (commitAfterBatch) client.commit()
true
} catch {
case ex: Exception =>
logger.error("Failed to index Solr batch. Exception is " + ex.getMessage)
ex.printStackTrace()
batch = List[SolrInputDocument]()
false
}
}
...
}
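The buffer-and-flush logic inside SolrWriter can be looked at in isolation. The sketch below is not the author's code: BatchBuffer is a hypothetical helper that takes the flush action as a function, so it runs without Solr. It also differs from the actor above in two small ways that are assumptions of this example: it flushes as soon as the buffer reaches the maximum size (the actor uses a strict greater-than check), and it skips the flush when the buffer is empty.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper: collects items and hands them to `flush` in batches.
class BatchBuffer[A](maxSize: Int, flush: Seq[A] => Unit) {
  private val buffer = ArrayBuffer.empty[A]

  // Add one item; flush automatically once the batch is full.
  def add(item: A): Unit = {
    buffer += item
    if (buffer.size >= maxSize) flushNow()
  }

  // Flush whatever is buffered (what a periodic FlushBuffer message would trigger).
  def flushNow(): Unit =
    if (buffer.nonEmpty) {
      val items = buffer.toList
      buffer.clear()
      flush(items)
    }
}

object BatchBufferDemo {
  // Feeds 7 items through a buffer of size 3 and records every flushed batch.
  def run(): List[List[Int]] = {
    var flushed = List.empty[List[Int]]
    val buffer = new BatchBuffer[Int](3, items => flushed = flushed :+ items.toList)
    (1 to 7).foreach(buffer.add)
    buffer.flushNow() // pick up the partial last batch
    flushed
  }

  def main(args: Array[String]): Unit =
    println(run()) // List(List(1, 2, 3), List(4, 5, 6), List(7))
}
```

The final flushNow() call shows why the actor needs the FlushBuffer message: without it, a partially filled batch would sit in memory until enough new tweets arrive.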
Now we need to define a Kafka consumer which will read data from Kafka and send it to SolrWriter.
Kafka Consumer
The consumer will read data from Kafka, deserialize it using the Avro schema, convert it to the Tweet type and forward the message to a destination actor. We will keep the consumer generic so that any destination actor (Solr or Cassandra) can be passed to it.
class KafkaTweetConsumer(zkHost:String, groupId:String, topic:String, destination:ActorRef) extends Actor with Logging {
...
def read() = try {
...
destination ! tweet //destination will be either solr or cassandra
...
}
}
Create consumer and avro schema object
private val props = new Properties()
props.put("group.id", groupId)
props.put("zookeeper.connect", zkHost)
props.put("auto.offset.reset", "smallest")
props.put("consumer.timeout.ms", "120000")
props.put("auto.commit.interval.ms", "10000")
private val consumerConfig = new ConsumerConfig(props)
private val consumerConnector = Consumer.create(consumerConfig)
private val filterSpec = new Whitelist(topic)
val schemaString = Source.fromURL(getClass.getResource("/tweet.avsc")).mkString
val schema = new Schema.Parser().parse(schemaString)
Convert binary data to the Tweet type using Avro:
private def getTweet(message: Array[Byte]): Tweet = {
val reader = new SpecificDatumReader[GenericRecord](schema)
val decoder = DecoderFactory.get().binaryDecoder(message, null)
val record = reader.read(null, decoder)
val tweet = Tweet(
id = record.get("id").toString,
username = record.get("username").toString,
...
)
tweet
}
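To check that the producer-side encoding and this decoding agree, a round trip can be run without Kafka in between. This is a sketch under assumptions: it uses a two-field version of the schema, the generic datum writer and reader classes instead of the Specific ones used in the post, and the object and method names are invented for the example.

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object AvroRoundTrip {
  // Two-field schema, abbreviated from the tweet schema for this sketch.
  val schema: Schema = new Schema.Parser().parse(
    """{"type":"record","namespace":"tweet","name":"tweet","fields":[
      |{"name":"id","type":"string"},
      |{"name":"createdAt","type":"long"}]}""".stripMargin)

  // Serialize the two fields into Avro's compact binary form.
  def encode(id: String, createdAt: Long): Array[Byte] = {
    val record = new GenericData.Record(schema)
    record.put("id", id)
    record.put("createdAt", Long.box(createdAt))
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  // Read the bytes back with the same schema.
  def decode(bytes: Array[Byte]): (String, Long) = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    val record = new GenericDatumReader[GenericRecord](schema).read(null, decoder)
    // Avro returns strings as Utf8 objects, hence the toString call.
    (record.get("id").toString, record.get("createdAt").asInstanceOf[Long])
  }

  def main(args: Array[String]): Unit =
    println(decode(encode("923302396612182016", 1508987220000L)))
}
```

Because the binary form carries no field names, the reader must use a schema compatible with the one the writer used; here both sides share the same Schema object.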
Start consuming from kafka and send messages to destination, Solr in this specific case.
val streams = consumerConnector.createMessageStreamsByFilter(filterSpec, 1,new DefaultDecoder(), new DefaultDecoder())(0)
lazy val iterator = streams.iterator()
while (iterator.hasNext()) {
val tweet = getTweet(iterator.next().message())
//logger.info("Consuming tweet: " + tweet.id)
destination ! tweet
}
You should now start seeing data in solr:
{
"responseHeader":{
"zkConnected":true,
"status":0,
"QTime":1,
"params":{
"q":"*:*",
"rows":"1",
"wt":"json"
}
},
"response":{
"numFound":42,
"start":0,
"docs":[
{
"id":"923302396612182016",
"username":"Tawanna Kessler",
"userId":898322458742337536,
"userScreenName":"tawanna_kessler",
"userDesc":"null",
"userProfileImgUrl":"http://pbs.twimg.com/profile_images/898323854417940484/lke3BSjt_normal.jpg",
"favCount":0,
"retweetCount":183,
"lang":"en",
"place":"null",
"message":"RT @craigbrownphd: Two upcoming webinars: Two new Microsoft webinars are taking place over the next week that may… https://t.co/SAb9CMmVXY…",
"isSensitive":false,
"isTruncated":false,
"isFavorited":false,
"isRetweeted":false,
"isRetweet":true,
"createdAt":"2017-10-26T03:07:00Z",
"_version_":1582267022370144256
}
]
}
}
Querying solr data with banana
Banana is a data visualization tool that uses Solr for data analysis and display. Here's how to set it up for our tweet data. We will run it in the same container as Solr:
Download Banana and put it in Solr's webapp directory.
To save dashboards and settings, Banana expects a collection named banana-int. Let's go ahead and create it. The config set for that collection can be found in banana/resources/banana-int-solr-5.0/.
Upload banana config to zookeeper
$SOLR_HOME/server/scripts/cloud-scripts/zkcli.sh -cmd upconfig -zkhost localhost:2181 -confdir banana-int-solr-5.0/conf/ -confname banana
Create the collection
http://localhost:8983/solr/admin/collections?action=create&name=banana-int&collection.configName=banana&numShards=1
Navigate to the Banana UI at http://localhost:8983/solr/banana/src/index.html and change the collection in settings to point to the tweet collection.
Here's what it will look like for our tweets data:
Next we will create a cassandra consumer.
Putting data in cassandra
- Download Cassandra from http://archive.apache.org/dist/cassandra/3.0.12/apache-cassandra-3.0.12-bin.tar.gz and uncompress it
- Run bin/cassandra to start it
We first need to create a keyspace and a table for storing tweets:
CREATE KEYSPACE twitter WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
CREATE TABLE twitter.tweet (
topic text,
id text,
username text,
userId text,
userScreenName text,
userDesc text,
userProfileImgUrl text,
favCount bigint,
retweetCount bigint,
lang text,
place text,
message text,
isSensitive boolean,
isTruncated boolean,
isFavorited boolean,
isRetweeted boolean,
isRetweet boolean,
createdAt timestamp,
creationDate timestamp,
PRIMARY KEY ((topic, creationDate), username, id)
);
Then we will create a CassWriter actor, similar to the Solr one, which will accept a Tweet message and write it to Cassandra.
Connect to the cluster:
lazy val cluster = Cluster.builder().addContactPoint(seeds).build()
lazy val session = cluster.connect(keyspace)
Since we will be using the same query repeatedly to insert data with different parameters, we will use a prepared statement to improve performance:
lazy val prepStmt = session.prepare(s"INSERT INTO $cf (" +
"topic, id, username, userId, userScreenName, userDesc, userProfileImgUrl, favCount," +
"retweetCount, lang, place, message, isSensitive, isTruncated, isFavorited, isRetweeted," +
"isRetweet, createdAt, creationDate" +
") values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
Take a Tweet, create a BoundStatement by setting values for all fields and write it to Cassandra:
def writeToCass(t:Tweet) = {
try {
val boundStmt = prepStmt.bind()
.setString("topic", topic)
.setString("id",t.id)
.setString("username", t.username)
.setString("userId", t.userId.toString)
.setString("userScreenName",t.userScreenName)
.setString("userDesc",t.userDesc)
.setString("userProfileImgUrl",t.userProfileImgUrl)
.setLong("favCount",t.favCount)
.setLong("retweetCount",t.retweetCount)
.setString("lang",t.lang)
.setString("place",t.place)
.setString("message",t.message)
.setBool("isSensitive",t.isSensitive)
.setBool("isTruncated",t.isTruncated)
.setBool("isFavorited",t.isFavorited)
.setBool("isRetweeted",t.isRetweeted)
.setBool("isRetweet",t.isRetweet)
.setTimestamp("createdAt", new Date(t.createdAt))
.setTimestamp("creationDate", new Date(t.createdAt))
session.execute(boundStmt)
} catch {
case ex: Exception =>
logger.error("C* insert exception. Message: " + ex.getMessage)
}
}
We will create a new instance of this actor:
val cassWriter = system.actorOf(Props(new CassWriter(cassSeeds, cassKeyspace, cassCf, topic)), name = "cass_writer")
And then create a new KafkaTweetConsumer whose destination will be this cassWriter actor:
val cassConsumer = system.actorOf(Props(
new KafkaTweetConsumer(zkHostKafka, "tweet-cass-consumer", topic, cassWriter)), name = "cass_consumer")
You should start seeing data in cassandra
cqlsh> select creationdate, userscreenname, lang, message from twitter.tweet limit 1;
creationdate | userscreenname | lang | message
--------------------------+----------------+------+--------------------------------------------------------------------------------------------------------------------------------------------
2017-10-25 21:56:30+0000 | alevergara78 | en | RT @HomesAtMetacoda: data in motion >> Online learning: #MachineLearning’s secret for #bigdata via\n@SASsoftware https://t.co/eGbAumJzEt…
Next we will set up Spark and use it to query the Cassandra data.
Query cassandra data with spark
We will use the DataStax Spark Cassandra connector https://github.com/datastax/spark-cassandra-connector. Download the jar for the correct connector version and place it in the lib directory of your project:
First thing which we need is a spark context
val CASS_SEEDS = "127.0.0.1"
val SPARK_MASTER = "spark://sam-ub:7077"
val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", CASS_SEEDS)
.setJars(Seq("lib/spark-cassandra-connector-assembly-2.0.0.jar"))
.setMaster(SPARK_MASTER)
.setAppName("cass_query")
lazy val sc = new SparkContext(conf)
Then you can query and apply different aggregations. This query will be picked up as a Spark job and executed on your Spark cluster:
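import com.datastax.spark.connector._ // adds cassandraTable to SparkContext
// Table and topic names match the ones used earlier: twitter.tweet and tweet1
val data = sc.cassandraTable("twitter", "tweet")
.select("topic", "creationdate", "retweetcount", "id", "isretweet")
.where("topic = 'tweet1' and creationdate = '2017-10-25 20:15:05+0000'")
.groupBy(_.getLong("retweetcount"))
.map(r => (r._1, r._2.size)) // (retweet count, number of rows with that count)
.collect()
// collect() returns an Array, so format it before logging
logger.info("Rows per retweet count = " + data.mkString(", "))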
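To get a feel for the shape of the value this job produces, the same groupBy and size steps can be tried on a plain Scala collection, with no Spark or Cassandra involved. The Row type and the sample values below are made up for this sketch.

```scala
object RetweetHistogram {
  // Hypothetical, trimmed-down row: just the tweet id and its retweet count.
  final case class Row(id: String, retweetCount: Long)

  // Group rows by retweet count and count how many rows fall in each group.
  def countByRetweets(rows: Seq[Row]): Map[Long, Int] =
    rows.groupBy(_.retweetCount).map { case (count, group) => (count, group.size) }

  def main(args: Array[String]): Unit = {
    val sample = Seq(Row("a", 0L), Row("b", 183L), Row("c", 0L))
    // Two tweets with 0 retweets, one with 183.
    println(countByRetweets(sample))
  }
}
```

On the cluster the grouping runs across partitions and collect() brings the pairs back to the driver as an Array of tuples rather than a Map.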
If job is successful, you will see the result:
Visualizing cassandra data with zeppelin
Zeppelin is a web-based notebook that can be used for interactive data analytics on cassandra data using spark.
Download the binary from https://zeppelin.apache.org/download.html and uncompress it. Its default port is 8080, which conflicts with the Spark master web UI port, so change the port in conf/zeppelin-site.xml.
Create a new notebook and select the Spark interpreter.
Create a view of our tweet table from Cassandra:
%spark.sql
create temporary view mytweets
using org.apache.spark.sql.cassandra
options (keyspace "twitter", table "tweet")
We can now run aggregations or other analytics queries on this view:
%spark.sql
select lang, count(*) as occur from mytweets where lang != 'und' group by lang order by occur desc limit 10
Here's what output of above query will look like:
Conclusion
I hope you got the idea of how to get started with creating a search and analytics pipeline.