
Exploring Apache Kafka Internals and Codebase

Trying to dig into the Apache Kafka codebase and make some sense of it.


I have been working with Apache Kafka for a couple of years, and during this same period, I also started learning about Open Source and contributing to some projects. Up until recently, though, I hadn’t taken the time to intersect these two areas and explore the Kafka codebase. It’s interesting how compartmentalized life can be. But intriguing and cool things can emerge when we mix and break down those walls.

One other thought: I consider myself extremely fortunate to be in a field and at a time where world-class systems and code are just a few clicks away, complete with excellent documentation and a commit history that tells the story of their evolution. It truly is a wonderful time to be alive.

Goal of this post

The goal of this post is not to explore every nook and cranny of the Kafka codebase. I might be foolish, but not so much as to believe this could be accomplished in a single blog post. The ambition here is to share parts of the code that caught my attention, things I wondered about, investigated, and hopefully understood. These are what I thought were worth sharing. Writing this post is also an excellent way to keep notes and push myself to think more clearly.

Primer on Apache Kafka

For the purpose of our exploration, let’s keep the scope limited to the essentials of Kafka.

Apache Kafka is one of the standard ways of moving large volumes of data in real-time with fault-tolerance and scalability. Messages are replayable and durable. There are three key players in the Kafka game.

A broker is basically a server that accepts Fetch and Produce requests, storing the data in a log, which is replicated across other brokers. Messages are organized into topics, and each topic is divided into partitions. The replication happens at the partition level. Kafka scales because work can be divided between partitions.

Next, we have producers, which are client applications, often Java processes, though they can be any application that speaks the Kafka protocol (a TCP protocol). Producers send data to topics on the brokers. Depending on the problem and its constraints (mainly latency and throughput), the producer can be configured for optimal performance.
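
To make that concrete, here is a minimal Java producer sketch (not taken from the Kafka repo), assuming a broker at localhost:9092 and a topic named mytopic; linger.ms and batch.size are the typical knobs for trading a bit of latency against throughput.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Throughput vs latency: wait up to 20 ms to accumulate larger batches.
        props.put("linger.ms", "20");
        props.put("batch.size", "65536");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("mytopic", "key", "hello")); // assumed topic name
        }
    }
}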

Finally, consumers, similar to producers, are applications (Java or otherwise) that speak the Kafka protocol and want to read data from the server. Essentially, they retrieve data from the distributed, replicated log written by the producers.
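
And the consuming side, again a minimal sketch under the same assumptions (local broker, mytopic topic, a made-up group id), just to show what "retrieving data from the log" looks like from the client's perspective:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("group.id", "demo-group");              // hypothetical consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("mytopic"));
            while (true) {
                // Each poll translates into Fetch requests against the partition leaders.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}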

Entrypoint: kafka-server-start.sh and kafka.Kafka

A natural starting point is kafka-server-start.sh (the script used to spin up a broker), which fundamentally invokes kafka-run-class.sh to run the kafka.Kafka class.

kafka-run-class.sh, at its core, is nothing other than a wrapper around the java command supplemented with all those nice Kafka options.

 exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_CMD_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"

And the entrypoint to the magic powering modern data streaming? The following main method, situated in Kafka.scala, i.e. kafka.Kafka:

  try {
      val serverProps = getPropsFromArgs(args)
      val server = buildServer(serverProps)

      // ... omitted ....

      // attach shutdown handler to catch terminating signals as well as normal termination
      Exit.addShutdownHook("kafka-shutdown-hook", () => {
        try server.shutdown()
        catch {
          // ... omitted ....
        }
      })

      try server.startup()
      catch {
       // ... omitted ....
      }
      server.awaitShutdown()
    }
    // ... omitted ....

That’s it. Parse the properties, build the server, register a shutdown hook, and then start up the server.
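
The shutdown hook itself is the standard JVM mechanism. Here is a sketch of the same pattern in plain Java, with a hypothetical Server stand-in rather than Kafka's actual classes:

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Server server = new Server();          // hypothetical server object
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                server.shutdown();             // runs on SIGTERM/SIGINT or normal exit
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "shutdown-hook"));

        server.startup();
        server.awaitShutdown();                // block the main thread until shutdown completes
    }

    // Tiny stand-in so the sketch compiles; Kafka's real server is far richer.
    static class Server {
        private final java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1);
        void startup() {}
        void shutdown() { latch.countDown(); }
        void awaitShutdown() throws InterruptedException { latch.await(); }
    }
}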

The first time I looked at this, it felt like peeking behind the curtain. At the end of the day, the whole magic that is Kafka is just a normal JVM program. But a magnificent one. It’s incredible that this astonishing piece of engineering is open source, ready to be explored and experimented with.

And one more fun bit: buildServer is defined just above main. This is where the timeline splits between Zookeeper and KRaft.

    val config = KafkaConfig.fromProps(props, doLog = false)
    if (config.requiresZookeeper) {
      new KafkaServer(
        config,
        Time.SYSTEM,
        threadNamePrefix = None,
        enableForwarding = enableApiForwarding(config)
      )
    } else {
      new KafkaRaftServer(
        config,
        Time.SYSTEM,
      )
    }

How is config.requiresZookeeper determined? It simply comes down to the presence of the process.roles property in the configuration: the property is only set in a KRaft installation, so its absence means the broker still requires Zookeeper.

Zookeeper connection

Kafka has historically relied on Zookeeper for cluster metadata and coordination. This, of course, has changed with the famous KIP-500, which outlined the transition of metadata management into Kafka itself by using Raft (a well-known consensus algorithm designed to manage a replicated log across a distributed system, also used by etcd, the datastore behind Kubernetes). This new approach is called KRaft (who doesn’t love mac & cheese?).

If you are unfamiliar with Zookeeper, think of it as the place where the Kafka cluster (multiple brokers/servers) stores the shared state of the cluster (e.g., topics, leaders, ACLs, ISR, etc.). It is a remote, filesystem-like entity that stores data. One interesting functionality Zookeeper offers is Watcher callbacks. Whenever the value of the data changes, all subscribed Zookeeper clients (brokers, in this case) are notified of the change. For example, when a new topic is created, all brokers, which are subscribed to the /brokers/topics Znode (Zookeeper’s equivalent of a directory/file), are alerted to the change in topics and act accordingly.
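
To get a feel for that watcher mechanism, here is a small sketch using the plain ZooKeeper Java client directly (not Kafka's KafkaZkClient wrapper), assuming a ZooKeeper ensemble reachable at localhost:2181. Note that ZooKeeper watches are one-shot: they fire once and must be re-registered.

import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class TopicsWatcherSketch {
    public static void main(String[] args) throws Exception {
        // Default (session-level) watcher does nothing in this sketch.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30_000, (WatchedEvent e) -> {});

        Watcher topicsWatcher = event ->
            // Fired once when the children of /brokers/topics change (e.g., a topic is created).
            System.out.println("Topics changed: " + event);

        // Read the current topics and register the watcher for the next change.
        List<String> topics = zk.getChildren("/brokers/topics", topicsWatcher);
        System.out.println("Current topics: " + topics);

        Thread.sleep(60_000); // keep the session alive long enough to observe a change
        zk.close();
    }
}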

Why the move? The KIP goes into detail, but the main points are:

  1. Zookeeper has its own way of doing things (security, monitoring, API, etc.) on top of Kafka’s. This results in an operational overhead (I need to manage two distinct components) but also a cognitive one (I need to know about Zookeeper to work with Kafka).
  2. The Kafka Controller has to load the full state (topics, partitions, etc) from Zookeeper over the network. Beyond a certain threshold (~200k partitions), this became a scalability bottleneck for Kafka.
  3. A love of mac & cheese.

Anyway, all that fun aside, it is amazing how simply and elegantly the Kafka codebase interacts with and leverages Zookeeper. The journey starts in the initZkClient function, called inside the server.startup() mentioned in the previous section.

  private def initZkClient(time: Time): Unit = {
    info(s"Connecting to zookeeper on ${config.zkConnect}")
    _zkClient = KafkaZkClient.createZkClient("Kafka server", time, config, zkClientConfig)
    _zkClient.createTopLevelPaths()
  }

KafkaZkClient is essentially a wrapper around the Zookeeper Java client that offers Kafka-specific operations. createTopLevelPaths ensures all the top-level paths exist so they can hold Kafka’s metadata. Notably:

    BrokerIdsZNode.path, // /brokers/ids
    TopicsZNode.path, // /brokers/topics
    IsrChangeNotificationZNode.path, // /isr_change_notification

One simple example of Zookeeper use is createTopicWithAssignment, which is used by the topic creation command. It contains the following line:

zkClient.setOrCreateEntityConfigs(ConfigType.TOPIC, topic, config)

which creates the topic Znode with its configuration.

Other data is also stored in Zookeeper, and a lot of clever things are implemented on top of it. Ultimately, Kafka is just a Zookeeper client that uses its hierarchical filesystem to store metadata such as topics and broker information in Znodes, and registers watchers to be notified of changes.

Networking: SocketServer, Acceptor, Processor, Handler

A fascinating aspect of the Kafka codebase is how it handles networking. At its core, Kafka is about processing a massive number of Fetch and Produce requests efficiently.

I like to think about it from its basic building blocks. Kafka builds on top of java.nio channels. Much like goroutines, multiple channels or requests can be handled in a non-blocking manner within a single thread: a ServerSocketChannel listens on a TCP port, and multiple channels/requests are registered with a selector, which polls continuously, waiting for connections to be accepted or data to be read.
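
As a refresher on that building block, here is a bare-bones java.nio accept/read loop, a generic sketch of the pattern rather than Kafka's actual networking code:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NioServerSketch {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(9999));          // arbitrary demo port
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT); // interested in new connections

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        while (true) {
            selector.select();                             // block until something is ready
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel client = server.accept();
                    if (client != null) {
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_READ);
                    }
                } else if (key.isReadable()) {
                    SocketChannel client = (SocketChannel) key.channel();
                    buffer.clear();
                    if (client.read(buffer) < 0) client.close(); // peer closed the connection
                    // In a real server, the bytes read would be handed to a processor.
                }
            }
        }
    }
}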

As explained in the Primer section, Kafka has its own TCP protocol that brokers and clients (consumers, producers) use to communicate with each other. A broker can have multiple listeners (PLAINTEXT, SSL, SASL_SSL), each with its own TCP port. This is managed by the SocketServer, which is instantiated in the KafkaServer.startup method. Part of the documentation for the SocketServer reads:

 *    - Handles requests from clients and other brokers in the cluster.
 *    - The threading model is
 *      1 Acceptor thread per listener, that handles new connections.
 *      It is possible to configure multiple data-planes by specifying multiple "," separated endpoints for "listeners" in KafkaConfig.
 *      Acceptor has N Processor threads that each have their own selector and read requests from sockets
 *      M Handler threads that handle requests and produce responses back to the processor threads for writing.

This sums it up well. Each Acceptor thread listens on a socket and accepts new requests. Here is the part where the listening starts:

  val socketAddress = if (Utils.isBlank(host)) {
      new InetSocketAddress(port)
    } else {
      new InetSocketAddress(host, port)
    }
    val serverChannel = socketServer.socketFactory.openServerSocket(
      endPoint.listenerName.value(),
      socketAddress,
      listenBacklogSize, // `socket.listen.backlog.size` property which determines the number of pending connections
      recvBufferSize)   // `socket.receive.buffer.bytes` property which determines the size of SO_RCVBUF (size of the socket's receive buffer)
    info(s"Awaiting socket connections on ${socketAddress.getHostString}:${serverChannel.socket.getLocalPort}.")

Each Acceptor thread is paired with num.network.threads Processor threads.

 override def configure(configs: util.Map[String, _]): Unit = {
    addProcessors(configs.get(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG).asInstanceOf[Int])
  }

The Acceptor thread’s run method is beautifully concise. It accepts new connections and closes throttled ones:

  override def run(): Unit = {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    try {
      while (shouldRun.get()) {
        try {
          acceptNewConnections()
          closeThrottledConnections()
        }
        catch {
          // omitted
        }
      }
    } finally {
      closeAll()
    }
  }

acceptNewConnections accepts the TCP connection and then assigns it to one of the acceptor’s Processor threads in a round-robin manner. Each Processor has a newConnections queue.

private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)

It is an ArrayBlockingQueue, a thread-safe FIFO queue from java.util.concurrent.

The Processor’s accept method adds a new connection handed over by the Acceptor thread if there is enough space in its queue. If all processors’ queues are full, the call blocks until a spot clears up.
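
A simplified model of that hand-off logic (illustrative only, with made-up names, not Kafka's Acceptor code) could look like this:

import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Simplified model: hand a new connection to processors in round-robin order,
// falling back to a blocking put if every queue is currently full.
class RoundRobinAssignerSketch {
    private final List<ArrayBlockingQueue<SocketChannel>> queues;
    private int next = 0;

    RoundRobinAssignerSketch(List<ArrayBlockingQueue<SocketChannel>> queues) {
        this.queues = queues;
    }

    void assign(SocketChannel connection) throws InterruptedException {
        for (int i = 0; i < queues.size(); i++) {
            ArrayBlockingQueue<SocketChannel> q = queues.get(next);
            next = (next + 1) % queues.size();
            if (q.offer(connection))      // non-blocking attempt
                return;
        }
        // All queues were full: block on the next processor in line until space frees up.
        queues.get(next).put(connection);
    }
}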

The Processor registers new connections with its Selector, which is an instance of org.apache.kafka.common.network.Selector, a custom Kafka nioSelector that handles non-blocking, multi-connection networking (sending and receiving data across multiple requests without blocking). Each connection is uniquely identified using a ConnectionId:

localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort + "-" + processorId + "-" + connectionIndex

The Processor continuously polls the Selector, which waits for receives to complete (i.e., for data sent by a client to be ready to be read). Once one is, the Processor’s processCompletedReceives processes (validates and authenticates) the request. The Acceptor and Processors share a reference to a RequestChannel, which is in fact also shared with the Acceptor and Processor threads of the other listeners. This RequestChannel object is the central place through which all requests and responses transit. It is also how cross-thread settings such as queued.max.requests (the maximum number of queued requests across all network threads) are enforced. Once the Processor has authenticated and validated a request, it passes it to the requestChannel’s queue.

Enter a new component: the Handler. KafkaRequestHandler takes over from the Processor, handling requests based on their type (e.g., Fetch, Produce).

A pool of num.io.threads handlers is instantiated during KafkaServer.startup, with each handler having access to the request queue via the requestChannel in the SocketServer.

        dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
          config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", DataPlaneAcceptor.ThreadPrefix)

Once handled, responses are queued and sent back to the client by the processor.
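
Putting the pieces together, here is a toy model of the processor → request queue → handler relationship. The Request type and the sizes are placeholders standing in for queued.max.requests and num.io.threads; this is the shape of the flow, not Kafka's code.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class HandlerPoolSketch {
    record Request(String api, String payload) {}   // hypothetical request type

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Request> requestChannel = new ArrayBlockingQueue<>(500); // like queued.max.requests
        int numIoThreads = 8;                                                  // like num.io.threads

        for (int i = 0; i < numIoThreads; i++) {
            Thread handler = new Thread(() -> {
                while (true) {
                    try {
                        Request req = requestChannel.take();   // blocks until a request arrives
                        // Dispatch on the request type, then hand the response back to a processor.
                        System.out.println("handling " + req.api());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }, "handler-" + i);
            handler.start();
        }

        // A processor thread would do something like this after validating a request:
        requestChannel.put(new Request("PRODUCE", "some record batch"));
    }
}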

That’s just a glimpse of the happy path for a simple request. There’s much more complexity behind the scenes, but I hope this brief explanation provides some insight into the process. Networking is definitely one of the most fascinating parts of the codebase.


Storage: LogManager, UnifiedLog, LogSegment

Another intriguing aspect of the Kafka codebase is how it manages storage and the actual data. This is handled by the LogManager, which is responsible for log creation, retrieval, and cleanup. Kafka, much like a database, is primarily concerned with reading data from and writing data to files.

Kafka messages are stored in topics. Each topic has one or multiple partitions. And each partition is a directory containing segments, which are the actual files where the data is stored. A replica of a partition is a directory on another broker holding the exact same segments (files).

Here’s what partition 0 of mytopic might look like:

tree /kafka/kafka-logs/mytopic-0
├── 00000000000000000001.index
├── 00000000000000000001.log
├── 00000000000000000001.timeindex
├── 00000000000000000002.index
├── 00000000000000000002.log
├── 00000000000000000002.timeindex


  • .log files that hold the actual data. Running bin/kafka-dump-log.sh --files /kafka/kafka-logs/mytopic-0/00000000000000000001.log prints the messages in the log file:
      baseOffset: 15946643 lastOffset: 15946643 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 68 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1733636569297 size: 1789 magic: 2 compresscodec: lz4 crc: 1648303417 isvalid: true
      baseOffset: 15946644 lastOffset: 15946645 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 68 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 1789 CreateTime: 1733636569297 size: 1698 magic: 2 compresscodec: lz4 crc: 4030506336 isvalid: true
      baseOffset: 15946646 lastOffset: 15946647 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 68 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 3487 CreateTime: 1733636574447 size: 2876 magic: 2 compresscodec: lz4 crc: 3721890787 isvalid: true
    
  • .index files are a mapping between message offset and file position, e.g. the message whose offset is 22 starts at byte 799. Running bin/kafka-dump-log.sh --files /kafka/kafka-logs/mytopic-0/00000000000000000001.index prints the offset mapping:
      offset: 15946649 position: 6363
      offset: 15946652 position: 10563
      offset: 15946657 position: 16767
    
  • .timeindex files map timestamps to offsets, e.g. timestamp 1733636574447 maps to offset 15946647. They allow Kafka to perform time-based queries. Running bin/kafka-dump-log.sh --files /kafka/kafka-logs/mytopic-0/00000000000000000001.timeindex prints the time mapping:
      timestamp: 1733636574447 offset: 15946647
      timestamp: 1733636579430 offset: 15946654
      timestamp: 1733636580266 offset: 15946658
    

The LogManager is initialized during KafkaServer.startup. It serves as a mechanism to schedule regular log-related tasks that Kafka requires, such as deleting old logs, performing log compaction, and flushing data to disk. This functionality is clearly demonstrated by its startupWithConfigOverrides method, which is invoked by the LogManager’s startup method:

    loadLogs(defaultConfig, topicConfigOverrides, isStray) // this could take a while if shutdown was not clean

    /* Schedule the cleanup task to delete old logs */
    if (scheduler != null) {
      info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
      scheduler.schedule("kafka-log-retention",
                         () => cleanupLogs(),
                         initialTaskDelayMs,
                         retentionCheckMs)
      info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
      scheduler.schedule("kafka-log-flusher",
                         () => flushDirtyLogs(),
                         initialTaskDelayMs,
                         flushCheckMs)

      // omitted ...

      scheduler.scheduleOnce("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
                         () => deleteLogs(),
                         initialTaskDelayMs)
    }
    // omitted ...

We simply schedule cleanupLogs, flushDirtyLogs, and deleteLogs at regular intervals, which, as their names suggest, clean, flush, and delete logs.
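
The pattern is familiar periodic-task scheduling. The same shape expressed with the plain JDK scheduler (Kafka uses its own KafkaScheduler wrapper), with placeholder intervals and print statements standing in for the real tasks:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class LogTasksSketch {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        scheduler.scheduleAtFixedRate(() -> System.out.println("cleanupLogs: apply retention"),
                30, 300_000, TimeUnit.MILLISECONDS);
        scheduler.scheduleAtFixedRate(() -> System.out.println("flushDirtyLogs: flush dirty logs"),
                30, 60_000, TimeUnit.MILLISECONDS);
        // deleteLogs reschedules itself dynamically, so a one-shot schedule is the closer analogy.
        scheduler.schedule(() -> System.out.println("deleteLogs: remove logs marked for deletion"),
                30, TimeUnit.MILLISECONDS);
    }
}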

But before that, we load the different topic partition logs through loadLogs, which creates an instance of UnifiedLog (UnifiedLog is the class representing a partition log, including both local logs and tiered-storage remote logs) for each partition directory and populates a concurrent, thread-safe hashmap currentLogs of type [TopicPartition, UnifiedLog] that holds all current logs. UnifiedLog is used to interact with the actual files. For example, it appends messages to the active segment using its append method, which calls the append method of LocalLog.

A LocalLog, which is part of the UnifiedLog (let’s leave remote storage for another time), is essentially a map of <Long, LogSegment>, specifically a ConcurrentNavigableMap. The key is the base offset of the segment, with base_offset(n+1) > base_offset(n) for segments n and n+1. A Segment is:

 /**
     * Create a LogSegment with the provided parameters.
     *
     * @param log The file records containing log entries
     * @param lazyOffsetIndex The offset index
     * @param lazyTimeIndex The timestamp index
     *...
     * @param baseOffset A lower bound on the offsets in this segment
     * ...
     */
    public LogSegment(FileRecords log,
                      LazyIndex<OffsetIndex> lazyOffsetIndex,
                      LazyIndex<TimeIndex> lazyTimeIndex,
                      // ...
                      long baseOffset,
                      // ...
                      )

A log (data) file, offset and time indices, and a base offset.
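
To make the base-offset-keyed map concrete: with a ConcurrentNavigableMap, finding the segment that contains a given offset is just a floor lookup. A small sketch with a placeholder Segment type:

import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class SegmentLookupSketch {
    record Segment(long baseOffset) {}   // placeholder for LogSegment

    public static void main(String[] args) {
        // Key = base offset of the segment, the same shape the LocalLog keeps.
        ConcurrentNavigableMap<Long, Segment> segments = new ConcurrentSkipListMap<>();
        segments.put(0L, new Segment(0L));
        segments.put(1000L, new Segment(1000L));
        segments.put(2000L, new Segment(2000L));

        // The segment containing offset 1500 is the one with the greatest base offset <= 1500.
        Map.Entry<Long, Segment> entry = segments.floorEntry(1500L);
        System.out.println("offset 1500 lives in the segment with base offset " + entry.getKey()); // 1000
    }
}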

The index files are kept in memory through mmap (a system call that maps files into process memory → fast but memory-consuming; index files are small enough since they don’t hold the actual data). Segments are represented in the JVM by LogSegment.java, and appending a record to a segment means writing it to a FileChannel, where it first lands in memory (the OS page cache) rather than being flushed to disk right away. This is part of why Kafka is fast: appending records is essentially adding entries to RAM.
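
Here is what the mmap part looks like at the JDK level: a sketch (not Kafka’s code) that maps an index file and reads its first entry, assuming the 8-byte entry layout of a 4-byte relative offset followed by a 4-byte file position.

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MmapSketch {
    public static void main(String[] args) throws IOException {
        // Example path from the tree above; the file must exist locally for this to run.
        Path index = Path.of("/kafka/kafka-logs/mytopic-0/00000000000000000001.index");
        try (FileChannel channel = FileChannel.open(index, StandardOpenOption.READ)) {
            // Map the whole file into memory; the reads below are plain memory accesses.
            MappedByteBuffer buffer = channel.map(MapMode.READ_ONLY, 0, channel.size());
            int firstRelativeOffset = buffer.getInt(0); // assuming 4-byte relative offset ...
            int firstPosition = buffer.getInt(4);       // ... followed by a 4-byte file position
            System.out.println(firstRelativeOffset + " -> " + firstPosition);
        }
    }
}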

Data is flushed from memory to disk at a frequency controlled by log.flush.scheduler.interval.ms (which defaults to Long.MAX_VALUE, so by default Kafka leaves flushing to the OS). This check is scheduled during the LogManager’s startup and implemented in flushDirtyLogs:

  private def flushDirtyLogs(): Unit = {
    debug("Checking for dirty logs to flush...")

    for ((topicPartition, log) <- currentLogs.toList ++ futureLogs.toList) {
      try {
        val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
        debug(s"Checking if flush is needed on ${topicPartition.topic} flush interval ${log.config.flushMs}" +
              s" last flushed ${log.lastFlushTime} time since last flush: $timeSinceLastFlush")
        if (timeSinceLastFlush >= log.config.flushMs)
          log.flush(false)
      } catch {
        // omitted
      }
    }
  }

We loop over all partitions/logs (UnifiedLog) and call flush on the UnifiedLog, which calls flush on the LocalLog, which calls flush on the LogSegment, which contains these three sweet lines:

log.flush();
offsetIndex().flush();
timeIndex().flush();

The log is backed by a FileChannel (abstracted through the FileRecords class), and the two indices are backed by a MappedByteBuffer (mmap). The flush forces persistence to local storage. It’s as simple as that.
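
In JDK terms, those flushes boil down to FileChannel.force for the data file and MappedByteBuffer.force for the mmapped indices. A standalone sketch of the same calls on throwaway files (not a real Kafka segment):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FlushSketch {
    public static void main(String[] args) throws IOException {
        try (FileChannel data = FileChannel.open(Path.of("demo.log"),
                 StandardOpenOption.CREATE, StandardOpenOption.WRITE);
             FileChannel idx = FileChannel.open(Path.of("demo.index"),
                 StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {

            data.write(ByteBuffer.wrap("a record".getBytes()));
            data.force(true);                                   // what flushing the data file boils down to

            MappedByteBuffer index = idx.map(MapMode.READ_WRITE, 0, 8);
            index.putLong(0, 42L);
            index.force();                                      // what flushing an mmapped index boils down to
        }
    }
}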

Another interesting code path is how a Produce request ends up writing data to disk. In the previous Networking section, we stopped at the handler, which matches the request’s API key to its handler: a Produce request has Key=0, while Fetch (consume) is Key=1, for instance. The handler dispatches a Key=0 (Produce) request to handleProduceRequest, which calls handleProduceAppend, which in turn calls appendToLocalLog. This leads through two more abstractions before reaching appendAsLeader on the previously visited UnifiedLog, which is a thin wrapper around append.
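
A stripped-down sketch of that dispatch step (the real thing lives in KafkaApis.handle and covers many more APIs; the Request type and handler methods below are placeholders):

// Sketch of dispatching on the request's API key; 0 and 1 are the real ids for PRODUCE and FETCH.
class DispatchSketch {
    record Request(short apiKey) {}   // placeholder request

    void handle(Request request) {
        switch (request.apiKey()) {
            case 0 -> handleProduce(request);  // PRODUCE
            case 1 -> handleFetch(request);    // FETCH
            default -> throw new IllegalArgumentException("Unsupported API key " + request.apiKey());
        }
    }

    void handleProduce(Request r) { /* append to the leader's local log */ }
    void handleFetch(Request r)   { /* read from the log and build a response */ }
}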

One final piece that I find quite interesting: the AbstractIndex underlying both the .index offset index and the .timeindex timestamp index performs a binary search over the mmapped index file whenever it is asked to look up a specific entry.
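
The search itself is a plain binary search over fixed-size entries read straight from the mapped buffer. A sketch of the idea, assuming the 8-byte (relative offset, position) entry layout mentioned above:

import java.nio.MappedByteBuffer;

// Binary search over fixed-size entries in an mmapped index buffer; a sketch of
// the idea behind AbstractIndex, not its actual code.
class IndexSearchSketch {
    private static final int ENTRY_SIZE = 8; // 4-byte relative offset + 4-byte position

    // Returns the slot of the largest entry whose relative offset is <= target, or -1 if none.
    static int largestLowerBound(MappedByteBuffer idx, int entries, int targetRelativeOffset) {
        int lo = 0, hi = entries - 1, found = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            int relOffset = idx.getInt(mid * ENTRY_SIZE);
            if (relOffset <= targetRelativeOffset) {
                found = mid;       // candidate; keep looking to the right
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return found;
    }
}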

There are indeed many layers, but we can follow the path, and it becomes quite enjoyable once we see how all the Kafka abstractions are built on top of well-known Java Lego blocks such as FileChannel and MappedByteBuffer.

This post is licensed under CC BY 4.0 by the author.