Kafka — Part Two

Sanket Saxena

After a preliminary introduction to Apache Kafka, it’s time to explore its advanced features in more detail. This article uncovers aspects from client shutdown and partition rebalancing to message compression and producer safety.

Producers

Partition Assignment & Rebalancing Strategies

Here’s how producers handle partitioning:

  1. Round Robin / Sticky: Without a key, the producer spreads writes across all available partitions to balance the load. Older clients do this in a strict round-robin fashion; since Kafka 2.4 the default partitioner uses a “sticky” approach that fills a batch for one partition before moving on to the next, which balances load over time while producing larger, more efficient batches.
  2. Keyed Messages: If a key is provided, the producer will hash the key and use the result to determine the partition. This ensures that all messages with the same key will end up on the same partition, which is important for message ordering.
  3. Custom Partitioner: You can also provide a custom Partitioner to control the logic of how records are assigned to partitions. This could be based on specific attributes of the record, or any other complex logic that fits your specific use case.
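
To make the third option concrete, here is a rough sketch of a custom Partitioner in Java. The class name and the routing rule (sending keys that start with "vip-" to partition 0) are invented for illustration; a real partitioner would encode whatever logic your use case needs.

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

// Hypothetical partitioner: "vip-" keys go to partition 0, everything else is hashed.
public class VipFirstPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (key != null && key.toString().startsWith("vip-")) {
            return 0;
        }
        int hash = (keyBytes == null) ? 0 : java.util.Arrays.hashCode(keyBytes);
        return (hash & 0x7fffffff) % numPartitions; // non-negative hash mod partition count
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}

You would then point the producer’s partitioner.class setting (covered below) at this class’s fully qualified name.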

As for rebalancing, it doesn’t apply to producers in the same way it does for consumers. Producers don’t “own” partitions like consumers do. Producers write records to partitions and have no responsibility for reading from partitions, which is where rebalancing comes into play.

In the context of producers, we might consider the balancing of producing records across all available partitions. This is managed by the partitioning strategies mentioned above. If one producer starts to overwhelm a particular partition, it could be beneficial to look into custom partitioning logic, or reevaluating how keys are being assigned to records, to ensure that load is being evenly distributed across all partitions.

For producers, Kafka provides a few settings to manage partitioning:

  1. partitioner.class: This setting controls which class is used to determine the partition for a record when the partition is not explicitly provided. By default it's set to org.apache.kafka.clients.producer.internals.DefaultPartitioner, which implements the keyed-hashing and keyless (round-robin/sticky) behaviour described earlier. If you want to use a custom partitioner, provide its fully qualified class name here.
  2. linger.ms and batch.size: These settings control how long the producer will wait to send a batch of records and how many records it will collect before sending, respectively. By adjusting these, you can indirectly influence how records are spread out across partitions.

Safe Producer

To make a Kafka producer safe, i.e., to ensure that messages are not lost and are sent exactly once to the Kafka topic, you need to configure the following producer properties:

  1. Retries: This configuration defines how many times the producer will retry sending a message after a transient failure. In older client versions the default was 0 (no retries); newer versions default to Integer.MAX_VALUE (2147483647), with the total retry time bounded by delivery.timeout.ms. For a safe producer, keep the retry count high so the producer keeps trying until the send succeeds or the delivery timeout expires.
  2. Idempotent: To ensure exactly once semantics, the producer needs to be idempotent. This means that even if a message is sent more than once, the result will be as if the message was sent only once. This can be achieved by setting the enable.idempotence property to true. When this property is enabled, the producer will ensure that any message that has been successfully written to Kafka won't be written again. This property is especially important when you have retries enabled, as it prevents duplicate messages from being written when a message is retried.
  3. Max In-Flight Requests: This is the maximum number of unacknowledged requests the producer will send on a single connection before blocking. If this number is greater than 1 and idempotence is disabled, retries can result in out-of-order delivery, because Kafka may successfully resend an earlier failed message after later messages have already been written. With enable.idempotence set to true, ordering is preserved for up to 5 in-flight requests per connection, so max.in.flight.requests.per.connection can stay at 5 or below; without idempotence, set it to 1 if strict ordering matters.
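
Putting these together, a minimal sketch of a safe producer configuration in Java might look like the following. The bootstrap address and topic name are placeholders, and the values shown largely spell out what recent client versions already do by default once enable.idempotence is true.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// Safe-producer settings
props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");  // no duplicates on retry
props.setProperty(ProducerConfig.ACKS_CONFIG, "all");                 // required when idempotence is enabled
props.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // ordering still preserved with idempotence

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("example-topic", "key", "value")); // hypothetical topic
}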

Message Compression

The choice of where to apply message compression — at the producer or broker — has significant impacts on Kafka’s overall performance and resource utilization.

When compression is applied at the producer:

  1. Network Efficiency: Compressing the data at the producer side reduces the size of the data, which means less data is transmitted over the network to the Kafka broker. This can lead to significant bandwidth savings, especially when dealing with large volumes of data.
  2. Storage Efficiency: Smaller message sizes mean that the data takes up less space when stored on the Kafka broker, which can significantly improve storage efficiency.
  3. Batching Efficiency: Compression on the producer side also enables more efficient batching of messages. Since messages are compressed, more messages can fit into the batch of the same size, reducing the number of requests needed to send the same amount of messages.
  4. Consumer Decoding: Compressed messages sent by the producer are stored as-is by the broker and passed on to the consumers. The consumers then decompress these messages. This means that if your consumers are in a different network from your Kafka brokers, you can save on inter-network data transfer.

The trade-off here is the CPU usage on both producer and consumer sides. Compression at the producer level requires CPU cycles, as does decompression at the consumer level. However, this trade-off is often worth it given the benefits of network and storage efficiency.
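
Enabling producer-side compression is a single configuration switch. A minimal sketch, assuming the rest of the producer configuration is already in place (snappy is just one option; gzip, lz4, and zstd are also supported):

Properties props = new Properties();
// ... bootstrap servers and serializers as usual ...
props.setProperty("compression.type", "snappy"); // alternatives: "gzip", "lz4", "zstd", or "none" (the default)

Consumers decompress transparently, so no consumer-side configuration is needed.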

Note: While Kafka allows for compression to be applied at the broker level, this is less common as it means data is sent uncompressed over the network from the producer to the broker, which loses many of the efficiency benefits mentioned above.

Buffering and Blocking: Handling Producer Overload

Kafka provides several mechanisms to handle situations where producers can’t process messages as fast as they arrive.

  • Buffer Memory: Kafka buffers messages at the producer level if the producer cannot send data to Kafka fast enough, up to a specified limit.
  • Max Block Time: Kafka provides a configuration called max.block.ms that determines how long a producer will block before sending a message fails. This ensures that producers don't wait indefinitely if the broker isn't responsive.
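
As an illustrative sketch (the values shown are the client defaults, not recommendations), these two settings are configured like any other producer property:

Properties props = new Properties();
// ... bootstrap servers and serializers as usual ...
props.setProperty("buffer.memory", "33554432"); // 32 MB of buffer space for unsent records
props.setProperty("max.block.ms", "60000");     // block send() for at most 60 s before failing

If the buffer stays full past max.block.ms, send() fails with a TimeoutException, which the application can catch and use to apply back-pressure upstream.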

Message Batching

How Batching Works in Kafka: Kafka producers batch multiple records that are sent to the same partition into a single batch. This batch of records is sent together in a single request. Batching allows Kafka to handle higher volumes of messages and provides better throughput by reducing the network overhead associated with sending each message separately.

Controlling Batching: Batching in Kafka can be controlled by a few configuration parameters on the producer side:

  • batch.size: This sets the maximum size in bytes of a batch of messages. Once this limit is reached, the batch is sent to Kafka. A batch is not necessarily full when it is sent, though - it can go out earlier, for example when linger.ms expires.
  • linger.ms: This sets the maximum time (in milliseconds) to buffer the data on the producer side. With this setting, the producer waits for this duration before sending the messages in order to allow more messages to be batched together. If this duration passes and the batch isn't full yet, the messages will still be sent.
  • buffer.memory: This controls the total amount of memory available to the producer for buffering.

Advantages of Batching: Batching provides several advantages:

  • Improved Throughput: By reducing the network overhead associated with each message, Kafka is able to achieve higher throughput.
  • Efficient Network Utilization: By sending a batch of messages together, Kafka reduces the total number of network requests, thus making better use of network resources.
  • Reduced Disk Overhead: On the broker side, Kafka writes each batch of messages to disk as a unit, which reduces the number of disk writes; when batches are compressed, it also reduces the disk space used.

For example, if you want higher throughput, you can increase the batch.size and linger.ms to allow more messages to be batched together. However, this could result in higher latency because the producer needs to wait longer to fill the batch. Conversely, if you want lower latency, you can decrease these parameters, but this could result in lower throughput because fewer messages are batched together.
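
A throughput-oriented tuning might look like the sketch below; the specific numbers are illustrative rather than recommendations. A latency-sensitive producer would instead leave linger.ms at 0 (the default) and accept smaller batches.

Properties props = new Properties();
// ... bootstrap servers and serializers as usual ...
props.setProperty("batch.size", "65536");     // 64 KB batches instead of the 16 KB default
props.setProperty("linger.ms", "20");         // wait up to 20 ms for more records before sending
props.setProperty("compression.type", "lz4"); // compression compounds the benefit of larger batches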

Graceful Shutdown

Just like consumers, producers also have a close() method that should be called to close the producer cleanly and ensure that all pending writes have been sent to the servers and acknowledged.
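
A minimal sketch of a clean producer shutdown; flush() is technically redundant because close() already waits for in-flight records, but it makes the intent explicit:

producer.flush();                                  // push out anything still sitting in the buffer
producer.close(java.time.Duration.ofSeconds(10));  // wait up to 10 s for outstanding sends to be acknowledged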

Single Rolling Bounce

This technique is used when you want to roll out upgrades or configurations across the Kafka cluster with zero downtime. Each broker in the cluster is bounced one at a time.

Auto Offset Commit

This is a feature where the consumer’s offset is periodically committed in the background. If auto-commit is enabled, offsets for messages returned by a previous poll() are committed during subsequent poll() calls (and on close()), once auto.commit.interval.ms has elapsed.

Linger.ms vs Batch.size

linger.ms is the time the producer waits for more records to batch together. batch.size is the maximum size of a batch. These configurations can be used together to control the trade-off between latency and throughput.

Consumers

Kafka Consumer Group Management and Offset Handling

Graceful Shutdown of Consumers

Graceful shutdown of Kafka consumers is important to ensure that all messages have been processed and offsets have been committed back to Kafka. This process can be controlled through the close() method of the KafkaConsumer API. The close() method will commit any offsets that need to be committed and send a LeaveGroup request to the coordinator to trigger a rebalance.

The amount of time close() waits to complete pending offset commits and send the LeaveGroup request is controlled by the timeout you pass to it: close(Duration) accepts an explicit timeout, and the no-argument close() uses a default of 30 seconds.
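
The usual shutdown pattern, sketched below, is to call wakeup() from a shutdown hook so that the poll loop exits, and then close the consumer. The topic name and the handle() method are placeholders, and props is assumed to be a configured Properties object.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Collections;

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("example-topic")); // hypothetical topic

// Shutdown hook: wakeup() makes a blocked poll() throw a WakeupException.
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        records.forEach(record -> handle(record)); // handle() stands in for application logic
    }
} catch (WakeupException e) {
    // Expected during shutdown; nothing to do.
} finally {
    consumer.close(Duration.ofSeconds(5)); // commits offsets and sends the LeaveGroup request
}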

Consumer Joining and Leaving Consumer Groups

When a consumer starts up, it sends a JoinGroup request to the Group Coordinator. This consumer then becomes part of the Consumer Group. If the consumer is the first one to join the group, it becomes the group leader. The group leader is responsible for assigning partitions to each consumer in the group.

When a consumer gracefully shuts down, it sends a LeaveGroup request to the Group Coordinator. This triggers a rebalance of the consumer group, and partitions assigned to the leaving consumer are reassigned to other consumers in the group.

If a consumer fails and doesn’t send a LeaveGroup request, Kafka relies on the session.timeout.ms configuration parameter to detect this. If the consumer doesn't send any heartbeat within this interval, the Group Coordinator considers it dead and triggers a group rebalance.

Updating Offsets

The consumer reads messages from the partitions and keeps track of the latest offset it has consumed. This offset can be committed back to Kafka in two ways:

  1. Automatic Commit (default behavior): The consumer regularly commits the latest offset it has read back to Kafka. This is controlled by the auto.commit.interval.ms configuration property which defaults to 5000ms or 5 seconds.
  2. Manual Commit: The user application explicitly commits offsets. This provides more control and can ensure that offsets are only committed when the consumer has finished processing the messages.

The enable.auto.commit configuration property controls whether offsets are committed automatically. If this is set to false, the user application needs to commit offsets manually.
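
With auto-commit disabled, the commit-after-processing pattern looks roughly like this, continuing the consumer from the shutdown example above (handle() again stands in for application logic):

props.setProperty("enable.auto.commit", "false");

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        handle(record); // application-specific processing
    }
    consumer.commitSync(); // commit only after the whole batch is processed (at-least-once)
}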

Single Consumer Reading from Multiple Partitions

A single consumer can read from multiple partitions. The number of consumers in a consumer group is usually less than or equal to the number of partitions. If there are more partitions than consumers, each consumer can be assigned multiple partitions.

By default, Kafka uses a Range assignor which tries to assign a contiguous range of partitions to each consumer. Another option is the RoundRobin assignor which assigns one partition at a time to each consumer in a round-robin fashion. This results in a more balanced assignment when the number of partitions is greater than the number of consumers.

The configuration property partition.assignment.strategy controls the assignment strategy. By default, it is set to org.apache.kafka.clients.consumer.RangeAssignor, but it can also be set to org.apache.kafka.clients.consumer.RoundRobinAssignor or to a custom class implementing the ConsumerPartitionAssignor interface.

Partition Assignment & Rebalancing Strategies

Kafka’s partition assignment strategies determine how topic partitions are assigned to consumer instances in a consumer group. The following are some of the key strategies provided by Kafka:

  1. Round Robin Assignor: This strategy sequentially assigns the available partitions to consumers in the group. This can result in a fairly even distribution of load, particularly when the processing time required for messages is similar across all partitions.
  2. Range Assignor: With the range strategy, Kafka assigns to each consumer a contiguous range of partitions. This strategy can be useful when your processing logic relies on the order of the data in each partition. However, it could lead to imbalances if the data or processing across partitions is uneven.
  3. Sticky Assignor: The Sticky Assignor tries to minimize the amount of movement of partitions amongst consumers when the group membership changes. This can be beneficial in reducing the amount of time it takes for consumers to restore their local data stores when there’s a change in the group membership.

Additionally, in Kafka 2.4, the concept of Cooperative Rebalancing was introduced. With the introduction of cooperative-sticky assignor, rebalancing became less disruptive. Unlike eager rebalancing, consumers in cooperative rebalancing do not need to stop consuming while a rebalance is in progress. Cooperative rebalancing reduces the downtime that was caused due to rebalancing in the eager model.

These partition assignment strategies can be set by changing the partition.assignment.strategy setting in the consumer's configuration properties. For example:

properties.setProperty("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");

It’s important to understand the trade-offs of each strategy to make the right choice for your use case. Be aware that all consumers in a group must share at least one common assignment strategy; otherwise consumers attempting to join will be rejected by the coordinator with an InconsistentGroupProtocolException.

Connecting to Remote Kafka Server

To connect to a remote Kafka server using Java, you’d use the bootstrap.servers property to provide a list of host:port pairs for your Kafka brokers. The security.protocol and sasl.mechanism properties are used to specify the security protocol (such as SASL_SSL for secure communication), while the sasl.jaas.config property contains the necessary credentials for authentication.

Kafka Properties:

  • bootstrap.servers: This is the list of host:port pairs used to establish the initial connection to the Kafka cluster.
  • security.protocol: Protocol used to communicate with brokers. For using SASL with SSL, set this to SASL_SSL.
  • sasl.jaas.config: Configuration for SASL. Includes the login class for the selected mechanism (PLAIN/SCRAM/etc) and the server credentials.
  • sasl.mechanism: Mechanism used for client connections. This may be any mechanism for which a security provider is available.
  • key.deserializer and value.deserializer: The class of deserializer to materialize keys/values from bytes.
  • group.id: A unique string that identifies the consumer group this consumer belongs to.
  • auto.offset.reset: What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server.
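
Putting these properties together, a consumer connecting to a remote SASL_SSL-secured cluster might be configured like the sketch below. The host, credentials, and group id are placeholders, and PLAIN is just one possible SASL mechanism.

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker1.example.com:9093"); // placeholder host:port
props.setProperty("security.protocol", "SASL_SSL");
props.setProperty("sasl.mechanism", "PLAIN");
props.setProperty("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        + "username=\"<username>\" password=\"<password>\";"); // placeholder credentials
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("group.id", "example-group");     // placeholder consumer group
props.setProperty("auto.offset.reset", "earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);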

Consumer Delivery Semantics in Kafka

Understanding how messages are delivered to consumers in Kafka is crucial. There are three primary delivery semantics:

  • At most once: Here, offsets are committed as soon as the message batch is received. If processing then fails, those messages are not read again, so there are no duplicates, but the data can be lost.
  • At least once: With this semantic, offsets are committed after the message is processed. This approach ensures no data loss, but processing needs to be idempotent because the same message can be read again.
  • Exactly once: This requires the transactional API and applies to Kafka-to-Kafka read-process-write pipelines, where consuming a message and producing the result are committed atomically; for external sinks it is usually approximated with idempotent processing. It’s the most desirable yet hardest to achieve.

Strategies for Consumer Offset Commitment

Kafka provides various ways to commit offsets based on application requirements:

  • Auto commit on: Kafka automatically commits the offsets periodically as defined by the ‘auto.commit.interval.ms’ configuration.
  • Auto commit off: In this strategy, first, the messages are processed, and then the offsets are committed. This mechanism ensures that offsets are committed only after successful processing.
  • External offset storage: Kafka also supports storing offsets externally. In this case, you can manually control when to commit offsets and use the seek API to navigate.

Consumer Offset Reset Behavior

Kafka provides strategies to handle scenarios when no offset data is available for a consumer group:

  • Latest: The consumer reads from the end of the log.
  • Earliest: The consumer reads from the start of the log.
  • None: Kafka throws an error if no offset data is found.

It’s crucial to remember that a consumer group’s committed offsets can be deleted if the group has been inactive for longer than the period specified by the broker’s offsets.retention.minutes configuration (7 days by default).

Replaying Data for Consumers

One of the many benefits of Kafka is its inherent ability to replay data. This is crucial when a system is recovering from a failure or when new consumers join a consumer group and need to process historical data. Here is how it works:

  • Consumer Group Removal: Before offsets can be reset for a group (for example, with the kafka-consumer-groups command-line tool), all consumers must first be stopped so the group is empty. This ensures that no active consumer is reading from the partitions while offsets are being rewound.
  • Offsets Manipulation: By manually manipulating the consumer offsets or using the ‘seek’ API, you can have your consumers start reading from a specific point in the topic log.
  • Avoiding Data Loss: Replaying data can also help avoid data loss. By starting from an earlier offset, you can reprocess messages in case of failures.
  • System Testing: Replay functionality is particularly helpful during system testing. Developers can continuously replay specific scenarios for performance tuning or debugging.

However, data replay is subject to Kafka’s data retention policies. If the data has been purged from Kafka topics due to retention settings, that data can’t be replayed.
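
As a sketch of programmatic replay, a standalone consumer (outside any active group subscription) can look up the offsets corresponding to a point in time with offsetsForTimes() and seek to them. The topic name and the one-hour window are illustrative.

import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Manually assign every partition of a hypothetical topic.
List<TopicPartition> partitions = consumer.partitionsFor("example-topic").stream()
        .map(info -> new TopicPartition(info.topic(), info.partition()))
        .collect(Collectors.toList());
consumer.assign(partitions);

// Ask the broker which offsets correspond to "one hour ago" on each partition.
long oneHourAgo = Instant.now().minus(Duration.ofHours(1)).toEpochMilli();
Map<TopicPartition, Long> timestamps = new HashMap<>();
partitions.forEach(tp -> timestamps.put(tp, oneHourAgo));

Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
offsets.forEach((tp, offset) -> {
    if (offset != null) {             // null when no record is that recent
        consumer.seek(tp, offset.offset());
    }
});
// Subsequent poll() calls now replay the last hour of data.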

Detecting Dead Consumers in Kafka

Kafka provides mechanisms to detect if a consumer is alive:

  • Consumer Heartbeats: The Kafka consumer client library includes a heartbeat thread that sends heartbeats to the Kafka broker at regular intervals specified by heartbeat.interval.ms configuration. The heartbeat serves as an indication to the broker that the consumer is still alive and processing messages. If the broker does not receive a heartbeat within the session.timeout.ms period, the broker assumes the consumer is dead and triggers a rebalance.
  • Consumer Poll Progress: The application’s poll loop is responsible for actually consuming messages from Kafka topics. Since heartbeats are sent from a background thread, the client additionally tracks processing progress through max.poll.interval.ms: if the application gets stuck and does not call poll() within this interval, the consumer is considered failed, it leaves the group, and a rebalance is triggered.

So, it’s a combination of these two mechanisms (heartbeats and polling) that help Kafka to detect whether a consumer is dead or alive.

Kafka Connect: The Bridge Between Systems

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other data systems.

  • Source Connectors: Source connectors import data from another system into Kafka. For example, you might use a source connector to stream changes from a database into a Kafka topic.
  • Sink Connectors: Sink connectors export data from Kafka into another system. An example would be streaming data from a Kafka topic into a cloud storage system like AWS S3.

Kafka Connect provides a framework for integrating Kafka with external data sources/destinations, helping handle common challenges like fault tolerance, offset management, and delivery semantics.

Kafka Streams: Stream Processing Made Simple

Kafka Streams is a client library for processing and analyzing data stored in Kafka, following the stream processing paradigm.

  • Real-Time Processing: Kafka Streams provides real-time processing capabilities, which allows immediate insights from data.
  • Simplicity: Unlike big data processing frameworks, Kafka Streams is a simple library that can be embedded in any Java application. This simplifies deployment and maintenance.
  • Scalability: Kafka Streams applications can easily scale out to handle increased data volumes by running additional instances.
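
To give a feel for the API, here is a minimal, hypothetical Kafka Streams topology that filters one topic into another. The topic names, application id, and filter condition are all made up for the example.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-streams-app"); // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input = builder.stream("input-topic");              // hypothetical topic
input.filter((key, value) -> value != null && value.contains("important"))  // made-up condition
     .to("filtered-topic");                                                 // hypothetical topic

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));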

Kafka Schema Registry: Managing Data Compatibility

The Kafka Schema Registry is a service that provides a means of enforcing certain compatibility checks when reading and writing data.

  • Schema Storage: It stores the versioned history of all schemas used in Kafka topics, making it easier to evolve your data format without breaking compatibility.
  • Compatibility Checks: The registry enforces compatibility rules on schema evolution, avoiding producer/consumer incompatibilities.
  • Integration: It’s typically used in conjunction with Avro, a serialization system that lends itself well to schemas, but can be used with other formats as well.

Real-World Applications of Kafka

Apache Kafka, a distributed streaming platform, has diverse use cases across different industries due to its high throughput, low latency, and fault-tolerance capabilities. Let’s look at some real-world applications of Kafka.

1. Netflix

  • The user position service in Netflix acts as a Kafka producer, which pushes the last position of the user to a topic. Here, the user_id is utilized as the partition key.
  • The recommendation service consumes this data via Kafka Streams and provides recommendations for the next movie. The suggestions are posted to another topic called “recommendations”.
  • The resuming service also fetches data from the position topic to understand where the user paused their movie last time.
  • Kafka Connect is used to link these topics (position and recommendations) to Hadoop for future analytics.

2. Uber

  • In Uber, both drivers and riders post their positions to different Kafka partitions.
  • These positions are consumed by the surge pricing service using Kafka Streams, which calculates the surge price and posts it into the surge pricing topic.
  • Another service then consumes the surge pricing topic to calculate the cost of a ride.

3. Social Media — CQRS (Command Query Responsibility Segregation)

  • Different Kafka topics are utilized for different actions such as posts (produced by the post service), likes, and comments (produced by the like/comment service).
  • Kafka Streams are used to aggregate the total likes/comments for a post, and these are posted to another topic.
  • A trending service can consume this aggregated data to identify trending posts and post them to another topic.
  • There can be various consumers such as a refresh service that utilizes the post with aggregated count topic, while the trending service consumes the trending topic.
  • Events are published on Kafka to ensure that each service is updated with the latest data.

4. Banking

  • A Kafka Connect source connector (like Debezium) is used to ingest transaction data from an RDBMS into a Kafka topic named “transactions”. This topic would generally have a shorter retention policy.
  • User settings are published to another topic by a user settings service.
  • An alert service consumes both the transactions and settings topics to generate alerts, using Kafka Streams for processing.

5. Big Data Ingestion

  • Kafka acts as a buffer to manage and process the massive amount of data.
  • Kafka feeds both a “fast” layer for real-time applications and a “slow” layer where data is offloaded for long-term storage and batch processing.
  • The data in Kafka is offloaded to long-term storage like Amazon S3 or Hadoop using Kafka Connect sink connectors.

6. Log and Metrics Collection

  • Kafka is also frequently used for log and metrics collection due to its high-throughput capabilities.
  • These use-cases typically require shorter retention periods and lower replication factors.

These are just a few examples of how Kafka can be utilized in real-world applications. The versatility and robustness of Kafka make it suitable for a variety of data streaming use-cases.

With a deeper understanding of these advanced Kafka concepts, you can further optimize and fine-tune your Kafka setup to match your specific requirements and workload.
