Kafka is described as a distributed commit log or, more recently, a distributed streaming platform.

Basic outline and Big Picture

Image of Kafka General

Message

Schemas

JSON and XML are easy to use, but they lack robust type handling and compatibility between schema versions.

To solve this, Apache Avro is recommended.

Topics and Partitions

Producers

Consumers

Consumer group

Brokers

Retention policy

Unlike a message queue (MQ), each broker can be configured with a retention period for messages. It can vary from hours to days depending on the use case.

Producer

Producer record - will contain

Partitioner

If a partition is specified in the producer record, the partitioner does nothing.

If not, the partitioner will choose the partition for us, usually based on the key in the producer record.
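A minimal sketch of that decision, using only the Java standard library. The class and method names are made up for illustration, and Arrays.hashCode is a stand-in for the murmur2 hash that Kafka's default partitioner applies to the key bytes, so the partition numbers here will not match what a real producer picks. It also assumes the key is not null.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class PartitionChoiceSketch {
    // Returns the explicit partition if the record carries one,
    // otherwise derives a partition from a hash of the key.
    static int choosePartition(Integer explicitPartition, String key, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition; // the partitioner has nothing to do
        }
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes); // stand-in hash, not murmur2
        return Math.floorMod(hash, numPartitions); // always in [0, numPartitions)
    }

    public static void main(String[] args) {
        System.out.println(choosePartition(2, "customer-42", 6));    // prints 2
        System.out.println(choosePartition(null, "customer-42", 6)); // same value on every run
    }
}
```

The same key always maps to the same partition as long as the number of partitions does not change, which is what keeps the records for one key together.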

RecordMetaData

This is the response from the broker if the message is written to Kafka successfully.

It contains,

An error is returned if writing to Kafka fails (retries happen before an error is sent out).

Producer Config

bootstrap.servers is a list of brokers, given as host:port, for the producer to connect to. At least 2 brokers are recommended so the producer can fail over if one of them is down.

key.serializer is the class used to serialize the keys of the records/messages we write to the Kafka broker.

value.serializer is the class used to serialize the values.

P.S. We need to serialize because Kafka brokers expect byte arrays for keys and values.

Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);      
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

The complete list of producer configs is available here.

producer config list

A few configs that affect reliability, memory use and performance. The goal is to find the right balance between them.

Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// acks=1: the leader replica must receive the message before the broker
// considers it written successfully.
// acks=all: all in-sync replicas must receive the message before it is
// considered written successfully.
// The value has to be a String, not an int.
configProps.put(ProducerConfig.ACKS_CONFIG, "1");
// Buffer memory in bytes; a small value for low-volume usage.
// Update this based on a hardware config review.
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
// Default is no compression. Snappy has lower CPU overhead than gzip.
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
// Number of retries on transient errors returned by the broker, e.g. "leader not available".
configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
// Delay between retries, in ms (default 100 ms). Applies to transient errors from the broker.
configProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
// Batch size is in bytes: 100 KB of memory for batching records sent to the same partition.
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 100 * 1024);
// Client ID, set per producer, used to identify the client in logs and metrics.
configProps.put(ProducerConfig.CLIENT_ID_CONFIG, "CLIENTID");
// Max request size in bytes that the producer will send. Recommended to match the broker's
// "message.max.bytes" (the max a broker can accept). 500 bytes is very small; the default is 1048576.
configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 500);

Partitions

Custom Partitioner

Kafka has a default partitioner, but we can create a custom partitioner if we want to.

To do that, we need to implement the Partitioner interface.

public class CustomPartitioner implements Partitioner
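As a rough illustration of the kind of rule a custom partitioner might apply, the sketch below routes one special key to the last partition and spreads every other key over the remaining ones. It is plain Java and does not implement the real Partitioner interface, whose partition method also receives the topic, the serialized key and value, and the cluster metadata. The class name, the "priority" key and the hash function are all assumptions made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class CustomPartitionRuleSketch {
    // Hypothetical key that gets a partition of its own.
    static final String PRIORITY_KEY = "priority";

    static int partition(String key, int numPartitions) {
        if (numPartitions < 2) {
            throw new IllegalArgumentException("need at least 2 partitions");
        }
        if (PRIORITY_KEY.equals(key)) {
            return numPartitions - 1; // the last partition is reserved for this key
        }
        // All other keys are hashed over partitions 0 .. numPartitions - 2.
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, numPartitions - 1);
    }

    public static void main(String[] args) {
        System.out.println(partition("priority", 6)); // prints 5
        System.out.println(partition("order-17", 6)); // some value between 0 and 4
    }
}
```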

Consumers

A consumer reads messages off the broker from a specific topic and partition.

Consumer groups

Partition Rebalance.

Consumer Config

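Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "MoneyCounter");
props.put("key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer");
// Kafka itself ships no JSON deserializer in this package. This assumes Spring Kafka,
// matching the JsonSerializer used on the producer side.
props.put("value.deserializer",
  "org.springframework.kafka.support.serializer.JsonDeserializer");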

Poll loop

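// poll() returns a batch (ConsumerRecords), not a single ConsumerRecord; the timeout is a Duration.
ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(100));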

Consumer Configuration for fine tuning.

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// JsonDeserializer here is assumed to be the Spring Kafka class.
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// Where to start reading when the group has no committed offset.
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Minimum amount of data, in bytes, the broker should return for a fetch request.
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.RoundRobinAssignor");
// Maximum number of records returned by a single poll().
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);

Commit and Offsets

Process of commit

Edge Case.

AutoCommit

With enable.auto.commit = true, every 5 seconds (the default) the consumer commits the largest offset it received from poll(). The interval can be changed with the auto.commit.interval.ms config option.

Cons of auto commit

If a rebalance happens between two auto commits, the messages read since the last commit are delivered again after the rebalance, so we might end up processing the same messages twice.
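A small simulation of this case, with no Kafka involved. Offsets are plain integers, a commit after every few records stands in for the time-based auto commit, and the names and numbers are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

class AutoCommitDuplicateSketch {
    // Returns the offsets in the order they were processed.
    static List<Integer> simulate(int totalRecords, int commitEvery, int rebalanceAfter) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0; // next offset to read after a restart or rebalance

        // First owner of the partition: processes records until the rebalance.
        for (int offset = 0; offset < rebalanceAfter; offset++) {
            processed.add(offset);
            if ((offset + 1) % commitEvery == 0) {
                committed = offset + 1; // periodic auto commit
            }
        }

        // After the rebalance the new owner resumes from the last committed offset.
        for (int offset = committed; offset < totalRecords; offset++) {
            processed.add(offset);
        }
        return processed;
    }

    public static void main(String[] args) {
        // Commit every 5 records, rebalance after 8 records were processed.
        System.out.println(simulate(10, 5, 8));
        // prints [0, 1, 2, 3, 4, 5, 6, 7, 5, 6, 7, 8, 9]
    }
}
```

Offsets 5, 6 and 7 show up twice because they were processed after the last commit but before the rebalance. A shorter commit interval narrows that window but does not remove it.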

Commit Manually

Set enable.auto.commit = false; we then have 2 options to commit.

commitSync() commits the latest offset returned by poll() and throws an exception if the commit was not successful.

commitSync() retries until the commit succeeds or it hits an error it cannot recover from.

Cons of sync commit

Consumer is blocked until the commit action responds. So throughput will suffer.

Commit Asynchronously

consumer.commitAsync() does not block waiting for the broker's response to the commit request.

Cons of async commit

// code goes in here to show this pattern.

To make sure the commit is successful.
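One common way to combine the two commit styles is to commit asynchronously inside the poll loop and synchronously once just before the consumer shuts down, so the last offsets are committed reliably. The sketch below shows only the shape of that pattern: OffsetCommitter is a hypothetical stand-in for the two commit calls on a Kafka consumer, and the batches are plain lists instead of the results of poll().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CommitPatternSketch {
    // Hypothetical stand-in for the two commit calls on a Kafka consumer.
    interface OffsetCommitter {
        void commitAsync(); // returns immediately, does not wait for the broker
        void commitSync();  // blocks until the commit succeeds or fails for good
    }

    // Processes each batch, commits asynchronously inside the loop,
    // and commits synchronously once on the way out.
    static void runLoop(List<List<String>> batches, OffsetCommitter committer) {
        try {
            for (List<String> batch : batches) {
                for (String record : batch) {
                    System.out.println("processing " + record);
                }
                committer.commitAsync(); // fast path, keeps throughput up
            }
        } finally {
            committer.commitSync(); // last chance, so block until it is done
        }
    }

    // Runs the loop against a recording committer and returns the calls made.
    static List<String> demoCalls() {
        List<String> calls = new ArrayList<>();
        OffsetCommitter recorder = new OffsetCommitter() {
            @Override public void commitAsync() { calls.add("async"); }
            @Override public void commitSync() { calls.add("sync"); }
        };
        runLoop(Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")), recorder);
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(demoCalls()); // last line printed: [async, async, sync]
    }
}
```

With a real consumer the loop body would call poll() and the finally block would also close the consumer; both are left out here to keep the sketch free of dependencies.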

Rebalancing Listeners

We can take certain actions in the consumer when a rebalance is triggered, for example because partitions were added or a consumer crashed.

ConsumerRebalanceListener

It has 2 methods we can implement.

Consuming records from specific offset.

To be continued in Part 2 ...