Saumitra's blog

Search / Analytics / Distributed Systems / Machine Learning / DSLs

Writing Your Own Kafka Source Connector for Apache Solr

Kafka provides a common framework, called Kafka Connect, to standardize integration with other data systems. Kafka Connectors are ready-to-use components built on the Connect framework. A connector is a Source Connector if it reads from an external system and writes to Kafka, or a Sink Connector if it reads data from Kafka and writes to an external system.

In this post, we will see how to implement our own Kafka source connector. Our source connector will read data from an Apache Solr collection using CursorMark and write it to a Kafka topic. The full code for this post, with deployment instructions, is available at https://github.com/saumitras/kafka-solr-connect

Steps for creating a Kafka source connector

When writing a source connector for any data system, these are the high-level steps:

  1. Figure out the configurations needed for all (or most) types of workloads on your source
  2. Figure out the mapping between your source data format and the format used when storing data in Kafka
    1. Partitioning strategy
    2. Collection name to Kafka topic name mapping, etc
  3. Implement a Kafka Connector class
  4. Implement a Source Task class
  5. Package the connector
  6. Deploy in standalone or distributed mode

Configuration

We first need to decide which configurations to expose to the user. For our Solr connector, here are the configs a user can change:

Config name       Config description
COLLECTION_NAME   Comma-separated list of Solr collections, or a collection alias, to be written to Kafka
ZK_HOST           Comma-separated list of ZooKeeper hosts of the Solr cluster
TOPIC_PREFIX      The connector creates a topic by prepending this prefix to Solr’s COLLECTION_NAME
QUERY             A Solr query to filter the docs. By default it uses *:* and fetches all documents
BATCH_SIZE        Number of Solr docs to fetch per polling request

We will create a ConfigDef from the above configs, which will be used while defining the SourceConnector. For each configuration, you can specify the name, the type, the default value, the documentation, the group information, the order in the group, etc.

private val CONFIG_DEF: ConfigDef = {
    new ConfigDef()
      .define(ZK_HOST, Type.STRING, Importance.HIGH, ZOOKEEPER_HOST_COMMA_SEPARATED)
      .define(ZK_CHROOT, Type.STRING, Importance.HIGH, CHROOT_IF_AVAILABLE)
      .define(COLLECTION_NAME, Type.STRING, Importance.HIGH, NAME_OF_COLLECTION_TO_INDEX)
      .define(TOPIC_PREFIX, Type.STRING, Importance.HIGH, PREFIX_TO_ADD_TO_COLLECTION_NAME_TO_GET_TOPIC_NAME)
      .define(BATCH_SIZE, Type.STRING, Importance.LOW, NUMBER_OF_DOCS_TO_READ_IN_EACH_BATCH)
      .define(QUERY, Type.STRING, Importance.HIGH, SOLR_QUERY_FILTER_WHILE_FETCHING_DOCS)
  }

You can also override a config’s Validator to provide your own validation logic for a config’s value. Next we will implement the Connector and Task interfaces. Since we are developing a source connector, we will extend SourceConnector and SourceTask. Sink connectors have the corresponding SinkConnector and SinkTask, but we will cover those in a separate post.
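As a rough sketch of what defaults and validators could look like, here is a smaller ConfigDef that gives the query a default of *:* and types the batch size as an INT with a built-in range validator. The key names and the default batch size of 100 are assumptions made up for this example, not values taken from the connector’s code, and the snippet needs the kafka-clients jar on the classpath.

```scala
import java.util
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}

object SolrConfigSketch {
  // Hypothetical key names, chosen only for this example.
  val ZkHost = "zkHost"
  val Query = "query"
  val BatchSize = "batchSize"

  val definition: ConfigDef = new ConfigDef()
    // No default value: the user must supply this config.
    .define(ZkHost, Type.STRING, Importance.HIGH, "Comma-separated ZooKeeper hosts")
    // Default of *:* matches every document.
    .define(Query, Type.STRING, "*:*", Importance.HIGH, "Solr query used to filter docs")
    // Typed as INT with a range validator; values below 1 are rejected.
    // The default of 100 is an assumption for illustration.
    .define(BatchSize, Type.INT, Integer.valueOf(100),
      ConfigDef.Range.atLeast(Integer.valueOf(1)), Importance.LOW, "Docs to fetch per poll")

  // Parsing applies defaults and runs validators; invalid values throw a ConfigException.
  def parse(props: util.Map[String, String]): AbstractConfig =
    new AbstractConfig(definition, props)
}
```

With a typed INT config, the task can call getInt instead of parsing a string itself, and a bad value fails at startup rather than in the middle of a poll.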

Implement SourceConnector

SourceConnector doesn’t contain the core functionality of the connector. It doesn’t know how to read data from Solr, how to transform it to a Kafka record, etc. SourceConnector is only responsible for breaking the job specified via configuration into a set of SourceTasks that can be distributed to Kafka Connect workers.

Let’s start by implementing a SourceConnector. Here are the methods we need to implement:

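import java.util
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.Task
import org.apache.kafka.connect.source.SourceConnector

class SolrSourceConnector extends SourceConnector {

  override def start(props: util.Map[String, String]): Unit = ???
  override def taskClass(): Class[_ <: Task] = ???
  override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = ???
  override def stop(): Unit = ???
  override def config(): ConfigDef = ???
  override def version(): String = ???
}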

Connector start()

This is the entry point for a connector.

  • Accept the user-specified connector configuration
  • Optionally talk to the external system to get information about tasks
  • Determine the task configuration
  • Optionally start thread(s) to monitor the external system
    • If needed, ask for task reconfiguration

override def start(props: util.Map[String, String]): Unit = {
    val parsedConfig: AbstractConfig = new AbstractConfig(CONFIG_DEF, props)
    topicPrefix = parsedConfig.getString(TOPIC_PREFIX)
    zkHost = parsedConfig.getString(ZK_HOST)
    zkChroot = parsedConfig.getString(ZK_CHROOT)
    collectionName = parsedConfig.getString(COLLECTION_NAME)
    batchSize = parsedConfig.getString(BATCH_SIZE)
    query = parsedConfig.getString(QUERY)
}

Connector taskConfigs()

This method tells Connect the configuration for each of the tasks:

override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = {
    // A single task config is returned here, regardless of maxTasks
    val configs = new java.util.ArrayList[java.util.Map[String, String]]()

    val config = new java.util.HashMap[String, String]()
    config.put(TOPIC_PREFIX, topicPrefix)
    config.put(ZK_HOST, zkHost)
    config.put(ZK_CHROOT, zkChroot)
    config.put(COLLECTION_NAME, collectionName)
    config.put(BATCH_SIZE, batchSize)
    config.put(QUERY, query)
    configs.add(config)

    configs
}
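The version above always hands back a single task config. If COLLECTION_NAME holds several collections, one possible way to spread them over tasks is to split the list into at most maxTasks groups and emit one config per group. The helper below is only a sketch of that idea in plain Scala: groupCollections is a hypothetical name, and the connector in this post does not do this.

```scala
object TaskGroupingSketch {
  /** Splits a comma-separated collection list into at most maxTasks
    * comma-separated groups, assigning collections round-robin. */
  def groupCollections(collectionNames: String, maxTasks: Int): Seq[String] = {
    val names = collectionNames.split(",").map(_.trim).filter(_.nonEmpty).toList
    // Never create more groups than collections, and always at least one.
    val groupCount = math.max(1, math.min(maxTasks, names.size))
    names.zipWithIndex
      .groupBy { case (_, index) => index % groupCount }
      .toList
      .sortBy { case (groupId, _) => groupId }
      .map { case (_, members) => members.map(_._1).mkString(",") }
  }
}
```

Each returned string could then be put under COLLECTION_NAME in its own task config map. Kafka Connect also ships a ConnectorUtils.groupPartitions helper that does a similar split into contiguous groups.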

Connector stop()

This is like a shutdown hook. You can perform housekeeping tasks here, such as closing all SolrClient connections:

  override def stop(): Unit = {
    log.info("Stopping connector. Closing all client connections")
    SolrClient.close()
  }

Implement SourceTask

SourceTask contains the core implementation and functionality of our connector. It pulls records from Solr and transforms them into a format suitable for storage in Kafka. As with the SourceConnector, our SourceTask also receives the user-supplied configuration and initialises resources and internal configuration as needed.

Here are the methods we need to implement:

import java.util
import org.apache.kafka.connect.source.{SourceRecord, SourceTask}

class SolrSourceTask extends SourceTask {
  override def start(props: util.Map[String, String]): Unit = ???
  override def stop(): Unit = ???
  override def version(): String = ???
  override def poll(): util.List[SourceRecord] = ???
}

Task start()

Here are the responsibilities of the start() method:

  • Receive this task’s configuration, which was created by your connector
  • Read previously committed offsets to know where in the external system to start reading
  • Create any resources it might need
    • Connections to external system
    • Buffers, queues, threads, etc.

In our example, while reading docs from Solr and writing to Kafka, we also store the offset (or cursorMark) of the last fetched doc, so that after a task restart we know up to which point we have previously read docs from the Solr collection. These offsets are automatically committed to Kafka by the framework, but restoring the task to these offsets is the responsibility of the connector. In the start() method, we restore that offset information. We also instantiate our own SchemaManager, which will be used to map Solr docs to Kafka records. Note that this SchemaManager is a custom class and is not provided by Kafka Connect.

override def start(props: util.Map[String, String]): Unit = {
    topicPrefix = props.get(TOPIC_PREFIX)
    zkHost = props.get(ZK_HOST)
    zkChroot = props.get(ZK_CHROOT)
    collectionName = props.get(COLLECTION_NAME)
    batchSize = props.get(BATCH_SIZE).toInt
    query = props.get(QUERY)

    cursorMark = getCurrentCursorMark(collectionName)
    client = SolrClient(zkHost, zkChroot, collectionName)

    schemaManager = SchemaManager(zkHost, zkChroot, collectionName)
}
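The body of getCurrentCursorMark is not shown in this post. As a hedged sketch of what such a helper has to deal with: inside a SourceTask, context.offsetStorageReader().offset(partition) returns the last committed offset map for a source partition, or null if nothing was committed yet. The plain-Scala helpers below build the partition and offset maps and interpret a stored map. The key names "collection" and "cursorMark" are assumptions made for this example; "*" is the value Solr uses for the initial cursorMark.

```scala
import java.util

object OffsetSketch {
  // Hypothetical key names; the post does not show the ones its helpers use.
  val CollectionKey = "collection"
  val CursorMarkKey = "cursorMark"
  // Solr's cursorMark value for "start from the beginning".
  val CursorMarkStart = "*"

  /** Source partition: identifies which collection a record came from. */
  def partitionFor(collection: String): util.Map[String, String] =
    util.Collections.singletonMap(CollectionKey, collection)

  /** Source offset: the cursorMark to resume from after a restart. */
  def offsetFor(cursorMark: String): util.Map[String, String] =
    util.Collections.singletonMap(CursorMarkKey, cursorMark)

  /** Reads the cursorMark out of a stored offset map.
    * Falls back to the start cursor when the map or the key is missing. */
  def restoreCursorMark(stored: util.Map[String, _]): String =
    Option(stored)
      .flatMap(m => Option(m.get(CursorMarkKey)))
      .map(_.toString)
      .getOrElse(CursorMarkStart)
}
```

In a task, the stored argument would be the result of the offset storage reader lookup for partitionFor(collectionName), so a fresh deployment starts from "*" and a restarted one resumes from the last committed cursorMark.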

Task poll()

This function is where all the actual “work” happens. The poll method fetches data from Solr, transforms each doc to the Kafka record format via SchemaManager, and returns a list of SourceRecords, which then get written to Kafka.

  • Poll method is called frequently to fetch data from source systems
  • Get the next batch of records that Connect should write to Kafka
    • block until there are “enough” records to return
    • return null if no records right now
  • For systems that push data
    • use separate thread to receive and process the data and enqueue
    • poll then dequeues and returns records
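Solr is a pull-style source, so our connector does not need this, but the push-style pattern from the list above can be sketched with a blocking queue: a receiver thread enqueues items as they arrive, and poll() waits briefly, drains what is available, and returns null when nothing arrived. BufferedPoller and its parameters are hypothetical names used only for illustration.

```scala
import java.util
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical buffer sitting between a receiver thread and poll().
final class BufferedPoller[A <: AnyRef](maxBatch: Int, waitMillis: Long) {
  private val queue = new LinkedBlockingQueue[A]()

  /** Called by the receiver thread whenever the source pushes an item. */
  def enqueue(item: A): Unit = queue.put(item)

  /** Waits up to waitMillis for a first item, then drains up to maxBatch items.
    * Returns null when nothing arrived, mirroring the poll() contract. */
  def poll(): util.List[A] = {
    val first = queue.poll(waitMillis, TimeUnit.MILLISECONDS)
    if (first == null) null
    else {
      val batch = new util.ArrayList[A]()
      batch.add(first)
      queue.drainTo(batch, maxBatch - 1)
      batch
    }
  }
}
```

Bounding the wait keeps poll() from blocking indefinitely, which gives the framework a chance to pause or stop the task between calls.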

Each Solr document will be mapped to a SourceRecord. A SourceRecord contains the following information:

  • Topic Name
  • Partition # (Optional)
  • Record Key (with key schema)
  • Record Value (with value schema)
  • Source Partition and Source Offset
    • Describes where this record originated
    • Defined by connector, used only by connector
    • Connect captures last partition+offset that it writes
    • periodically committed to connect-offsets topic in distributed mode
    • in standalone mode, stored on local filesystem
    • When task starts up, start() method reads these to know where it should resume reading from

override def poll(): util.List[SourceRecord] = {
    try {
      val records = new util.ArrayList[SourceRecord]

      val (nextCursorMark, solrDocs) = client.querySolr(query, batchSize, cursorMark)

      if (cursorMark == nextCursorMark) {
        log.info("No update in cursor-mark. Sleeping for " + pollDuration)
        Thread.sleep(pollDuration)
      } else {
        solrDocs.foreach { doc =>
          val msg = schemaManager.convertSolrDocToKafkaMsg(doc)
          val sourcePartition = getPartition(collectionName)
          val sourceOffset = getOffset(nextCursorMark)
          val topic = topicPrefix + collectionName
          val schema = schemaManager.SOLR_SCHEMA

          val record = new SourceRecord(
            sourcePartition,
            sourceOffset,
            topic,
            schema,
            msg
          )
          log.info("Adding new record. " + msg)
          records.add(record)
        }
        cursorMark = nextCursorMark
      }

      records
    } catch {
      case e: IOException =>
        e.printStackTrace()
        throw e
    }
}
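The check cursorMark == nextCursorMark works because cursor-style paging hands back the same cursor once there is nothing new to read. The snippet below is an in-memory stand-in for that behaviour, not a Solr client: real Solr cursor marks are opaque tokens, whereas here the cursor is simply the last id returned. Doc and fetch are hypothetical names for this sketch.

```scala
object CursorPagingSketch {
  // Hypothetical in-memory stand-in for a Solr doc with a unique id.
  final case class Doc(id: String)

  // Solr uses "*" as the initial cursorMark.
  val Start = "*"

  /** Returns (nextCursorMark, batch) for docs whose id sorts after the cursor.
    * When no docs remain, the cursor is returned unchanged. */
  def fetch(docs: Seq[Doc], cursorMark: String, batchSize: Int): (String, Seq[Doc]) = {
    val sorted = docs.sortBy(_.id)
    val remaining =
      if (cursorMark == Start) sorted else sorted.filter(_.id > cursorMark)
    val batch = remaining.take(batchSize)
    val next = batch.lastOption.map(_.id).getOrElse(cursorMark)
    (next, batch)
  }
}
```

Calling fetch repeatedly with the returned cursor walks through the docs in batches. Once the returned cursor equals the one passed in, the caller knows it has caught up and can sleep before trying again, which is the branch poll() takes above.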

Building the connector jar

We need to create a fat jar which will be deployed on the Connect cluster. Run sbt assembly to build the fat jar under kafka-solr-connect/target/scala-2.12/. Move the jar to the kafka-solr-connect/dist/ directory.

Configuring and running the connector

We now need to create two config files: connect-solr-source.properties, which contains the task-related config, and connect-standalone.properties, which contains config such as the path of the connector jar.

We are only providing the mandatory configs we defined in the ConfigDef and using default values for the other configs.

# Create connect-solr-source.properties

zkChroot=/solr
collectionName=mycollection

# Create connect-standalone.properties

# Provide absolute path to the connector jar
plugin.path=kafka-solr-connect/dist
# This location must be changed to a non-tmp dir
offset.storage.file.filename=/tmp/connect.offsets

Start connector in standalone mode

Run the following command to start the connector:

$ kafka_2.12-2.3.0/bin/connect-standalone.sh kafka-solr-connect/dist/resources/connect-standalone.properties

Note that the order in which the properties files are supplied to connect-standalone.sh is important: the worker file connect-standalone.properties comes first, followed by the connector file connect-solr-source.properties. To start the connector in distributed mode, refer to https://docs.confluent.io/current/connect/userguide.html#distributed-mode

Verify Data Flow

Once your connector is running, verify the data flow as below:

  • Create a JSON document in Solr {"id":"doc01"}.
  • Verify Kafka topic creation.

    $ kafka_2.12-2.3.0/bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka 
    solr_mycollection
    
  • Verify message in the Kafka Topic.

    $ kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic solr_mycollection --from-beginning
    Struct{_version_=1644922692298604544,id=doc01}
    

Conclusion

This concludes the fundamentals of writing a source connector. The framework provides a lot of other features. You can check out the following resources to learn more: