Writing your own Kafka source connector for Apache Solr
Kafka provides a common framework, called Kafka Connect, to standardize integration with other data systems. Kafka Connectors are ready-to-use components built using the Connect framework. A connector can be a Source Connector, if it reads from an external system and writes to Kafka, or a Sink Connector, if it reads data from Kafka and writes to an external system.
In this post, we will see how to implement our own Kafka source connector. Our source connector will read data from an Apache Solr collection using CursorMark and write it to a Kafka topic.
Full code for this post with deployment instructions is available at https://github.com/saumitras/kafka-solr-connect
Steps for creating a Kafka source connector
When writing a source connector for any data system, these are the high-level steps:
- Figure out the configurations needed for all (or most) types of workloads on your source
- Figure out the mapping between your source data format and the format used when storing data in Kafka
- Partitioning strategy
- Collection name to Kafka topic name mapping, etc.
- Implement a Kafka Connector class
- Implement a Source Task class
- Package the connector
- Deploy in standalone or distributed mode
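As a small illustration of the collection-to-topic mapping step, here is a minimal sketch in plain Scala. The object and method names (`TopicNaming`, `topicFor`, `topicsFor`) are hypothetical and not part of the connector's code; the only external fact assumed is that Kafka topic names are limited to ASCII letters, digits, `.`, `_` and `-`, with a maximum length of 249 characters.

```scala
object TopicNaming {
  // Kafka topic names may only contain ASCII letters, digits, '.', '_' and '-',
  // and are limited to 249 characters.
  private val IllegalChars = "[^a-zA-Z0-9._-]".r
  private val MaxLength = 249

  /** Builds a topic name as prefix + collection, replacing illegal characters with '_'. */
  def topicFor(prefix: String, collection: String): String = {
    val cleaned = IllegalChars.replaceAllIn(prefix + collection, "_")
    require(cleaned.nonEmpty, "topic name must not be empty")
    cleaned.take(MaxLength)
  }

  /** Splits a comma-separated list of collections and maps each one to its topic. */
  def topicsFor(prefix: String, collections: String): Map[String, String] =
    collections.split(",").map(_.trim).filter(_.nonEmpty)
      .map(c => c -> topicFor(prefix, c)).toMap
}
```

Deciding this mapping early matters because the topic name ends up in every record the connector emits, so changing it later means consumers have to follow.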
Configuration
First, we need to decide which configurations to expose to the user. For our Solr connector, these are the configs a user can change:
Config name | Config description |
---|---|
COLLECTION_NAME | Comma-separated list of Solr collections, or a collection alias, to be written to Kafka |
ZK_HOST | Comma-separated list of ZooKeeper hosts of the Solr cluster |
TOPIC_PREFIX | Connector will create a topic by prepending this prefix to Solr's COLLECTION_NAME |
QUERY | A Solr query to filter the docs. By default it will use *:* and fetch all documents |
BATCH_SIZE | Number of Solr docs to fetch per polling request |
We will create a ConfigDef from the above configs, which will be used when defining the SourceConnector. For each configuration, you can specify the name, the type, the default value, the documentation, the group information, the order in the group, etc.
private val CONFIG_DEF: ConfigDef = {
  new ConfigDef()
    .define(ZK_HOST, Type.STRING, Importance.HIGH, ZOOKEEPER_HOST_COMMA_SEPARATED)
    .define(ZK_CHROOT, Type.STRING, Importance.HIGH, CHROOT_IF_AVAILABLE)
    .define(COLLECTION_NAME, Type.STRING, Importance.HIGH, NAME_OF_COLLECTION_TO_INDEX)
    .define(TOPIC_PREFIX, Type.STRING, Importance.HIGH, PREFIX_TO_ADD_TO_COLLECTION_NAME_TO_GET_TOPIC_NAME)
    .define(BATCH_SIZE, Type.STRING, Importance.LOW, NUMBER_OF_DOCS_TO_READ_IN_EACH_BATCH)
    .define(QUERY, Type.STRING, Importance.HIGH, SOLR_QUERY_FILTER_WHILE_FETCHING_DOCS)
}
You can also supply a Validator for a config to apply your own validation logic to its value. Next we will implement the Connector and Task interfaces. Since we are developing a source connector, we will extend SourceConnector and SourceTask. For a sink connector, there are the corresponding SinkConnector and SinkTask, but we will cover those in a separate post.
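To make the Validator idea concrete, here is a minimal sketch that rejects a batch size that is not a positive integer. It assumes the Kafka clients library is on the classpath. The class name `PositiveIntString`, the key name `batchSize` and the default value `"100"` are assumptions made for illustration and are not taken from the connector's code.

```scala
import org.apache.kafka.common.config.{ConfigDef, ConfigException}
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}

// Hypothetical validator: accepts only strings that parse to a positive integer.
class PositiveIntString extends ConfigDef.Validator {
  override def ensureValid(name: String, value: Any): Unit = {
    val ok = value match {
      case s: String => scala.util.Try(s.trim.toInt).toOption.exists(_ > 0)
      case _         => false
    }
    if (!ok) throw new ConfigException(name, value, "must be a positive integer")
  }
}

object ValidatedConfig {
  val BATCH_SIZE = "batchSize" // assumed key name, for illustration only

  // Same define(...) style as above, with a default value and a validator added.
  val CONFIG_DEF: ConfigDef = new ConfigDef()
    .define(BATCH_SIZE, Type.STRING, "100", new PositiveIntString,
      Importance.LOW, "Number of Solr docs to read in each batch")
}
```

With a validator in place, a bad value fails when the configuration is parsed, rather than later inside the task when the string is converted to an integer.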
Implement SourceConnector
SourceConnector doesn't contain the core functionality of the connector. It doesn't know how to read data from Solr, how to transform it into a Kafka record, etc. SourceConnector is only responsible for breaking the job specified via configuration into a set of SourceTask instances that can be distributed to Kafka Connect workers.
Let's start by implementing a SourceConnector. Here are the methods we need to implement:
class SolrSourceConnector extends SourceConnector {
  override def start(props: util.Map[String, String]): Unit = ???
  override def taskClass(): Class[_ <: Task] = ???
  // Connect is a Java API, so the return type uses java.util collections
  override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = ???
  override def stop(): Unit = ???
  override def config(): ConfigDef = ???
  override def version(): String = ???
}
Connector start()
This is the entry point for a connector. In this method, the connector should:
- Accept the user-specified connector configuration
- Optionally talk to the external system to get information about tasks
- Determine the task configuration
- Optionally start thread(s) to monitor the external system
- If needed, ask for task reconfiguration
override def start(props: util.Map[String, String]): Unit = {
  val parsedConfig: AbstractConfig = new AbstractConfig(CONFIG_DEF, props)
  topicPrefix = parsedConfig.getString(TOPIC_PREFIX)
  zkHost = parsedConfig.getString(ZK_HOST)
  zkChroot = parsedConfig.getString(ZK_CHROOT)
  collectionName = parsedConfig.getString(COLLECTION_NAME)
  batchSize = parsedConfig.getString(BATCH_SIZE)
  query = parsedConfig.getString(QUERY)
}
Connector taskConfigs()
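This method tells Connect the configuration for each of the tasks. The implementation below returns a single task configuration, regardless of the maximum number of tasks passed in: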
override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = {
  // maxTasks is ignored here: one task handles the whole job
  val configs = new java.util.ArrayList[java.util.Map[String, String]]()
  val config = new java.util.HashMap[String, String]()
  config.put(TOPIC_PREFIX, topicPrefix)
  config.put(ZK_HOST, zkHost)
  config.put(ZK_CHROOT, zkChroot)
  config.put(COLLECTION_NAME, collectionName)
  config.put(BATCH_SIZE, batchSize)
  config.put(QUERY, query)
  configs.add(config)
  configs
}
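The taskConfigs() above always produces one task. If COLLECTION_NAME holds several collections, one possible way to spread them over tasks is to split the list into at most maxTasks groups and emit one config map per group. The sketch below shows only the grouping step in plain Scala; `TaskPlanning` and `groupCollections` are hypothetical names, and this is not what the connector in this post does.

```scala
object TaskPlanning {
  /** Distributes collections round-robin over at most maxTasks groups.
    * No group is ever empty, so fewer groups are returned when there are
    * fewer collections than maxTasks. */
  def groupCollections(collections: Seq[String], maxTasks: Int): Seq[Seq[String]] = {
    require(maxTasks > 0, "maxTasks must be positive")
    val groups = math.min(maxTasks, collections.size)
    (0 until groups).map { g =>
      collections.zipWithIndex.collect { case (c, i) if i % groups == g => c }
    }
  }
}
```

Each resulting group would then be joined with commas and stored under COLLECTION_NAME in its own task config map, with the remaining settings copied unchanged.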
Connector stop()
This works like a shutdown hook. You can perform housekeeping tasks here, such as closing all SolrClient connections:
override def stop(): Unit = {
  log.info("Stopping connector. Closing all client connections")
  SolrClient.close()
}
Implement SourceTask
SourceTask contains the core implementation and functionality of our connector. It pulls records from Solr and transforms them into a format suitable for storage in Kafka. As with the SourceConnector, our SourceTask also receives the user-supplied configuration and initialises resources and internal state as needed.
Here are the methods we need to implement:
class SolrSourceTask extends SourceTask {
  // Connect is a Java API, so parameters and return types use java.util collections
  override def start(props: util.Map[String, String]): Unit = ???
  override def stop(): Unit = ???
  override def version(): String = ???
  override def poll(): util.List[SourceRecord] = ???
}
Task start()
Here are the responsibilities of the start() method:
- Receive this task's configuration, which was created by your connector
- Read previously committed offsets to know where in the external system to start reading
- Create any resources it might need
- Connections to external system
- Buffers, queues, threads, etc.
In our example, while reading docs from Solr and writing them to Kafka, we also store the offset (the cursorMark) of the last fetched doc, so that after a task restart we know up to which point we have previously read the Solr collection. The framework commits these offsets automatically, but restoring the Task to the committed offset is the responsibility of the connector. In the start() method, we restore that offset information. We also instantiate our own SchemaManager, which will be used to map Solr docs to Kafka records. Note that this SchemaManager is a custom class and is not provided by Kafka Connect.
override def start(props: util.Map[String, String]): Unit = {
  topicPrefix = props.get(TOPIC_PREFIX)
  zkHost = props.get(ZK_HOST)
  zkChroot = props.get(ZK_CHROOT)
  collectionName = props.get(COLLECTION_NAME)
  batchSize = props.get(BATCH_SIZE).toInt
  query = props.get(QUERY)
  cursorMark = getCurrentCursorMark(collectionName)
  client = SolrClient(zkHost, zkChroot, collectionName)
  schemaManager = SchemaManager(zkHost, zkChroot, collectionName)
}
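The getCurrentCursorMark helper is not shown in this post. As a rough sketch of how such a lookup can be written against Connect's OffsetStorageReader, the code below assumes the connect-api library is on the classpath. The object name `CursorMarkStore` and the map keys `collection` and `cursorMark` are assumptions; the real helper may use different keys. The fallback `"*"` is the value Solr uses to start cursor paging.

```scala
import java.util.Collections
import org.apache.kafka.connect.storage.OffsetStorageReader

object CursorMarkStore {
  // Hypothetical key names; they only need to match what poll() puts in each SourceRecord.
  val PartitionKey = "collection"
  val OffsetKey = "cursorMark"
  // Solr cursor paging starts from the literal "*".
  val CursorMarkStart = "*"

  def partitionFor(collection: String): java.util.Map[String, String] =
    Collections.singletonMap(PartitionKey, collection)

  def offsetFor(cursorMark: String): java.util.Map[String, String] =
    Collections.singletonMap(OffsetKey, cursorMark)

  /** Returns the last committed cursorMark, or "*" when nothing has been committed yet. */
  def restore(reader: OffsetStorageReader, collection: String): String = {
    val stored = reader.offset(partitionFor(collection)) // null when no offset exists
    Option(stored)
      .flatMap(m => Option(m.get(OffsetKey)))
      .map(_.toString)
      .getOrElse(CursorMarkStart)
  }
}
```

Inside a SourceTask, the reader would come from `context.offsetStorageReader()`, so the call might look like `CursorMarkStore.restore(context.offsetStorageReader(), collectionName)`.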
Task poll()
This function is where all your actual “work” happens. The poll method fetches data from Solr, transforms each doc into the Kafka record format via SchemaManager, and returns a list of SourceRecord objects, which then get written to Kafka.
- Poll method is called frequently to fetch data from source systems
- Get the next batch of records that Connect should write to Kafka
- block until there are “enough” records to return
- return null if no records right now
- For systems that push data
- use separate thread to receive and process the data and enqueue
- poll then dequeues and returns records
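For the push-based case in the list above, a bounded queue between the receiver thread and poll() is a common pattern. The following is a minimal sketch using only the JDK; `RecordBuffer`, `enqueue` and `nextBatch` are hypothetical names, and the Solr connector in this post does not need this because it pulls data.

```scala
import java.util
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

/** Buffer between a receiver thread (producer) and SourceTask.poll() (consumer). */
class RecordBuffer[T <: AnyRef](capacity: Int, maxBatch: Int) {
  private val queue = new LinkedBlockingQueue[T](capacity)

  /** Called from the receiver thread; blocks when the buffer is full (back-pressure). */
  def enqueue(item: T): Unit = queue.put(item)

  /** Called from poll(): waits up to timeoutMs for a first item, then drains
    * whatever else is already queued, up to maxBatch items in total.
    * Returns null when nothing arrived within the timeout. */
  def nextBatch(timeoutMs: Long): util.List[T] = {
    val first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    if (first == null) null
    else {
      val batch = new util.ArrayList[T](maxBatch)
      batch.add(first)
      queue.drainTo(batch, maxBatch - 1)
      batch
    }
  }
}
```

Waiting with a timeout rather than blocking indefinitely keeps poll() returning regularly, which gives the framework a chance to pause or stop the task.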
Each Solr document will be mapped to a SourceRecord. A SourceRecord contains the following information:
- Topic Name
- Partition # (Optional)
- Record Key (with key schema)
- Record Value (with value schema)
- Source Partition and Source Offset
- Describes where this record originated
- Defined by connector, used only by connector
- Connect captures last partition+offset that it writes
- periodically committed to connect-offsets topic in distributed mode
- in standalone mode, stored on local filesystem
- When the task starts up, the start() method reads these to know where it should resume reading from
override def poll(): util.List[SourceRecord] = {
  try {
    val records = new util.ArrayList[SourceRecord]
    val (nextCursorMark, solrDocs) = client.querySolr(query, batchSize, cursorMark)
    if (cursorMark == nextCursorMark) {
      log.info("No update in cursor-mark. Sleeping for " + pollDuration)
      Thread.sleep(pollDuration)
    } else {
      solrDocs.foreach { doc =>
        val msg = schemaManager.convertSolrDocToKafkaMsg(doc)
        val sourcePartition = getPartition(collectionName)
        val sourceOffset = getOffset(nextCursorMark)
        val topic = topicPrefix + collectionName
        val schema = schemaManager.SOLR_SCHEMA
        val record = new SourceRecord(
          sourcePartition,
          sourceOffset,
          topic,
          schema,
          msg
        )
        log.info("Adding new record. " + msg)
        records.add(record)
      }
      cursorMark = nextCursorMark
    }
    records
  } catch {
    case e: IOException =>
      e.printStackTrace()
      throw e
  }
}
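The poll() above delegates the schema and value conversion to SchemaManager, which is not shown. To illustrate what goes into a single SourceRecord, here is a standalone sketch that assumes the connect-api library is on the classpath. The two-field schema, the schema name `solr.doc`, and the partition and offset keys are assumptions made for this example only.

```scala
import java.util.Collections
import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}
import org.apache.kafka.connect.source.SourceRecord

object RecordSketch {
  // Assumed two-field schema; a real SchemaManager would derive fields from the Solr schema.
  val ValueSchema: Schema = SchemaBuilder.struct().name("solr.doc")
    .field("id", Schema.STRING_SCHEMA)
    .field("_version_", Schema.INT64_SCHEMA)
    .build()

  def toRecord(collection: String, topicPrefix: String, cursorMark: String,
               id: String, version: Long): SourceRecord = {
    val value = new Struct(ValueSchema)
      .put("id", id)
      .put("_version_", java.lang.Long.valueOf(version))
    // Where the record came from, and how far we have read (hypothetical keys)
    val partition: java.util.Map[String, String] = Collections.singletonMap("collection", collection)
    val offset: java.util.Map[String, String] = Collections.singletonMap("cursorMark", cursorMark)
    new SourceRecord(partition, offset, topicPrefix + collection, ValueSchema, value)
  }
}
```

The source partition and offset maps are opaque to Connect. The framework only stores them and hands them back on restart, so the keys just need to be used consistently between poll() and start().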
Building the connector jar
We need to create a fat jar which will be deployed on the Connect cluster. Run sbt assembly to build the fat jar under kafka-solr-connect/target/scala-2.12/. Move the jar to the kafka-solr-connect/dist/ directory.
Configuring and running the connector
We now need to create two config files: connect-solr-source.properties, which contains the task-related config, and connect-standalone.properties, which contains worker config such as the path to the connector jar.
We only provide the mandatory configs we defined in the ConfigDef and use default values for the other configs.
# Create connect-standalone.properties
# Provide the absolute path to the directory containing the connector jar.
plugin.path=kafka-solr-connect/dist
# This location must be changed to a non-tmp directory.
offset.storage.file.filename=/tmp/connect.offsets
Start connector in standalone mode
Run the following command to start the connector:
$ kafka_2.12-2.3.0/bin/connect-standalone.sh kafka-solr-connect/dist/resources/connect-standalone.properties
Note that the order in which the properties files are supplied to connect-standalone.sh is important: the worker file connect-standalone.properties must come first, followed by the connector file connect-solr-source.properties. For starting the connector in distributed mode, refer to https://docs.confluent.io/current/connect/userguide.html#distributed-mode
Verify Data Flow
Once your connector is running, verify the data flow as below:
- Create a JSON document in Solr {"id":"doc01"}.
- Verify Kafka topic creation.
- Verify message in the Kafka Topic.
$ kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic solr_mycollection --from-beginning
Struct{_version_=1644922692298604544,id=doc01}
Conclusion
This concludes the fundamentals of writing a source connector. The framework provides a lot of other features. You can check out the following resources to learn more:
- Udemy video course - https://www.udemy.com/course/kafka-connect/
- Confluent's Developer guide - https://docs.confluent.io/current/connect/devguide.html
- Single Message Transformations - https://docs.confluent.io/current/connect/transforms/index.html
- Kafka Connect REST Interface - https://docs.confluent.io/current/connect/references/restapi.html#connect-userguide-rest
- JDBC Source Connector's Code - https://github.com/confluentinc/kafka-connect-jdbc/tree/master/src/main/java/io/confluent/connect/jdbc/source