- When you create a topic, Kafka decides how its partitions are allocated among the brokers in the cluster.
- For example, with 6 brokers and a topic of 10 partitions with a replication factor of 3, there are 3 x 10 = 30 partition replicas to allocate to the 6 brokers.
- Kafka's goals for the assignment:
- Spread replicas evenly among brokers, i.e. 5 replicas per broker in this example.
- Make sure each replica of a partition is on a different broker.
- If brokers have rack information, assign the replicas of each partition to different racks where possible.
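To make the first two goals concrete, here is a minimal Python sketch that places each partition's replicas on the currently least-loaded brokers. This greedy strategy is an illustrative assumption, not Kafka's own placement algorithm, and it ignores rack awareness. The function name `assign_replicas` is hypothetical.

```python
from collections import Counter

# Hypothetical helper: a greedy sketch, not Kafka's actual assignment code.
def assign_replicas(num_brokers, num_partitions, replication_factor):
    """Place each partition's replicas on the least-loaded brokers, so load stays
    even and no broker holds two replicas of the same partition."""
    if replication_factor > num_brokers:
        raise ValueError("replication factor cannot exceed the number of brokers")
    load = [0] * num_brokers
    assignment = {}
    for p in range(num_partitions):
        # Sort by current load; break ties by rotating from broker p so that
        # the first (leader) replica moves around the cluster.
        order = sorted(range(num_brokers), key=lambda b: (load[b], (b - p) % num_brokers))
        replicas = order[:replication_factor]  # distinct brokers
        for b in replicas:
            load[b] += 1
        assignment[p] = replicas
    return assignment

assignment = assign_replicas(num_brokers=6, num_partitions=10, replication_factor=3)
per_broker = Counter(b for replicas in assignment.values() for b in replicas)
print(per_broker)  # each of the 6 brokers holds 5 replicas
```

Always picking the least-loaded brokers keeps the difference between the busiest and idlest broker at most one replica, so 30 replicas over 6 brokers come out at exactly 5 each.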
- Partitions are split into segments.
- By default, each segment holds up to 1 GB of data or 1 week of data, whichever limit is reached first.
- When Kafka is writing to a partition and the limit is reached, it closes the segment (file) and opens a new one.
- Kafka keeps an open file handle for every segment in every partition, even inactive ones, so the OS must be tuned to handle a large number of open file handles.
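A minimal Python sketch of the roll decision, assuming only the two default limits above (1 GB and 1 week; the corresponding broker settings are log.segment.bytes and log.roll.hours). Kafka's real check considers additional conditions, such as a full index. The `Segment`, `should_roll`, and `append` names are hypothetical.

```python
from dataclasses import dataclass

SEGMENT_BYTES = 1024 ** 3               # 1 GB default size limit
SEGMENT_MS = 7 * 24 * 60 * 60 * 1000    # 1 week default age limit, in milliseconds

# Hypothetical model of a segment; a sketch, not Kafka's data structure.
@dataclass
class Segment:
    base_offset: int      # offset of the first message in the segment
    created_ms: int       # when the segment was opened
    size_bytes: int = 0

def should_roll(seg, next_msg_bytes, now_ms, max_bytes=SEGMENT_BYTES, max_age_ms=SEGMENT_MS):
    """Close the active segment if the next message would push it past the size
    limit, or if it has been open longer than the age limit."""
    too_big = seg.size_bytes + next_msg_bytes > max_bytes
    too_old = now_ms - seg.created_ms >= max_age_ms
    return too_big or too_old

def append(segments, offset, msg_bytes, now_ms):
    """Append a message to the partition, rolling to a new segment when needed."""
    active = segments[-1]
    if should_roll(active, msg_bytes, now_ms):
        active = Segment(base_offset=offset, created_ms=now_ms)
        segments.append(active)  # older segments stay on disk (and stay open)
    active.size_bytes += msg_bytes

segments = [Segment(base_offset=0, created_ms=0)]
append(segments, offset=0, msg_bytes=500, now_ms=1_000)
append(segments, offset=1, msg_bytes=500, now_ms=SEGMENT_MS + 1_000)  # a week later
print([s.base_offset for s in segments])  # [0, 1]
```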
- Each segment is stored in a single data file.
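- The on-disk format is the same as the format of the messages the producer sends to the broker and the broker later sends to consumers.
- This enables zero-copy when sending messages to consumers (data goes from the file to the network channel without being copied through the broker's application buffers), and it avoids decompressing and recompressing messages the producer already compressed.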
- Each message contains a timestamp, set either:
- by the producer when the message is sent (CreateTime), or
- by the broker when the message arrives (LogAppendTime).
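To illustrate the zero-copy point above, here is a Python sketch that sends a byte range of a log file to a socket with `socket.sendfile()`. Where the platform supports it, this call uses the kernel's `sendfile` system call. On the JVM, Kafka relies on the equivalent `FileChannel.transferTo`. The function name `send_log_range` and the sample file contents are hypothetical, and a local socket pair stands in for a consumer connection.

```python
import os
import socket
import tempfile

# Hypothetical helper: a sketch of zero-copy sending, not Kafka's code.
def send_log_range(sock, path, offset, count):
    """Send `count` bytes of the file at `path`, starting at byte `offset`, to `sock`."""
    with open(path, "rb") as f:
        # socket.sendfile() uses os.sendfile() when the platform supports it, so the
        # kernel moves file data to the socket without copying it into Python
        # buffers; otherwise it falls back to ordinary send() calls.
        return sock.sendfile(f, offset=offset, count=count)

# Usage: a temporary "segment" file and a local socket pair standing in for a consumer.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"hello kafka log")
broker_side, consumer_side = socket.socketpair()
sent = send_log_range(broker_side, tmp.name, offset=6, count=5)
received = consumer_side.recv(1024)
broker_side.close()
consumer_side.close()
os.remove(tmp.name)
```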
Kafka ships with the DumpLogSegments tool, which shows information about a partition's segments on the filesystem.
One of its parameters also shows the messages compressed inside each wrapper message.
- Kafka uses indexes to quickly find the message at a given offset when a consumer asks for messages starting from that offset in a partition.
- The index maps offsets to a segment file and a position within that file.
- If an index becomes corrupted or is deleted, Kafka regenerates it by rereading the messages from the matching log segment and recording their offsets and locations.
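Here is a simplified Python sketch of the lookup. It assumes that, as in Kafka's offset index, the index is sparse: only some offsets are stored, so the broker finds the closest indexed offset at or below the target and scans forward from there. Segments are located by their base offset. All function names are hypothetical, and real index files store compact binary entries rather than Python tuples.

```python
import bisect

# Hypothetical helpers: a sketch of offset lookup, not Kafka's index format.
def find_segment(base_offsets, target):
    """Return the position of the segment whose base offset is the largest one <= target."""
    i = bisect.bisect_right(base_offsets, target) - 1
    if i < 0:
        raise KeyError(f"offset {target} is before the first segment")
    return i

def lookup_position(index_entries, target):
    """index_entries: sorted (offset, byte_position) pairs for one segment.
    Returns the position of the largest indexed offset <= target; the broker
    then scans forward from there to reach the exact message."""
    offsets = [o for o, _ in index_entries]
    i = bisect.bisect_right(offsets, target) - 1
    return index_entries[i][1] if i >= 0 else 0

def rebuild_index(messages, every=1):
    """Regenerate an index by rereading a segment.
    messages: (offset, size_in_bytes) pairs in file order; index every Nth message."""
    entries, pos = [], 0
    for n, (offset, size) in enumerate(messages):
        if n % every == 0:
            entries.append((offset, pos))
        pos += size
    return entries

segment = find_segment([0, 100, 250], 120)           # 1: the segment starting at offset 100
messages = [(100, 50), (101, 60), (102, 40), (103, 70)]
index = rebuild_index(messages, every=2)              # [(100, 0), (102, 110)]
position = lookup_position(index, 103)                # 110: scan forward from byte 110
```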
Compaction is a way of deleting older messages based on their key.
In other words, only the most recent message for each key is kept.
- Useful for data such as a customer's last-used shipping address or the last known state of an application.
retention policy : delete
- deletes messages older than the retention period.
retention policy : compact
- stores only the most recent value for each key in the topic.
- if the topic contains messages with null keys, compaction fails.
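Here is a minimal sketch of the delete policy. It assumes, as Kafka does, that retention works on whole closed segments: starting from the oldest, a segment is deleted while its newest message is older than the retention period, and the active segment is never deleted. The 1-week value matches Kafka's default log.retention.hours = 168. The class and function names are hypothetical.

```python
from dataclasses import dataclass
from itertools import takewhile

RETENTION_MS = 7 * 24 * 60 * 60 * 1000  # 168 hours

# Hypothetical model of a segment; a sketch, not Kafka's implementation.
@dataclass
class SegmentInfo:
    base_offset: int
    max_timestamp_ms: int   # timestamp of the newest message in the segment

def expired_segments(segments, now_ms, retention_ms=RETENTION_MS):
    """segments are ordered oldest first; the last one is the active segment."""
    closed = segments[:-1]  # the active segment is never deleted
    # Delete from the oldest end, stopping at the first segment still within retention.
    return list(takewhile(lambda s: now_ms - s.max_timestamp_ms > retention_ms, closed))

segments = [SegmentInfo(0, 0), SegmentInfo(100, RETENTION_MS), SegmentInfo(200, RETENTION_MS + 1)]
to_delete = expired_segments(segments, now_ms=RETENTION_MS + 1)
print([s.base_offset for s in to_delete])  # [0]
```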
How it works
log.cleaner.enable = true
- each broker starts a compaction manager thread and a number of compaction (cleaner) threads.
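Each cleaner thread chooses the partition with the highest ratio of dirty (uncompacted) messages to total partition size.
The thread reads the dirty messages and builds an in-memory map.
Each map entry holds a 16-byte hash of the message key and the 8-byte offset of the latest message with that key, 24 bytes in total.
For a 1 GB segment, assuming each message takes about 1 KB, the segment holds about 1 million messages, so the map needs only about 24 MB.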
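The administrator configures the total memory available to all cleaner threads for these maps (log.cleaner.dedupe.buffer.size).
- The memory is shared equally among the cleaner threads; e.g. with 1 GB in total and 5 cleaner threads, each thread gets 200 MB.
- The whole dirty section does not have to fit in a thread's share, but at least one full segment must. Otherwise Kafka logs an error, and the administrator must allocate more memory or use fewer cleaner threads.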
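The sizing arithmetic above can be sketched in Python. The ~1 KB average message size is the assumption from the example, decimal units are used so the numbers come out round, and the sketch ignores any hash-table overhead the real cleaner adds. The function names are hypothetical.

```python
ENTRY_BYTES = 16 + 8                      # 16-byte key hash + 8-byte offset
KB, MB, GB = 10 ** 3, 10 ** 6, 10 ** 9    # decimal units keep the arithmetic round

# Hypothetical helpers for back-of-the-envelope sizing.
def map_bytes_for_segment(segment_bytes, avg_message_bytes):
    """Offset-map memory needed if every message in the segment has a distinct key."""
    return (segment_bytes // avg_message_bytes) * ENTRY_BYTES

def per_thread_budget(total_bytes, threads):
    """The configured total is shared equally among the cleaner threads."""
    return total_bytes // threads

def segment_fits(segment_bytes, avg_message_bytes, total_bytes, threads):
    """At least one full segment's map must fit in a thread's share."""
    return map_bytes_for_segment(segment_bytes, avg_message_bytes) <= per_thread_budget(total_bytes, threads)

print(map_bytes_for_segment(GB, KB) // MB)   # 24
print(per_thread_budget(GB, 5) // MB)        # 200
print(segment_fits(GB, 100, 128 * MB, 1))    # False: 100-byte messages need a 240 MB map
```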
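After building the map, the thread reads the clean segments, starting with the oldest, and checks each message against the map.
- If the key exists in the map, the message is dropped, because a newer message with the same key exists later in the partition.
- If the key does not exist, the message still holds the latest value for its key, so it is copied to a replacement segment.
Once the messages that are still the latest have been copied, the replacement segment is swapped in for the original and the thread moves on to the next segment. At the end, we are left with one message per key.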
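Here is a minimal Python sketch of the two passes, with messages as (offset, key, value) tuples. MD5 is used only because it yields a 16-byte hash, and these notes do not say which hash the cleaner uses. Because the sketch also runs over the dirty messages themselves, it compares offsets (a message survives if it is the latest for its key) rather than only checking whether the key exists. The function names are hypothetical.

```python
import hashlib

# Hypothetical helpers: a sketch of key-based compaction, not Kafka's cleaner.
def key_hash(key):
    """16-byte digest standing in for the cleaner's key hash."""
    if key is None:
        raise ValueError("compacted topics cannot contain null keys")
    return hashlib.md5(key).digest()

def build_offset_map(dirty):
    """Map each key hash to the offset of the latest dirty message with that key."""
    offset_map = {}
    for offset, key, _ in dirty:  # messages are in offset order
        offset_map[key_hash(key)] = offset  # later offsets overwrite earlier ones
    return offset_map

def compact(segment, offset_map):
    """Copy into a replacement segment only the messages still latest for their key."""
    replacement = []
    for offset, key, value in segment:
        latest = offset_map.get(key_hash(key))
        if latest is None or offset >= latest:
            replacement.append((offset, key, value))
    return replacement

log = [
    (0, b"alice", b"addr-1"),
    (1, b"dave", b"addr-2"),
    (2, b"alice", b"addr-3"),   # dirty section starts here
    (3, b"bob", b"addr-4"),
    (4, b"bob", b"addr-5"),
]
offset_map = build_offset_map(log[2:])  # only the dirty messages go into the map
compacted = compact(log, offset_map)
print([offset for offset, _, _ in compacted])  # [1, 2, 4]
```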
When does compaction happen?
- Active segments are never compacted.
- By default, compaction starts on a topic once 50% of it consists of dirty messages (min.cleanable.dirty.ratio = 0.5).
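Here is a hypothetical Python sketch of how a cleaner thread might pick its next partition, combining the selection rule (highest dirty ratio) with the 50% threshold. Sizes are assumed to exclude the active segment, since it is never compacted. The names and the simple bytes-based ratio are illustrative.

```python
MIN_CLEANABLE_DIRTY_RATIO = 0.5  # the 50% threshold

# Hypothetical helpers; a sketch, not Kafka's cleaner manager.
def dirty_ratio(dirty_bytes, total_bytes):
    return dirty_bytes / total_bytes if total_bytes else 0.0

def pick_partition(partitions, min_ratio=MIN_CLEANABLE_DIRTY_RATIO):
    """partitions: {name: (dirty_bytes, total_bytes)}, sizes excluding the active segment.
    Returns the partition with the highest dirty ratio at or above min_ratio, or None."""
    ratios = {name: dirty_ratio(d, t) for name, (d, t) in partitions.items()}
    eligible = {name: r for name, r in ratios.items() if r >= min_ratio}
    if not eligible:
        return None
    return max(eligible, key=eligible.get)

sizes = {"orders-0": (40, 100), "users-0": (70, 100), "users-1": (55, 100)}
print(pick_partition(sizes))  # users-0
```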
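- Kafka guarantees the order of messages within a partition: if message B was written after message A by the same producer to the same partition, B has a higher offset and consumers read B after A.
- A produced message is considered committed when it has been written to the partition on all in-sync replicas (not necessarily flushed to disk).
- Committed messages are not lost as long as at least one replica remains alive.
- Consumers can only read committed messages.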
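Here is a simplified Python sketch of what "committed" means. The boundary every in-sync replica has reached (Kafka calls it the high watermark) is the minimum of their log-end offsets, and consumers only see messages below it. The data structures and function names are hypothetical.

```python
# Hypothetical helpers; a sketch of the committed boundary, not Kafka's code.
def high_watermark(log_end_offsets, isr):
    """First offset NOT yet on every in-sync replica; everything below it is committed."""
    return min(log_end_offsets[replica] for replica in isr)

def readable(messages, hw):
    """Consumers only see messages whose offset (list position here) is below the high watermark."""
    return messages[:hw]

log_end_offsets = {"broker-1": 5, "broker-2": 3, "broker-3": 4}
hw = high_watermark(log_end_offsets, isr={"broker-1", "broker-2", "broker-3"})
print(hw, readable(["m0", "m1", "m2", "m3", "m4"], hw))  # 3 ['m0', 'm1', 'm2']
```

If broker-2 dropped out of the ISR, the minimum would move to 4 and one more message would become committed.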
When is a replica in sync?
- It is the leader of the partition, or
- It is a follower that meets all of the following:
  - it has an active session with ZooKeeper, i.e. it sent a heartbeat to ZooKeeper in the last 6 seconds (configurable);
  - it fetched messages from the leader in the last 10 seconds (configurable);
  - it fetched the most recent messages from the leader in the last 10 seconds, meaning it has almost no lag.
- A misconfigured garbage collector can pause a broker long enough for it to lose its ZooKeeper session, which takes its replicas out of sync.
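Here is a Python sketch of the in-sync test as described above, assuming the 6-second and 10-second windows from these notes. In Kafka these correspond to the ZooKeeper session timeout and replica.lag.time.max.ms, whose defaults vary between versions. The `Replica` fields are hypothetical timestamps.

```python
from dataclasses import dataclass

ZK_SESSION_TIMEOUT_S = 6.0   # heartbeat window from the notes (configurable)
REPLICA_LAG_TIME_S = 10.0    # fetch window from the notes (configurable)

# Hypothetical replica state; all times are seconds on the same clock as `now`.
@dataclass
class Replica:
    is_leader: bool
    last_zk_heartbeat: float   # last heartbeat sent to ZooKeeper
    last_fetch: float          # last time it fetched anything from the leader
    last_caught_up: float      # last time it had fetched up to the leader's latest message

def is_in_sync(r, now):
    if r.is_leader:
        return True
    # A follower must satisfy all three conditions, not just one of them.
    return (now - r.last_zk_heartbeat <= ZK_SESSION_TIMEOUT_S
            and now - r.last_fetch <= REPLICA_LAG_TIME_S
            and now - r.last_caught_up <= REPLICA_LAG_TIME_S)

now = 100.0
follower = Replica(is_leader=False, last_zk_heartbeat=98.0, last_fetch=99.0, last_caught_up=95.0)
paused = Replica(is_leader=False, last_zk_heartbeat=90.0, last_fetch=99.0, last_caught_up=99.0)  # missed heartbeats, e.g. a long GC pause
print(is_in_sync(follower, now), is_in_sync(paused, now))  # True False
```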
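Brokers have a default replication factor (default.replication.factor) that applies to topics created without an explicit replication factor, such as automatically created topics.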
Unclean leader election
- A replica is clean if it has all the committed messages; otherwise it is unclean.
- unclean.leader.election.enable = true was the default in older Kafka versions (since Kafka 0.11.0 the default is false). Enabling it favors availability.
- If an out-of-sync replica is allowed to become leader, we risk data loss.
- If it is not allowed, we risk availability, because we have to wait for the original leader to come back online.
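Here is a simplified Python sketch of this trade-off. The election prefers a live in-sync replica, falls back to any live replica only when unclean election is enabled, and otherwise leaves the partition offline. The function and argument names are hypothetical, and real leader election (run by the controller) involves more state than this.

```python
# Hypothetical helper; a sketch of the clean/unclean choice, not Kafka's controller logic.
def elect_leader(replicas, isr, alive, unclean_enabled):
    """replicas: assigned replicas in order; isr: in-sync replicas; alive: brokers that are up.
    Returns (new_leader, was_unclean); new_leader is None if the partition stays offline."""
    for r in replicas:
        if r in isr and r in alive:
            return r, False        # clean election: no committed messages lost
    if unclean_enabled:
        for r in replicas:
            if r in alive:
                return r, True     # unclean election: committed messages may be lost
    return None, False             # wait for an in-sync replica to come back

print(elect_leader([1, 2, 3], isr={1}, alive={2, 3}, unclean_enabled=False))  # (None, False)
print(elect_leader([1, 2, 3], isr={1}, alive={2, 3}, unclean_enabled=True))   # (2, True)
```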
How to decide
- If the system needs data accuracy (no loss of committed messages), set unclean.leader.election.enable = false.
- If the system needs availability more than accuracy, set unclean.leader.election.enable = true.