KAFKA Notes (Code Help https://github.com/conduktor/kafka-beginners-course)
Kafka Topics:
- A topic is a particular stream of data, e.g. logs, tweets, purchases, gps_data.
- It is like a table in a database (without any constraints).
- You can have as many topics as you want.
- A topic is identified by its name.
- Supports any kind of message format, e.g. JSON, XML, ...
- A sequence of messages is called a data stream.
- You can't query a topic; instead, use a Kafka producer to send data and a Kafka consumer to read it.
Partitions and Offsets:
- Topics are split into partitions (e.g. 100 partitions).
- Messages are ordered within each partition.
- Each message within a partition gets an incremental id, called an offset.
- Kafka topics are immutable: once data is written to a partition, it can't be changed.
- Data in Kafka is kept only for a limited time (default is 7 days - configurable).
- Offsets only have a meaning for a specific partition, e.g. offset 3 in partition 0 doesn't represent the same data as offset 3 in partition 1.
- Offsets are not re-used even if previous messages are deleted.
- Order is guaranteed only within a partition (not across partitions).
- Data is assigned randomly to a partition unless a key is provided.
- You can have as many partitions per topic as you want.
- Note**: each partition has one leader and multiple replicas.
Producers:
- Producers write data to topics (which are made of partitions).
- Producers know which partition to write to (and which Kafka broker has it).
- In case of Kafka broker failure, producers will automatically recover.
Producers: Message keys
- Producers can choose to send a key with the message (string, number, binary, etc.).
- If key = null, data is sent round-robin (partition 0, 1, 2...).
- If key != null, all the messages for that key will go to the same partition (hashing).
- A key is typically sent if you need message ordering for a specific field (e.g. truck_id).
Kafka Message Anatomy
Kafka Message Serializers
- Kafka only accepts bytes as input from producers and sends bytes out as output to consumers.
- Message serialization means transforming objects/data into bytes.
- Serializers are used on the value and the key.
- Common Serializers:
- String (incl. JSON)
- Int, Float
- Avro
- Protobuf
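A minimal Java producer sketch tying the sections above together (message keys + serializers). The broker address, topic name and key are placeholder values, not something prescribed by the course:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TruckPositionProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // messages sharing the same key ("truck_123") always land in the same partition,
        // which preserves per-truck ordering
        producer.send(new ProducerRecord<>("truck_gps", "truck_123", "{\"lat\":48.8,\"lon\":2.3}"));

        producer.flush();
        producer.close();
    }
}
```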
Kafka Message Key Hashing
- A Kafka partitioner is code logic that takes a record and determines which partition to send it to.
- Key hashing is the process of determining the mapping of a key to a partition.
- In the default Kafka partitioner, keys are hashed using the murmur2 algorithm: targetPartition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
Consumers
- Consumers read data from a topic (identified by name) - pull model.
- Consumers automatically know which broker to read from.
- In case of broker failure, consumers know how to recover.
- Data is read in order from low to high offset within each partition.
- Ordering is not guaranteed across partitions.**
Consumer Deserializers
- Deserialization indicates how to transform bytes into objects/data.
- Deserializers are used on the value and the key of the message.
- Common Deserializers:
- String (incl. JSON)
- Int, Float
- Avro
- Protobuf
- The serialization/deserialization type must not change during a topic's lifecycle (create a new topic instead).
Consumer Groups
- All the consumers in an application read data as a consumer group.
- Each consumer within a group reads from exclusive partitions.
Consumer Groups: Too Many Consumers
- If you have more consumers than partitions, some consumers will be inactive.
- Multiple consumer groups on one topic:
- It is acceptable to have multiple consumer groups on the same topic.
- To create a distinct group, use the consumer property group.id.
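A minimal Java consumer sketch for the concepts above (deserializers + group.id). Broker address, group id and topic name are placeholders:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TruckPositionConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());
        props.setProperty("group.id", "truck-dashboard");  // consumers sharing this id form one group
        props.setProperty("auto.offset.reset", "earliest"); // where to start if no committed offset exists

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("truck_gps")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.key() + " -> " + record.value());
                }
            }
        }
    }
}
```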
Consumer Offsets
- Kafka stores the offsets at which a consumer group has been reading.
- The committed offsets are in a Kafka topic named __consumer_offsets.
- When a consumer in a group has processed data received from Kafka, it should periodically commit the offsets (the Kafka broker writes to __consumer_offsets, not the group itself).
- Note**: offsets are only relevant at the topic-partition level.
Delivery semantics for consumers
- By default, Java consumers automatically commit offsets (at least once).
- There are 3 delivery semantics if you choose to commit manually:
- At least once (usually preferred):
- Offsets are committed after the message is processed.
- If the processing goes wrong, the message will be read again.
- This can result in duplicate processing of messages, so make sure your processing is idempotent (i.e. processing the message again won't impact your systems).
- At most once:
- Offsets are committed as soon as messages are received.
- If the processing goes wrong, some messages will be lost (they won't be read again).
- Exactly once:
- For Kafka => Kafka workflows: use the Transactional API (easy with the Kafka Streams API).
- For Kafka => external system workflows: use an idempotent consumer.
Note: you only need to connect to one broker (any broker) and just provide the topic name you want to read from. Kafka will route your calls to the appropriate brokers and partitions for you!
Kafka Brokers
- A Kafka cluster is composed of multiple brokers (servers).
- Each broker is identified by its id (an integer).
- Each broker contains certain topic partitions.
- After connecting to any broker (called a bootstrap broker), you will be connected to the entire cluster (Kafka clients have smart mechanisms for that).
- A good number to get started is 3 brokers, but some big clusters have over 100 brokers.
- Note**: a broker contains only a subset of the topics and partitions.
Brokers and topics
- Example of Topic-A with 3 partitions and Topic-B with 2 partitions across 3 brokers.
- Note: data is distributed, and Broker 103 doesn't have any Topic-B data.
Kafka Broker Discovery
- Every Kafka broker is also called a "bootstrap server".
- That means you only need to connect to one broker, and the Kafka client will know how to connect to the entire cluster (smart client).
- Each broker knows about all brokers, topics and partitions (metadata).
Note: you only need to connect to one broker (any broker) and just provide the Kafka topic name you want to write to. The Kafka client will route your data to the appropriate broker and partition for you.
Topic replication factor
- Topics should have a replication factor > 1 (usually between 2 and 3).
- This way, if a broker is down, another broker can serve the data.
- Ex: Topic-A with 2 partitions and a replication factor of 2.
- Note**: with a replication factor of 2, each partition will live on 2 different brokers.
Concept of Leader for a partition
- At any time, only one broker can be the leader for a given partition.
- Producers can only send data to the broker that is the leader of a partition.
- The other brokers will replicate the data.
- Therefore, each partition has one leader and multiple ISR (in-sync replicas).
Default producer and consumer behaviour with leaders
- Kafka producers can only write to the leader broker for a partition.
- Kafka consumers by default read from the leader broker for a partition.
Kafka Consumer Replica Fetching
- Since Kafka 2.4, it is possible to configure consumers to read from the closest replica.
- This may help improve latency, and also decrease network costs if using the cloud.
Producer Acknowledgement
- Producers can choose to receive acknowledgement of data writes:
- acks=0: the producer won't wait for acknowledgement (possible data loss).
- acks=1: the producer will wait for the leader's acknowledgement (limited data loss).
- acks=all: leader + replicas acknowledgement (no data loss).
Kafka Topic Durability
- For a topic replication factor of 3, topic data durability can withstand the loss of 2 brokers.
- As a rule, for a replication factor of N, you can permanently lose up to N-1 brokers and still recover your data.
Zookeeper
- Zookeeper manages Kafka brokers (keeps a list of them).
- Zookeeper helps in performing leader election for partitions.
- Zookeeper sends notifications to Kafka in case of changes (new topic, broker dies, broker comes up, topics deleted, etc.).
- Kafka 2.x can't work without Zookeeper.
- Kafka 3.x can work without Zookeeper (KIP-500) - using Kafka Raft (KRaft) instead.
- Kafka 4.x will not have Zookeeper.
- Zookeeper is designed to operate with an odd number of servers (1, 3, 5, 7).
- Zookeeper has a leader (handles writes); the rest of the servers are followers (handle reads).
Should you use Zookeeper?
- With Kafka brokers:
- Yes, until Kafka 4.0 is out, while waiting for Kafka without Zookeeper to be production ready.
- With Kafka clients:
- Over time, the Kafka clients and CLI have been migrated to use the brokers as a connection endpoint instead of Zookeeper.
- Since Kafka 0.10, consumers store offsets in Kafka (not Zookeeper) and must not connect to Zookeeper, as that option is deprecated.
- Since Kafka 2.2, the kafka-topics.sh CLI command references Kafka brokers and not Zookeeper for topic management (creating, deleting, etc.), and the Zookeeper CLI argument is deprecated.
- All the APIs and commands that were previously leveraging Zookeeper have been migrated to use Kafka instead, so that when clusters are migrated to run without Zookeeper, the change is invisible to clients.
- Zookeeper is also less secure than Kafka, and therefore Zookeeper ports should only be opened to traffic from Kafka brokers, not from Kafka clients.
- Therefore, never use Zookeeper as a configuration in your Kafka clients or other programs that connect to Kafka.
Kafka KRaft
- Zookeeper has scaling issues when Kafka clusters have > 100,000 partitions.
- By removing Zookeeper, Apache Kafka can:
- Scale to millions of partitions, and become easier to maintain and set up.
- Improve stability, making it easier to monitor, support and administer.
- Use a single security model for the whole system.
- Start with a single process.
- Achieve faster controller shutdown and recovery times.
Installing Kafka: https://www.conduktor.io/kafka/starting-kafka
Installing Kafka on Windows: https://www.conduktor.io/kafka/how-to-install-apache-kafka-on-windows
Kafka CLI : kafka-console-consumer.sh
- Consume from the tail of the topic.
- Show both key and value in the output.
Consumer Groups and Partition Rebalance
- Moving partitions between consumers is called a rebalance.
- Reassignment of partitions happens when a consumer leaves or joins a group.
- It also happens if an administrator adds new partitions to a topic.
How does rebalancing happen when partitions are added or consumers join/leave?
- Eager Rebalancing:
- All consumers stop consuming, give up their partition membership, rejoin the group and get a new assignment ("stop-the-world" effect); they don't necessarily get back the same partitions.
- Cooperative Rebalancing (Incremental Rebalancing):
- Reassigns a small subset of the partitions from one consumer to another.
- Consumers that don't have a reassigned partition can still process uninterrupted.
- It can go through several iterations to find a "stable" assignment (hence "incremental").
- Avoids the "stop-the-world" event where all consumers stop processing data.
How to use Cooperative Rebalancing
- Kafka Consumer: partition.assignment.strategy
- RangeAssignor: assigns partitions on a per-topic basis (can lead to imbalance).
- RoundRobinAssignor: assigns partitions across all topics in round-robin fashion, optimal balance.
- StickyAssignor: balanced like RoundRobin, then minimises partition movements when consumers join/leave the group.
- CooperativeStickyAssignor: assignment strategy is identical to StickyAssignor, but supports cooperative rebalances, so consumers can keep on consuming from the topic.
- The default assignor is [RangeAssignor, CooperativeStickyAssignor]: it uses the RangeAssignor by default, but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list.
- Kafka Connect: cooperative rebalancing already implemented (enabled by default).
- Kafka Streams: turned on by default using the StreamsPartitionAssignor.
Static Group Membership
- By default, when a consumer leaves a group, its partitions are revoked and re-assigned.
- If it joins back, it will have a new "member ID" and new partitions assigned.
- If you specify group.instance.id, it makes the consumer a static member (see the config sketch below).
- Upon leaving, the consumer has up to session.timeout.ms to join back and get back its partitions (else they will be re-assigned), without triggering a rebalance.
- This is helpful when consumers maintain local state and caches (to avoid re-building the cache).
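A hedged configuration sketch combining the cooperative rebalance strategy and static group membership settings described above; the instance id value is a placeholder:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

import java.util.Properties;

public class RebalanceConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        // opt in to cooperative (incremental) rebalances
        props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                CooperativeStickyAssignor.class.getName());
        // static membership: a unique, stable id per consumer instance (placeholder value)
        props.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-host-1");
        // how long the broker waits for a member to come back before rebalancing
        props.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000");
        return props;
    }
}
```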
Kafka Consumer - Auto Offset Commit Behaviour
- In the Java consumer API, offsets are regularly committed.
- Enables at-least-once reading scenarios by default (under conditions).
- Offsets are committed when you call .poll() and auto.commit.interval.ms has elapsed.
- Example: auto.commit.interval.ms=5000 and enable.auto.commit=true will commit every 5 seconds.
- Make sure all the messages are successfully processed before calling poll() again.
- If you don't, you will not be in an at-least-once reading scenario.
- In that (rare) case, you must disable enable.auto.commit, most likely move processing to a separate thread, and then from time to time call .commitSync() or .commitAsync() with the correct offsets manually (advanced).
Producer Acknowledgement (acks)
- Producers can choose to receive acknowledgement of data writes:
- acks=0: the producer won't wait for acknowledgement (possible data loss).
- acks=1: the producer will wait for the leader's acknowledgement (limited data loss).
- acks=all: leader + replicas acknowledgement (no data loss).
Producer acks=0
- When acks=0, producers consider messages as "written successfully" the moment the message is sent, without waiting for the broker to accept it at all.
- If the broker goes offline or an exception happens, we won't know and will lose data.
- Useful for data where it's okay to potentially lose messages, such as metrics collection.
- Gives the highest throughput because the network overhead is minimized.
Producer acks=1
- When acks=1, producers consider messages as "written successfully" when the message is acknowledged by the leader only.
- The leader's response is requested, but replication is not guaranteed, as it happens in the background.
- If the leader broker goes offline unexpectedly but replicas haven't replicated the data yet, we have data loss.
- If an ack is not received, the producer may retry the request.
Producer acks=all / -1
- When acks=all, producers consider messages as "written successfully" when the message is accepted by all in-sync replicas (ISR).
- This is the default for Kafka 3.x+.
Producer acks=all & min.insync.replicas (these go hand in hand)
- The leader replica for a partition checks whether there are enough in-sync replicas to safely write the message (controlled by the broker setting min.insync.replicas).
- min.insync.replicas=1: only the broker leader needs to successfully ack.
- min.insync.replicas=2: at least the broker leader and 1 replica need to successfully ack.
Kafka Topic Availability
- Availability (considering RF=3):
- acks=0 & acks=1: if one partition is up and considered an ISR, the topic will be available for writes.
- acks=all:
- min.insync.replicas=1 (default): the topic must have at least 1 partition up as an ISR (that includes the leader), so we can tolerate two brokers being down.
- min.insync.replicas=2: the topic must have at least 2 ISR up, therefore we can tolerate at most one broker being down (in the case of a replication factor of 3), and we have the guarantee that every write is written at least twice.
- min.insync.replicas=3: this wouldn't make much sense for a corresponding replication factor of 3, as we couldn't tolerate any broker going down.
- In summary, when acks=all with replication.factor=N and min.insync.replicas=M, we can tolerate N-M brokers going down for topic availability purposes.
** acks=all and min.insync.replicas=2 is the most popular option for data durability and availability, and allows you to withstand at most the loss of one Kafka broker.
Producers Retries:
- In case of transient failures, developers are expected to handle exceptions, otherwise the data will be lost.
- Example of a transient failure:
- NOT_ENOUGH_REPLICAS (due to the min.insync.replicas setting).
- There is a "retries" setting:
- defaults to 0 for Kafka <= 2.0
- defaults to 2147483647 for Kafka >= 2.1
- The retry.backoff.ms setting is 100 ms by default.
(Old versions) - if you are not using an idempotent producer (not recommended - old Kafka):
- In case of retries, there is a chance that messages will be sent out of order (if a batch has failed to be sent).
- For this, you can set the setting that controls how many produce requests can be made in parallel: max.in.flight.requests.per.connection.
- Default: 5
- Set it to 1 if you need to ensure ordering (may impact throughput).
Producer Timeout
- If retries > 0, for example retries=2147483647, retries are bounded by a timeout.
- Since Kafka 2.1, you can set: delivery.timeout.ms=120000 (== 2 minutes).
Idempotent Producer
- The producer can introduce duplicate messages in Kafka due to network errors.
- In Kafka >= 0.11, you can define an "idempotent producer", which won't introduce duplicates on network errors.
- Idempotent producers are great to guarantee a stable and safe pipeline!
- They come with:
- retries=Integer.MAX_VALUE
- max.in.flight.requests.per.connection=1 (Kafka 0.11), or
- max.in.flight.requests.per.connection=5 (Kafka >= 1.0 - higher performance while keeping ordering, KAFKA-5494)
- These settings are applied automatically after your producer has started, if not manually set.
- Just set: properties.setProperty("enable.idempotence", "true");
Safe Kafka Producer
- acks=all: ensures data is properly replicated before an ack is received.
- min.insync.replicas=2 (broker/topic level): ensures at least two brokers in the ISR have the data after an ack.
- enable.idempotence=true: duplicates are not introduced due to network retries.
- retries=MAX_INT (producer level): retry until delivery.timeout.ms is reached.
- delivery.timeout.ms=120000: fail after retrying for 2 minutes.
- max.in.flight.requests.per.connection=5: ensures maximum performance while keeping message ordering.
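A sketch of the "safe producer" settings above expressed as Java producer properties. These are already the defaults since Kafka 3.0; min.insync.replicas is a broker/topic setting, not a producer one, and the broker address is a placeholder:

```java
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class SafeProducerConfig {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // shown explicitly for clients older than Kafka 3.0, where these were not yet defaults
        props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        props.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
        props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
        // min.insync.replicas=2 must be set on the broker or the topic, not here
        return props;
    }
}
```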
Message Compression at the Producer level
- Producers usually send data that is text-based, for example JSON data.
- In this case, it is important to apply compression at the producer level.
- Compression can be enabled at the producer level and doesn't require any configuration change in the brokers or consumers.
- compression.type can be none (default), gzip, lz4, snappy or zstd (Kafka 2.1+).
- Compression is more effective the bigger the batch of messages being sent to Kafka.
Message Compression
- A compressed batch has the following advantages:
- Much smaller producer request size (compression ratio up to 4x).
- Faster to transfer data over the network.
- Better throughput.
- Better disk utilisation in Kafka (stored messages on disk are smaller).
- Disadvantages (very minor):
- Producers must commit some CPU cycles to compression.
- Consumers must commit some CPU cycles to decompression.
- Overall:
- Consider testing snappy or lz4 for optimal speed / compression ratio (test others too).
- Consider tweaking linger.ms and batch.size to have bigger batches, and therefore more compression and higher throughput.
- Use compression in production.
- Broker/topic level compression:
- compression.type=producer (default): the broker takes the compressed batch from the producer client and writes it directly to the topic's log file without recompressing the data.
- compression.type=none: all batches are decompressed by the broker.
- compression.type=lz4 (for example):
- If it matches the producer setting, data is stored on disk as-is.
- If it's a different compression setting, batches are decompressed by the broker and then re-compressed using the compression algorithm specified.
- Warning: if you enable broker-side compression, it will consume extra CPU cycles.
linger.ms & batch.size (how to improve the batching mechanism)
- By default, Kafka producers try to send records as soon as possible:
- A producer will have up to max.in.flight.requests.per.connection=5 message batches in flight (being sent between the producer and the broker) at most.
- After this, if more messages must be sent while others are in flight, Kafka is smart and will start batching them before the next batch send.
- This smart batching helps increase throughput while maintaining very low latency. Added benefit: batches have a higher compression ratio, so better efficiency.
- Two settings influence the batching mechanism:
- linger.ms (default 0): how long to wait until we send a batch. Adding a small number, for example 5 ms, helps add more messages in the batch at the expense of latency.
- batch.size: if a batch is filled before linger.ms elapses, increase the batch size.
batch.size (default 16KB)
- Maximum number of bytes that will be included in a batch.
- Increasing the batch size to something like 32KB or 64KB can help increase the compression, throughput, and efficiency of requests.
- Any message that is bigger than the batch size will not be batched.
- A batch is allocated per partition, so make sure you don't set it to a number that's too high, otherwise you'll waste memory.
https://blog.cloudflare.com/squeezing-the-firehose/
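A hedged sketch of a high-throughput producer configuration combining the compression and batching settings discussed above (the values are illustrative, not prescriptive):

```java
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class HighThroughputProducerConfig {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");          // or lz4
        props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");                     // wait up to 20 ms to fill batches
        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); // 32 KB batches
        return props;
    }
}
```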
Producer Default Partitioner when key != null
- Key hashing is the process of determining the mapping of a key to a partition.
- In the default partitioner, keys are hashed using the murmur2 algorithm: targetPartition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
- This means that the same key will go to the same partition, and adding partitions to a topic will completely alter the formula.
- It is most likely preferred to not override the behaviour of the partitioner, but it is possible to do so using partitioner.class.
Producer Default Partitioner when key == null
- When key=null, the producer has a default partitioner that varies:
- Round Robin: for Kafka 2.3 and below.
- Sticky Partitioner: for Kafka 2.4 and above.
- The Sticky Partitioner improves the performance of the producer, especially with high throughput when the key is null.
max.block.ms & buffer.memory
- If the producer produces faster than the broker can take, the records will be buffered in memory.
- buffer.memory=32MB: the size of the send buffer.
- That buffer will fill up over time and empty back down when the throughput to the broker increases.
- If that buffer is full (32MB), then the .send() method will start to block (won't return right away).
- max.block.ms=60000: the time .send() will block until throwing an exception. Exceptions are thrown when:
- The producer has filled up its buffer.
- The broker is not accepting any new data.
- 60 seconds have elapsed.
- If you hit that exception, it usually means your brokers are down or overloaded, as they can't respond to requests.
Consumer Offset Commit Strategies
- There are two common patterns for committing offsets in a consumer application.
- 2 strategies:
- (easy) enable.auto.commit=true & synchronous processing of batches.
- (medium) enable.auto.commit=false & manual commit of offsets.
Kafka Consumer - Auto Offset Commit Behaviour
- In the Java consumer API, offsets are regularly committed.
- Enables at-least-once reading scenarios by default (under conditions).
- Offsets are committed when you call .poll() and auto.commit.interval.ms has elapsed (with enable.auto.commit=true).
- Make sure messages are all successfully processed before you call poll() again.
- If you don't, you will not be in an at-least-once reading scenario.
- In that (rare) case, you must disable enable.auto.commit, most likely move processing to a separate thread, and then from time to time call .commitSync() or .commitAsync() with the correct offsets manually (advanced).
Consumer Offset Commit Strategies
- enable.auto.commit=true & synchronous processing of batches:
- With auto-commit, offsets will be committed automatically for you at a regular interval (auto.commit.interval.ms=5000 by default) every time you call .poll().
- If you don't use synchronous processing, you will be in "at-most-once" behaviour because offsets will be committed before your data is processed.
- enable.auto.commit=false & synchronous processing of batches:
- You control when you commit offsets and what the condition is for committing them.
- Example: accumulating records into a buffer, then flushing the buffer to a database and committing the offsets then (see the sketch after this list).
- enable.auto.commit=false & storing offsets externally (advanced).
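A minimal sketch of the enable.auto.commit=false + manual commit pattern (at-least-once). Broker, group and topic names are placeholders:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "manual-commit-group");     // placeholder
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());
        props.setProperty("enable.auto.commit", "false");         // we decide when offsets are committed

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("truck_gps")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // e.g. buffer and flush to a database
                }
                // commit only after the whole batch has been processed => at-least-once
                consumer.commitSync();
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
    }
}
```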
Consumer Offset Reset Behaviour
- If your application has a bug, your consumer can be down.
- If Kafka has a retention of 7 days and your consumer is down for more than 7 days, the offsets are "invalid".
- The behaviour for the consumer is then to use:
- auto.offset.reset=latest: will read from the end of the log.
- auto.offset.reset=earliest: will read from the start of the log.
- auto.offset.reset=none: will throw an exception if no offset is found.
- Additionally, consumer offsets can be lost:
- if a consumer hasn't read data in 1 day (Kafka < 2.0)
- if a consumer hasn't read data in 7 days (Kafka >= 2.0)
- This can be controlled by the broker setting offsets.retention.minutes.
Replaying data for Consumers
- To replay data for a consumer group:
- Take all the consumers from that specific group down.
- Use the kafka-consumer-groups command to set the offsets to what you want (a programmatic sketch follows below).
- Bottom line:
- Set proper data retention and offset retention periods.
- Ensure the auto offset reset behaviour is the one you expect / want.
- Use the replay capability in case of unexpected behaviour.
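The kafka-consumer-groups CLI is the usual tool for resetting offsets; below is a hedged sketch of the equivalent AdminClient call (available since Kafka 2.5). The group id, topic and partition are placeholders, and the consumer group must be stopped first:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

public class ResetGroupOffsets {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // fails if the consumer group still has active members
            admin.alterConsumerGroupOffsets(
                    "truck-dashboard",                              // placeholder group id
                    Collections.singletonMap(
                            new TopicPartition("truck_gps", 0),     // placeholder topic/partition
                            new OffsetAndMetadata(0L)))             // replay from offset 0
                 .all()
                 .get();
        }
    }
}
```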
Controlling Consumer Liveliness
- Consumers in a group talk to a Consumer Group Coordinator.
- To detect consumers that are "down", there is a "heartbeat" mechanism and a "poll" mechanism.
- To avoid issues, consumers are encouraged to process data fast and poll often.
Consumer Heartbeat Thread
- heartbeat.interval.ms (default 3 seconds):
- How often to send heartbeats.
- Usually set to 1/3rd of session.timeout.ms.
- session.timeout.ms (default 45 seconds for Kafka 3.x+, 10 seconds before):
- Heartbeats are sent periodically to the broker.
- If no heartbeat is sent during that period, the consumer is considered dead.
- Set it even lower for faster consumer rebalances.
Consumer Poll Thread
- max.poll.interval.ms (default 5 minutes):
- Maximum amount of time between two .poll() calls before declaring the consumer dead.
- This is particularly relevant for big data frameworks like Spark in case the processing takes time.
- This mechanism is used to detect a data processing issue with the consumer (the consumer is "stuck").
- max.poll.records (default 500):
- Controls how many records to receive per poll request.
- Increase it if your messages are small and you have a lot of available RAM.
- It's good to monitor how many records are polled per request.
- Lower it if it takes you too much time to process records.
Consumer Poll Behaviour
- fetch.min.bytes (default 1):
- Controls how much data you want to poll at least on each request.
- Helps improve throughput and decrease the number of requests.
- At the cost of latency.
- fetch.max.wait.ms (default 500):
- The maximum amount of time the Kafka broker will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.
- This means that until the requirement of fetch.min.bytes is satisfied, you will have up to 500 ms of latency before the fetch returns data to the consumer (i.e. introducing a potential delay to be more efficient in requests).
- max.partition.fetch.bytes (default 1MB):
- The maximum amount of data per partition the server will return.
- If you read from 100 partitions, you will need a lot of memory (RAM).
- fetch.max.bytes (default 50MB):
- Maximum data returned for each fetch request.
- If you have memory available, try increasing fetch.max.bytes to allow the consumer to read more data in each request.
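A small sketch of the fetch/poll tuning settings above as Java consumer properties (the values are illustrative only):

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class ConsumerFetchTuning {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");   // wait for at least 1 KB per fetch...
        props.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");  // ...but at most 500 ms
        props.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, Integer.toString(1024 * 1024));
        props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
        return props;
    }
}
```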
Default Consumer Behaviour with partition leaders
- Kafka consumers by default will read from the leader broker for a partition.
- This means possibly higher latency (multiple data centres) + high network charges.
- Since Kafka 2.4, it is possible to configure consumers to read from the closest replica.
- This may help improve latency, and also decrease network costs if using the cloud.
Consumer Rack Awareness (v2.4+)
- Broker settings:
- Must be Kafka v2.4+.
- broker.rack must be set to the data centre ID (e.g. the AZ ID in AWS).
- replica.selector.class must be set to org.apache.kafka.common.replica.RackAwareReplicaSelector.
- Consumer client setting:
- Set client.rack to the data centre ID the consumer is launched in.
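A minimal sketch of the consumer-side rack awareness setting; the rack/AZ id is a placeholder and must match a broker.rack value configured on the brokers:

```java
import java.util.Properties;

public class RackAwareConsumerConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        // broker side (server.properties): broker.rack=usw2-az1 and
        // replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
        props.setProperty("client.rack", "usw2-az1"); // placeholder AZ id, must match a broker.rack value
        return props;
    }
}
```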
Kafka Connect: solves External Source => Kafka and Kafka => External Sink.
Kafka Streams: solves transformations Kafka => Kafka.
Schema Registry: helps using schemas in Kafka.
Kafka Connect - High Level (https://www.confluent.io/hub/)
- Source Connectors: to get data from common data sources.
- Sink Connectors: to publish that data to common data stores.
- Makes it easy for non-experienced devs to quickly get their data reliably into Kafka.
- Part of your ETL pipeline.
- Scaling made easy, from small pipelines to company-wide pipelines.
- Other programmers may already have done a very good job: re-usable code!
- Connectors achieve fault tolerance, idempotence, distribution and ordering.
Kafka Streams
- Say you want to do the following with a topic:
- Count the number of messages in 10 seconds.
- Analyse bot vs user changes.
- With the Kafka Producer and Consumer you can achieve that, but it's very low level and not developer friendly.
- Kafka Streams is an easy data processing and transformation library within Kafka:
- Standard Java application.
- No need to create a separate cluster.
- Highly scalable, elastic and fault-tolerant.
- Exactly-once capabilities.
- One record at a time processing (no batching).
- Works for any application size.
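A hedged Kafka Streams sketch of the "count messages per 10 seconds" idea above, assuming the Kafka Streams 3.x API; the application id, broker address and topic name are placeholders:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.TimeWindows;

import java.time.Duration;
import java.util.Properties;

public class MessageCountApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "message-count-app");  // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("wikimedia.recentchange")                   // placeholder topic
               .groupByKey()                                                       // assumes non-null keys
               .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10)))  // 10-second tumbling windows
               .count()
               .toStream()
               .print(Printed.toSysOut());                                         // print counts per key per window

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```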
Need for a Schema Registry
- Kafka takes bytes as an input and publishes them.
- No data verification is done.
- What if a producer sends bad data, or a field gets renamed?
- The consumers will break.
- We need data to be self-describable.
- We need to be able to evolve data without breaking downstream consumers.
- We need a schema and a schema registry!
- What if the Kafka brokers were verifying the messages they receive?
- It would break what makes Kafka so good:
- Kafka doesn't parse or even read your data (no CPU usage).
- Kafka takes bytes as an input without even loading them into memory (that's called zero copy).
- Kafka distributes bytes.
- As far as Kafka is concerned, it doesn't even know if your data is an integer, a string, etc.
- The Schema Registry must be a separate component.
- Producers and consumers need to be able to talk to it.
- The Schema Registry must be able to reject bad data.
- A common data format must be agreed upon:
- It needs to support schemas.
- It needs to support evolution.
- It needs to be lightweight.
- Enter the Schema Registry.
- And Apache Avro as the data format.
Schema Registry - Purpose
- Store and retrieve schemas for producers/consumers.
- Enforce backward / forward / full compatibility on topics.
- Decrease the size of the payload of data sent to Kafka.
- Utilising a schema registry has a lot of benefits.
- BUT it implies you need to:
- Set it up well.
- Make sure it's highly available.
- Partially change the producer and consumer code.
- Apache Avro as a format is awesome, but has a learning curve.
- Other formats include Protobuf and JSON Schema.
- The Confluent Schema Registry is free and source-available.
- Other open source alternatives may exist.
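A sketch of what wiring a producer to a schema registry typically looks like; the KafkaAvroSerializer class and schema.registry.url property are specific to the Confluent Schema Registry, and the URLs are placeholders:

```java
import java.util.Properties;

public class AvroProducerConfig {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");          // placeholder broker
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Confluent's Avro serializer registers/fetches schemas in the registry
        props.setProperty("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.setProperty("schema.registry.url", "http://localhost:8081"); // placeholder registry
        return props;
    }
}
```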
Partitions Count and Replication Factor
- The two most important parameters when creating a topic.
- They impact the performance and durability of the system overall.
- It is best to get the parameters right the first time:
- If the partition count increases during a topic's lifecycle, you will break your keys' ordering guarantees.
- If the replication factor increases during a topic's lifecycle, you put more pressure on your cluster, which can lead to an unexpected performance decrease.
Choosing the Partition Count
- Each partition can handle a throughput of a few MB/s (measure it for your setup).
- More partitions imply:
- Better parallelism, better throughput.
- Ability to run more consumers in a group to scale (max as many consumers per group as partitions).
- Ability to leverage more brokers if you have a large cluster.
- But more elections to perform for Zookeeper (if using Zookeeper).
- But more files opened on Kafka.
Guidelines:
- Partitions per topic:
- (Intuition) Small cluster (< 6 brokers): 3 x number of brokers.
- (Intuition) Big cluster (> 12 brokers): 2 x number of brokers.
- Adjust for the number of consumers you need to run in parallel at peak throughput.
- Adjust for producer throughput.
Choosing the Replication factor
- Should be at least 2, usually 3, maximum 4.
- The higher the replication factor (N):
- Better durability of your system (N-1 brokers can fail).
- Better availability of your system (N - min.insync.replicas if producer acks=all).
- But more replication (higher latency if acks=all).
- But more disk space on your system (50% more if RF is 3 instead of 2).
Guidelines:
- Set it to 3 to get started (you must have at least 3 brokers for that).
- If replication performance is an issue, get better brokers instead of lowering the RF.
- Never set it to 1 in production.
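A small AdminClient sketch that applies the guidelines above when creating a topic (the broker address and topic name are placeholders):

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 3 - following the guidelines above
            NewTopic topic = new NewTopic("orders", 3, (short) 3); // placeholder topic name
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```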
Cluster guidelines
- Total number of partitions in the cluster:
- Kafka with Zookeeper: max 200,000 partitions (still a recommended max of 4,000 partitions per broker - soft limit).
- Kafka with KRaft: potentially millions of partitions.
Kafka topic naming convention
https://cnr.sh/essays/how-paint-bike-shed-kafka-topic-naming-conventions
Kafka Cluster Setup - High Level Architecture
- You want multiple brokers in different data centres (racks) to distribute your load. You also want a cluster of at least 3 Zookeeper nodes (if using Zookeeper).
- In AWS:
Kafka Cluster Setup Gotchas
- It's not easy to set up a cluster.
- You want to isolate each Zookeeper node & broker on separate servers.
- Monitoring needs to be implemented.
- Operations must be mastered.
- You need an excellent Kafka admin.
- Alternative: managed "Kafka as a Service" offerings from various companies.
- Amazon MSK, Confluent Cloud, Aiven, CloudKarafka, Upstash, etc.
- No operational burden (updates, monitoring, setup, etc.).
Other Components to Properly Set Up
- Kafka Connect clusters.
- Kafka Schema Registry: make sure to run two for high availability.
- UI tools for ease of administration.
- Admin tools for automation workflows.
Kafka Monitoring and Operations
- Kafka exposes metrics through JMX.
- These metrics are highly important for monitoring Kafka and ensuring the systems are behaving correctly under load.
- Common places to host the Kafka metrics:
- ELK
- Datadog
- New Relic
- Confluent Control Center
Kafka Monitoring
Some of the most important metrics are:
- Under-Replicated Partitions: number of partitions that have problems with the ISR (in-sync replicas). May indicate high load on the system.
- Request Handlers: utilisation of threads for IO, network, etc.; overall utilisation of an Apache Kafka broker.
- Request Timing: how long it takes to reply to requests. Lower is better, as latency is improved.
Partitions and Segments
- Topics are made of partitions (we already know that).
- Partitions are made of.... segments (files)!
- Only one segment is ACTIVE (the one data is being written to / the last one).
- Two segment settings:
- log.segment.bytes: the max size of a single segment in bytes (default 1GB).
- log.segment.ms: the time Kafka will wait before committing the segment if not full (default 1 week).
Segments and Indexes
- Segments come with two indexes (files):
- An offset to position index: helps Kafka find where to read from to find a message.
- A timestamp to offset index: helps Kafka find messages with a specific timestamp.
- A smaller log.segment.bytes (size, default 1GB) means:
- More segments per partition.
- Log compaction happens more often.
- Kafka must keep more files open.
- A smaller log.segment.ms (time, default 1 week) means:
- You set a max frequency for log compaction (more frequent triggers).
- Maybe you want daily compaction instead of weekly.
Log Cleanup Policies
- Many Kafka clusters make data expire, according to a policy.
- That concept is called log cleanup.
Policy 1: log.cleanup.policy=delete (Kafka default for all user topics)
- Delete based on age of data (default is a week).
- Delete based on max size of log (default is -1 == infinite).
Policy 2: log.cleanup.policy=compact (Kafka default for the __consumer_offsets topic)
- Delete based on the keys of your messages.
- Will delete old duplicate keys after the active segment is committed.
- Infinite time and space retention.
Log Cleanup
- Deleting data from Kafka allows you to:
- Control the size of the data on disk, delete obsolete data.
- Overall: limit maintenance work on the Kafka cluster.
- How often does log cleanup happen?
- Log cleanup happens on your partition segments!
- Smaller / more segments means that log cleanup will happen more often.
- Log cleanup shouldn't happen too often => it takes CPU and RAM resources.
- The cleaner checks for work every 15 seconds (log.cleaner.backoff.ms).
Log Cleanup Policy: Delete
- log.retention.hours:
- Number of hours to keep data for (default is 168 - one week).
- A higher number means more disk space used.
- A lower number means less data is retained (if your consumers are down for too long, they can miss data).
- Other parameters allowed: log.retention.ms, log.retention.minutes (the smaller unit has precedence).
- log.retention.bytes:
- Max size in bytes for each partition (default is -1 == infinite).
- Useful to keep the size of a log under a threshold.
- Use cases - two common pairs of options:
- One week of retention:
- log.retention.hours=168 and log.retention.bytes=-1
- Infinite time retention bounded by 500MB:
- log.retention.ms=-1 and log.retention.bytes=524288000
Log Cleanup Policy: Compact
- Log compaction ensures that your log contains at least the last known value for a specific key within a partition.
- Very useful if we just require a SNAPSHOT instead of the full history (such as for a data table in a database).
- The idea is that we only keep the latest "update" for a key in our log.
Example
Log Compaction Guarantees
- Any consumer that is reading from the tail of a log (most current data) will still see all the messages sent to the topic.
- Ordering of messages is kept; log compaction only removes some messages, it does not re-order them.
- The offset of a message is immutable (it never changes). Offsets are just skipped if a message is missing.
- Deleted records can still be seen by consumers for a period of delete.retention.ms (default 24 hours).
Log Compaction Myth Busting
- It doesn't prevent you from pushing duplicate data to Kafka.
- De-duplication is done only after a segment is committed.
- Your consumers will read from the tail as soon as the data arrives.
- It doesn't prevent you from reading duplicate data from Kafka.
- Same point as above.
- Log compaction can fail from time to time.
- It is an optimization, and the compaction thread might crash.
- Make sure you assign enough memory to it and that it gets triggered.
- Restart Kafka if log compaction is broken.
- You can't trigger compaction using an API call (for now...).
**Log Compaction**
- Log compaction (log.cleanup.policy=compact) is impacted by:
- segment.ms (default 7 days): max amount of time to wait to close the active segment.
- segment.bytes (default 1GB): max size of a segment.
- min.compaction.lag.ms (default 0): how long to wait before a message can be compacted.
- delete.retention.ms (default 24 hours): wait before deleting data marked for compaction.
- min.cleanable.dirty.ratio (default 0.5): higher => less frequent but more efficient cleaning; lower => the opposite.
https://www.conduktor.io/kafka/kafka-topic-configuration-log-compaction
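A hedged sketch of creating a compacted topic with the settings above via the AdminClient (the topic name, partition count and tuning values are placeholders):

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class CompactedTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

        Map<String, String> configs = new HashMap<>();
        configs.put("cleanup.policy", "compact");
        configs.put("segment.ms", "3600000");             // close the active segment after 1 hour
        configs.put("min.cleanable.dirty.ratio", "0.1");  // compact more aggressively than the 0.5 default
        configs.put("delete.retention.ms", "86400000");   // keep tombstones for 24 hours

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("latest-truck-position", 3, (short) 3) // placeholder
                    .configs(configs);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```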
unclean.leader.election.enable
- If all your in-sync replicas go offline (but you still have out-of-sync replicas up), you have the following options:
- Wait for an ISR to come back online (default).
- Enable unclean.leader.election.enable=true and start producing to non-ISR partitions.
- If you enable unclean.leader.election.enable=true, you improve availability, but you will lose data, because messages on the former ISR will be discarded when they come back online and replicate data from the new leader.
- Overall, this is a very dangerous setting, and its implications must be understood fully before enabling it.
- Use cases include metrics collection, log collection, and other cases where data loss is somewhat acceptable, at the trade-off of availability.
Large messages in Apache Kafka
- Kafka has a default message size limit of 1MB per message in topics, as large messages are considered inefficient and an anti-pattern.
- Two approaches to sending large messages:
- Using an external store: store the message in HDFS, Amazon S3, Google Cloud Storage, etc. and send a reference to that message to Apache Kafka.
- Modifying Kafka parameters: must change broker, producer and consumer settings.
Large Messages using an External Store
- Store the large message (e.g. video, archive file, etc.) outside of Kafka.
- Send a reference to that message to Kafka.
- Write custom code at the producer / consumer level to handle this pattern.
Sending Large Messages in Kafka (e.g. 10MB)
- Topic-wise, Kafka-side, set the max message size to 10MB:
- Broker side: modify message.max.bytes.
- Topic side: modify max.message.bytes.
- Warning: the settings have similar but different names; this is not a typo!
- Broker-wise, set the max replication fetch size to 10MB:
- replica.fetch.max.bytes=10485760
- Consumer-side, must increase the fetch size or the consumer will crash:
- max.partition.fetch.bytes=10485760
- Producer-side, must increase the max request size:
- max.request.size=10485760
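A sketch of the client-side settings above for the 10MB example, expressed as Java properties; the broker and topic-level settings are shown only as comments since they're not client configs:

```java
import java.util.Properties;

public class LargeMessageSettings {
    // assumes the 10 MB example above; all values are in bytes
    private static final String TEN_MB = Integer.toString(10 * 1024 * 1024);

    public static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("max.request.size", TEN_MB);          // producer side
        return props;
    }

    public static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("max.partition.fetch.bytes", TEN_MB); // consumer side
        return props;
    }

    // broker side (server.properties): message.max.bytes=10485760
    //                                  replica.fetch.max.bytes=10485760
    // topic level:                     max.message.bytes=10485760
}
```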