Intro to Kafka - Part 2

Partition Allocation

  • When you create a topic, Kafka decides how its partitions are assigned to the brokers in the cluster.
  • With 6 brokers, a topic with 10 partitions, and a replication factor of 3, we have 3 x 10 = 30 partition replicas to allocate across the 6 brokers (see the CLI example after this list).
  • Kafka's goals for the assignment:
    • spread the replicas evenly among the brokers, i.e., 30 / 6 = 5 replicas per broker.
    • make sure every replica of a given partition sits on a different broker.
    • if the brokers have rack information, assign the replicas of each partition to different racks where possible.
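
A minimal sketch of creating and inspecting such a topic with the standard CLI tools (topic name and bootstrap address are placeholders; older Kafka versions use --zookeeper instead of --bootstrap-server):

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --topic orders --partitions 10 --replication-factor 3

# shows which broker holds the leader and the replicas of each partition
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic orders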

File Management

  • Partitions are split into segments.
  • By default, a segment holds 1 GB of data or 1 week of data, whichever limit is reached first (see the config sketch after this list).
  • While Kafka writes to a partition, the current segment (file) is closed when the limit is reached and a new segment is opened.
  • The OS must be tuned to handle the large number of file handles Kafka keeps open, even for inactive segments.
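
The segment limits are broker-level settings; a sketch of the relevant lines in server.properties (values shown are the defaults):

# server.properties
log.segment.bytes=1073741824   # close a segment after 1 GB
log.roll.hours=168             # or after 1 week, whichever comes first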

File Format

  • Each segment is stored in a single data file.
  • The on-disk format is the same as the message format sent by the producer.
  • This enables zero-copy: messages go from the file to the network channel without being copied into application buffers.
  • Each message contains a timestamp (see the sketch after this list),
    • either provided by the producer when the message was sent,
    • or assigned by the broker when the message arrived.
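
Which of the two timestamps gets stored is a per-topic setting; a sketch using kafka-configs.sh (topic name is a placeholder):

# CreateTime (default): keep the timestamp the producer supplied
# LogAppendTime: overwrite it with the broker's arrival time
bin/kafka-configs.sh --alter --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name orders \
  --add-config message.timestamp.type=LogAppendTime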

bin/kafka-run-class.sh kafka.tools.DumpLogSegments

Kafka ships with the DumpLogSegments tool to show information about partition segments in the filesystem.

--deep-iteration

This parameter also shows the compressed messages contained inside wrapper messages.
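
A usage sketch; the segment path is a placeholder for a real file under your log directory:

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration \
  --files /tmp/kafka-logs/orders-0/00000000000000000000.log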

Indexes

  • Indexes are how Kafka serves a consumer request for messages starting at a given offset within a partition.
  • The index maps each offset to its segment and to its position within that segment file (see the directory sketch after this list).
  • If an index gets corrupted or deleted, it is regenerated by rereading the messages from the matching log segment and recording each offset and its location.
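
On disk, each segment sits next to its index files; a sketch of a typical partition directory (paths are illustrative):

ls /tmp/kafka-logs/orders-0/
# 00000000000000000000.log        <- segment data file
# 00000000000000000000.index      <- offset -> file position
# 00000000000000000000.timeindex  <- timestamp -> offset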

Compaction

  • Compaction is a way of deleting older messages based on their key.

  • Put another way: for each key, only the most recent message is kept.

    • e.g., a customer's last used shipping address, or the last known state of an application.
  • retention policy: delete

    • deletes messages older than the retention period.
  • retention policy: compact (see the sketch after this list)

    • stores only the most recent value for each key in the topic.
    • if the topic contains messages with null keys, compaction will fail.
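
The retention policy is set per topic through cleanup.policy; a sketch (topic name is a placeholder):

bin/kafka-configs.sh --alter --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name customer-addresses \
  --add-config cleanup.policy=compact
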
How it works
  • log.cleaner.enabled = true

    • starts the compaction manager thread and a number of compaction threads on each broker (see the broker config sketch after this list).
  • Each cleaner thread chooses the partition with the highest ratio of dirty (uncompacted) messages.

  • The thread reads the dirty messages and builds an in-memory map.

  • Each map entry holds a 16-byte hash of the message key and the 8-byte offset of the previous message with that key, 24 bytes in total.

  • For a 1 GB segment with 1 KB messages, that is about 1 million messages, so the map needs roughly 1,000,000 x 24 bytes = 24 MB.

  • The administrator assigns the total memory available to the cleaner threads.

    • with 5 cleaner threads, the memory is shared equally between them.
    • if the assigned memory cannot hold the map for even one full segment, Kafka logs an error; allot more memory or run fewer cleaner threads.
  • After building the map, the cleaner checks segment contents against it.

    • if a message's key exists in the map, the message is skipped, because a newer message with the same key exists.
    • if the key does not exist in the map, the message is still the latest for its key and is copied to a replacement segment.
  • At the end, we are left with one message per key: the one carrying its most recent value.
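
A sketch of the broker settings involved (the thread count and buffer size are illustrative, not recommendations):

# server.properties
log.cleaner.enabled=true                  # start the compaction manager and cleaner threads
log.cleaner.threads=5                     # number of cleaner threads
log.cleaner.dedupe.buffer.size=134217728  # total memory for the offset maps, shared across threads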

When does compaction happen?
  • Active segments are never compacted.
  • By default, compaction starts on topics in which 50% of the messages are dirty (see the sketch below).
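
The 50% threshold is configurable; a sketch of the broker-level setting (0.5 is the default):

# server.properties
log.cleaner.min.cleanable.ratio=0.5   # compact once half of a partition's messages are dirty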

Reliability

Reliability Guarantees

  • Kafka guarantees the order of messages within a partition.
  • A produced message is committed once all in-sync replicas have received it (see the producer sketch after this list).
  • Committed messages will not be lost as long as at least one replica remains alive.
  • Consumers can read only committed messages.
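
Producers choose how much of this guarantee to wait for through acks; a sketch with the console producer (newer versions use --bootstrap-server, older ones --broker-list):

# acks=all: a send succeeds only after all in-sync replicas have the message
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic orders --producer-property acks=all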

When is a replica in-sync?

  • It is the leader.
  • It is a follower and has sent a heartbeat to Zookeeper in the last 6 seconds (configurable).
  • It is a follower and has fetched messages from the leader in the last 10 seconds.
  • It is a follower and has fetched the most recent messages from the leader in the last 10 seconds, meaning it caught up with zero lag at least once in that window (the timeouts map to broker settings; see the sketch after this list).
  • An improper garbage collection config can make a broker lose its connection to Zookeeper and fall out of sync.
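
A sketch of the two broker settings (values match the numbers above, which were the classic defaults; newer Kafka versions ship larger ones):

# server.properties
zookeeper.session.timeout.ms=6000   # heartbeat window for the Zookeeper session
replica.lag.time.max.ms=10000       # followers lagging longer than this leave the ISR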

Broker Configuration

  • There is a default replication factor for all newly created topics.

    • default.replication.factor
  • Unclean leader election

    • a broker is clean if it has all the committed messages; otherwise it is unclean.
    • unclean.leader.election.enable = true favors availability. It was the default in older Kafka versions; since 0.11 the default is false.
    • when an out-of-sync replica exists and is allowed to become leader, we risk data loss.
    • when we do not allow it, we risk availability, because we have to wait for the original leader to come back up.
  • How to decide (see the sketch after this list)

    • If the system needs data accuracy, set unclean.leader.election.enable = false.
    • If the system needs availability more than accuracy, set
      • unclean.leader.election.enable = true
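
A sketch of the two settings in server.properties (the replication factor shown is illustrative):

# server.properties
default.replication.factor=3           # replication factor for automatically created topics
unclean.leader.election.enable=false   # favor consistency; set to true to favor availability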