Apache Kafka: foundation of modern data stream processing

I am working on my next project using the awesome Apache Kafka, and once again I am fighting a fundamental misunderstanding of the philosophy of this technology, which probably comes from previous experience with traditional messaging systems. This blog post aims to make the mindset switch as easy as possible, to show where this technology fits in, and to point out which pitfalls to be aware of and how to avoid them. On the other hand, this article doesn’t try to cover everything or go into too much detail.

Apache Kafka is a system optimized for writes: essentially, it is built to keep up with whatever speed or volume the producers send. That is one of the motivations behind naming this technology after the famous writer Franz Kafka. It can be configured to meet a wide range of requirements. If you want to understand the philosophy of this technology, you have to look at it with a fresh eye. Forget what you know from JMS, RabbitMQ, ZeroMQ, AMQP and others. Even though the usage patterns are similar, the internal workings are completely different, in fact the opposite. The following table provides a quick comparison:

JMS, RabbitMQ, …                     | Apache Kafka
Push model                           | Pull model
Persistent message with TTL          | Retention policy
Guaranteed delivery                  | Guaranteed “consumability”
Hard to scale                        | Scalable
Fault tolerance – active – passive   | Fault tolerance – ISR (In-Sync Replicas)

The core ideas in Apache Kafka come from RDBMS. I wouldn’t describe Kafka as a messaging system but rather as a distributed database commit log which can be partitioned in order to scale. Once information is written to the commit log, everybody interested can read it at their own pace and on their own responsibility. It is the consumer’s responsibility to read it, not the system’s responsibility to deliver it to the consumer. This is the fundamental twist. Information stays in the commit log for a limited time given by the retention policy applied. During this period it can be consumed, even multiple times, by consumers. As the system has a reduced set of responsibilities, it is much easier to scale. It is also really fast, as sequential reads from disk are comparable in speed to random memory access thanks to effective file system caching.

[Figure: kafkaoffsets]

A topic partition is the basic unit of scalability when scaling out Kafka. A message in Kafka is a simple key-value pair, with both parts represented as byte arrays. When a producer sends a message to a Kafka topic, a client-side partitioner decides, based on the message key, to which topic partition the message is persisted. It is a best practice to send messages that belong to the same logical group to the same partition, as that guarantees clear ordering. On the client side, the exact position of the client (the offset) is maintained per topic partition for the assigned consumer group. Point-to-point communication is therefore achieved by using exactly the same consumer group id for all clients reading from a topic partition, while publish-subscribe is achieved by using a distinct consumer group id for each client. The offset is maintained per consumer group id and topic partition and can be reset if needed.
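
The key-based partitioning and the per-group offsets can be illustrated with a small in-memory model. This is only a sketch and not the Kafka client API: the names `pick_partition` and `TopicLog` are made up for this example, and CRC32 is a stand-in hash (Kafka's default Java partitioner uses murmur2).

```python
import zlib
from collections import defaultdict


def pick_partition(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition; equal keys always land on the same one."""
    # CRC32 is an assumption of this sketch, chosen because it is in the stdlib.
    return zlib.crc32(key) % num_partitions


class TopicLog:
    """Toy topic: append-only partitions plus one offset per (group, partition)."""

    def __init__(self, num_partitions: int):
        self.partitions = [[] for _ in range(num_partitions)]
        self.offsets = defaultdict(int)  # (group_id, partition) -> next offset to read

    def send(self, key: bytes, value: bytes) -> int:
        partition = pick_partition(key, len(self.partitions))
        self.partitions[partition].append((key, value))
        return partition

    def poll(self, group_id: str, partition: int):
        """Return the messages this group has not read yet and advance its offset."""
        start = self.offsets[(group_id, partition)]
        batch = self.partitions[partition][start:]
        self.offsets[(group_id, partition)] = start + len(batch)
        return batch

    def reset(self, group_id: str, partition: int, offset: int = 0):
        """Rewind (or fast-forward) a group's position in one partition."""
        self.offsets[(group_id, partition)] = offset


topic = TopicLog(num_partitions=3)
p = topic.send(b"order-42", b"created")
assert topic.send(b"order-42", b"paid") == p  # same key, same partition

first = topic.poll("billing", p)   # group "billing" reads both messages
second = topic.poll("billing", p)  # same group id: nothing new (point to point)
other = topic.poll("audit", p)     # distinct group id: sees everything (publish subscribe)
```

Because the offset belongs to the consumer group and not to the broker-side message, rereading is just a matter of calling `reset` for that group.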

[Figure: kafkacommunication]

Topic partitions can be replicated zero or n times and distributed across the Kafka cluster. Each topic partition has one leader and zero or n followers, depending on the replication factor. The leader maintains the so-called In-Sync Replicas (ISR): the replicas whose delay behind the partition leader is lower than replica.lag.time.max.ms. Apache ZooKeeper is used for keeping metadata and offsets (newer Kafka versions store consumer offsets in an internal Kafka topic instead).

[Figure: kafkacluster]

Kafka defines fault tolerance in the following terms:
  • acknowledge – the broker acknowledges the message write to the producer
  • commit – the message is written to all ISR and consumers can read it
When a producer sends messages to Kafka, it can require different levels of consistency:
  • 0 – the producer doesn’t wait for confirmation
  • 1 – wait for an acknowledgement from the leader
  • ALL – wait for an acknowledgement from all ISR ~ message commit

Apache Kafka is quite flexible in its configuration, and as such it can meet many different requirements in terms of throughput, consistency and scalability. Replication of topic partitions brings fault tolerance (by default consumers still read from the partition leader), but it also poses some risk, as it adds a level of complexity. If you are unaware of the corner cases, it might lead to nasty surprises, especially for newcomers. So let’s take a closer look at the following scenario.

We have a topic partition replicated twice, that is a leader (Replica 1) and two followers (Replica 2 and Replica 3). The producer requires the highest consistency level, acks = all. Replica 1 is currently the leader. Message 10 is committed and hence available to clients. Message 11 is neither acknowledged nor committed because of the failure of Replica 3. Replica 3 will be eliminated from the ISR or put offline, which causes message 11 to become acknowledged and committed.

[Figure: kafka_uc1]

The next time, we lose Replica 2: it is eliminated from the ISR and the same situation repeats for messages 12 and 13.
[Figure: kafka_uc2]
The situation can still get a lot worse if the cluster loses the current partition leader: Replica 1 is down now.
[Figure: kafka_uc3]
What happens if Replica 2 or Replica 3 comes back online before Replica 1? One of them becomes the new partition leader, and we have lost messages 12 and 13 for sure!
[Figure: kafka_uc4]

Is that a problem? Well, the correct answer is: it depends. There are scenarios where this behavior is perfectly fine; imagine collecting logs from all machines by sending them through Kafka. On the other hand, if we implement event sourcing and have just lost some events, we cannot recreate the application state correctly. Yes, then we have a problem! Unfortunately, unless that has changed in the latest releases, this is the default configuration when you install a fresh Kafka cluster. It is a setup which favors availability and throughput over other factors. Kafka does allow you to set it up so that it meets your consistency requirements as well, but it will sacrifice some availability in order to achieve that (CAP theorem). To avoid the described scenario, you should use the following configuration. The producer should require acknowledgement level ALL. Do not allow Kafka to perform a new leader election among dirty replicas: set unclean.leader.election.enable = false. Use a replication factor of 3 (default.replication.factor = 3) and require the minimal number of replicas in the in-sync state to be higher than 1 (min.insync.replicas = 2).
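
To see why these settings close the gap, the scenario can be replayed in a toy model. The sketch below is not broker code: `can_commit` and `eligible_leaders` are hypothetical helpers that only mirror the rules described in this post (a write with acks = all needs at least min.insync.replicas members in the ISR; with unclean leader election disabled only ISR members may become leader).

```python
def can_commit(isr_size: int, acks: str, min_insync_replicas: int) -> bool:
    """Toy rule: with acks=all a write is accepted only if the ISR is large enough."""
    if acks != "all":
        return isr_size >= 1  # weaker levels only need a live leader
    return isr_size >= min_insync_replicas


def eligible_leaders(replicas, isr, unclean_leader_election: bool):
    """Names of the live replicas that may take over as partition leader."""
    alive = [r["name"] for r in replicas if r["alive"]]
    if unclean_leader_election:
        return alive  # any live replica, even one that missed committed messages
    return [name for name in alive if name in isr]


# State at the end of the scenario: Replica 1 (the only ISR member) is down,
# Replicas 2 and 3 are back online but never received messages 12 and 13.
replicas = [
    {"name": "replica-1", "alive": False},
    {"name": "replica-2", "alive": True},
    {"name": "replica-3", "alive": True},
]
isr = {"replica-1"}

lossy = eligible_leaders(replicas, isr, unclean_leader_election=True)
safe = eligible_leaders(replicas, isr, unclean_leader_election=False)
accepted_with_one_replica = can_commit(isr_size=1, acks="all", min_insync_replicas=2)
```

In this model `lossy` contains Replica 2 and Replica 3, which is exactly the data loss case, while `safe` is empty: the partition stays unavailable until Replica 1 returns. With min.insync.replicas = 2, messages 12 and 13 would have been rejected in the first place, because the ISR had shrunk to a single replica.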

We have already briefly touched on the topic of message delivery to consumers. Kafka doesn’t guarantee that a message was delivered to all consumers; it is the responsibility of the consumers to read messages. So there is no semantics of a persistent message as known from traditional messaging systems. All messages sent to Kafka are persistent, meaning they are available for consumption by clients according to the retention policy. The retention policy essentially specifies how long a message will be available in Kafka. Currently there are two basic concepts: a limit on the space used for keeping messages, or a time for which the message should at least be available. Whichever limit is violated first wins.
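
The "whichever limit is violated first wins" rule can be sketched as follows. This is a simplified model under stated assumptions: Kafka applies retention to whole log segments rather than to single messages, and the function name `expired_segments` and the tuple layout are invented for this example.

```python
def expired_segments(segments, now, retention_seconds, retention_bytes):
    """Return the names of the segments to delete, oldest first.

    segments: list of (name, last_modified_ts, size_bytes), ordered oldest to newest.
    """
    to_delete = []
    total = sum(size for _, _, size in segments)
    for name, ts, size in segments:
        too_old = now - ts > retention_seconds
        too_big = total > retention_bytes
        if not (too_old or too_big):
            break  # newer segments are younger and the log now fits the size limit
        to_delete.append(name)
        total -= size
    return to_delete


segments = [("seg-0", 0, 100), ("seg-1", 50, 100), ("seg-2", 90, 100)]

# Time limit hits first: only the oldest segment is older than 60 seconds.
by_time = expired_segments(segments, now=100, retention_seconds=60, retention_bytes=250)

# Size limit hits first: the log must shrink from 300 to at most 150 bytes.
by_size = expired_segments(segments, now=100, retention_seconds=1000, retention_bytes=150)
```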

When data needs to be cleaned from Kafka (triggered by the retention policy), there are two options. The simplest one is just to delete the messages. The other is to compact them. Compaction is a process after which there is just one message per message key, usually the latest one. That is actually the second meaning of the key used in a message.
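
Compaction is easy to picture on a small list. The following is a minimal sketch of the idea only (keep the latest message per key), with `compact` being a name chosen for this example; the real log cleaner works on segments in the background.

```python
def compact(log):
    """Keep only the latest record per key, preserving the order of the survivors."""
    latest = {}
    for offset, (key, _value) in enumerate(log):
        latest[key] = offset  # later offsets overwrite earlier ones
    keep = set(latest.values())
    return [record for offset, record in enumerate(log) if offset in keep]


log = [("user-1", "a@example.com"), ("user-2", "b@example.com"), ("user-1", "c@example.com")]
compacted = compact(log)
# compacted == [("user-2", "b@example.com"), ("user-1", "c@example.com")]
```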

Which features of traditional messaging technologies can you not find in Apache Kafka? Probably the most significant is the absence of any selector in combination with listen (wake me on receive). It can certainly be implemented via a correlation id, but the efficiency is on a completely different level: you have to read all messages, deserialize them and filter. A traditional selector uses a custom field in the message header, so you don’t even need to deserialize the message payload. Monitoring Kafka in a production environment essentially comes down to one elementary question: are the consumers fast enough? Hence you monitor consumer offsets with respect to the retention policy.
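
The monitoring question "are the consumers fast enough?" boils down to comparing offsets. Here is a hedged sketch with plain dictionaries keyed by partition number; the helper names `consumer_lag` and `at_risk` are assumptions of this example, and in practice the offsets would come from your monitoring tooling.

```python
def consumer_lag(log_end_offsets, committed_offsets):
    """Per-partition lag: how many messages the group still has to read."""
    return {
        partition: end - committed_offsets.get(partition, 0)
        for partition, end in log_end_offsets.items()
    }


def at_risk(log_start_offsets, committed_offsets):
    """Partitions where retention already removed messages the group has not read."""
    return [
        partition
        for partition, start in log_start_offsets.items()
        if committed_offsets.get(partition, 0) < start
    ]


committed = {0: 90, 1: 50}
lag = consumer_lag({0: 100, 1: 50}, committed)  # {0: 10, 1: 0}
lost = at_risk({0: 95, 1: 10}, committed)       # [0]
```

A steadily growing lag is the early warning; a committed offset below the start of the log means the retention policy has already overtaken the consumer.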

Kafka was created at LinkedIn to solve a specific problem of modern data-driven applications: to fill the gap left by traditional ETL processes, which usually work with flat files and DB dumps. It is essentially an enterprise service bus for data, where software components need to exchange data heavily. It unifies and decouples data exchange among components. Typical uses are in “BigData” pipelines together with Hadoop and Spark in a lambda or kappa architecture. It lays down the foundations of modern data stream processing.

This post just scratches the surface of the basic concepts in Apache Kafka. If you are interested in the details, I really suggest reading the following sources, which I found quite useful when learning Kafka:

Hadoop IO and file formats

In this post dedicated to Big Data I would like to summarize Hadoop file formats and provide a brief introduction to the topic. As things are constantly evolving, especially in the big data area, I will be glad for comments in case I missed something important. Big Data frameworks change, but InputFormat and OutputFormat stay the same. It doesn’t matter which big data technology is in use; it can be Hadoop, Spark or …

Let’s start with some basic terminology and general principles. The key term in the MapReduce paradigm is the split, which defines a chunk of data processed by a single map task. A split is further divided into records, where every record is represented as a key-value pair. That is what you actually know from the mapper API as your input. The number of splits essentially gives you the number of map tasks necessary to process the data. This does not clash with the number of map slots defined for your cluster: it just means that some map tasks need to wait until a map slot is available for processing. This abstraction is hidden in the IO layer, particularly in the InputFormat and OutputFormat classes, which provide the RecordReader and RecordWriter classes responsible for the further division into records. Hadoop comes with a bunch of pre-defined file format classes, e.g. TextInputFormat, DBInputFormat, CombineFileInputFormat and many others. Needless to say, nothing prevents you from coming up with your own custom file formats.
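
The relation between splits, map tasks and map slots is simple arithmetic. The sketch below ignores the details of the real FileInputFormat logic; `compute_splits` and `waves` are names invented for this illustration.

```python
import math


def compute_splits(file_size: int, split_size: int):
    """Return (offset, length) pairs that cover the whole file."""
    return [
        (offset, min(split_size, file_size - offset))
        for offset in range(0, file_size, split_size)
    ]


def waves(num_splits: int, map_slots: int) -> int:
    """How many rounds of map tasks are needed when only map_slots run at once."""
    return math.ceil(num_splits / map_slots)


splits = compute_splits(file_size=300, split_size=128)
# splits == [(0, 128), (128, 128), (256, 44)], so three map tasks
rounds = waves(len(splits), map_slots=2)
# rounds == 2: the third task waits until a slot is free
```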

The described abstraction model is closely related to the MapReduce paradigm, but what is its relation to the underlying storage such as HDFS? First of all, MapReduce and the distributed file system (DFS) are two core Hadoop concepts which are “independent”; the relation is defined just through the API between those components. The best-known DFS implementation is HDFS, but there are several other possibilities (S3, Azure Blob, …). A DFS is built for large datasets. The core concept in a DFS is a block, which represents the basic unit of the original dataset for manipulation and processing, e.g. replication. This fact also puts an additional requirement on the dataset file format: it has to be splittable, which means that you can process a given block independently from the rest of the dataset. If the file format is not splittable and you run a MapReduce job, you won’t get any level of parallelism and the dataset will be processed by a single mapper. The splittability requirement also applies when compression is used.

What is the relation between a block from the DFS and a split from MapReduce? Both are essentially key abstractions for parallelization, just in different frameworks, and in the ideal case they are aligned. If they are perfectly aligned, then Hadoop can take full advantage of the so-called data locality feature, which runs the map or reduce tasks on the cluster node where the data resides and minimizes additional network traffic. In the case of imprecise alignment, remote reads will happen for the records missing from a given split. For that reason file formats include sync markers or sync points.

To take full advantage of the power of Hadoop, you design your system for big files. Typically the DFS block size is 64 MB (128 MB by default in Hadoop 2.x), but it can be bigger. That means that the biggest enemy of Hadoop is a small file. The number of files which can live in the DFS is limited by the size of the Name Node memory, because all the dataset metadata is kept in memory. Hadoop offers several strategies to avoid this bad scenario. Let’s go through those file formats.
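
A rough back-of-the-envelope model shows why small files hurt. It assumes that the name node keeps one in-memory object per file and one per block; the figure of about 150 bytes per object is a commonly quoted rule of thumb and is used here purely as an assumption, as are the helper names.

```python
import math

MB = 1024 * 1024


def namenode_objects(file_sizes, block_size):
    """Count the file and block objects the name node has to track."""
    objects = 0
    for size in file_sizes:
        objects += 1 + math.ceil(size / block_size)  # one file entry plus its blocks
    return objects


def estimated_memory_bytes(objects, bytes_per_object=150):
    """Very rough estimate; bytes_per_object is a rule-of-thumb assumption."""
    return objects * bytes_per_object


one_big_file = namenode_objects([1024 * MB], block_size=64 * MB)   # 1 file + 16 blocks
many_small_files = namenode_objects([MB] * 1024, block_size=64 * MB)  # 1024 files + 1024 blocks
```

The same gigabyte of data costs 17 objects as one file and 2048 objects as a thousand small files, which is the pressure that the formats below try to relieve.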

HAR file (Hadoop archive) is a specific file format which essentially packs a bunch of files into a single logical unit which is kept on the name node. HAR files don’t support additional compression and, as far as I know, are transparent to MapReduce. They can help if name nodes are running out of memory.

Sequence file is a kind of file-based data structure. This file format is splittable, as it contains a sync point after every several records. A record consists of a key, a value and metadata, where the key and value are serialized via classes whose names are kept in the metadata. The classes used for serialization need to be on the CLASSPATH.

Map file is again a kind of Hadoop file-based data structure, and it differs from a sequence file in the matter of order. A map file is sorted and you can perform a lookup in it. The behavior is pretty similar to the java.util.Map interface.
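
Why sorting enables lookup can be shown with a binary search over sorted keys. This is an in-memory analogy only, not the on-disk format (the real MapFile keeps a separate index next to the data), and `SortedLookup` is a name invented for this sketch.

```python
import bisect


class SortedLookup:
    """Records kept sorted by key so that a key can be found by binary search."""

    def __init__(self, records):
        ordered = sorted(records, key=lambda kv: kv[0])
        self.keys = [k for k, _ in ordered]
        self.values = [v for _, v in ordered]

    def get(self, key, default=None):
        i = bisect.bisect_left(self.keys, key)
        if i < len(self.keys) and self.keys[i] == key:
            return self.values[i]
        return default


table = SortedLookup([("cherry", 3), ("apple", 1), ("banana", 2)])
found = table.get("banana")        # 2
missing = table.get("durian", -1)  # -1
```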

Avro data file is based on the Avro serialization framework, which was primarily created for Hadoop. It is a splittable file format with a metadata section at the beginning followed by a sequence of Avro-serialized objects. The metadata section contains the schema for the Avro serialization. This format allows a comparison of data without deserialization.

Google Protocol Buffers are not natively supported by Hadoop, but you can plug in support via libraries such as elephant-bird from Twitter.

So what about file formats such as XML and JSON? They are not natively splittable and so are “hard” to deal with. A common practice is to store them in a text file, a single message per line.

Needless to say, textual files are first-class citizens in Hadoop. TextInputFormat and TextOutputFormat deal with those. The byte offset is used as the key and the value is the line content.
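
The record shape described here (byte offset as key, line content as value) can be mimicked in a few lines. This is a sketch of the idea, not of the Hadoop classes; `text_records` is a hypothetical helper and UTF-8 input is assumed.

```python
def text_records(data: bytes):
    """Yield (byte_offset, line) pairs, one record per line of the input."""
    offset = 0
    for raw in data.splitlines(keepends=True):
        # The key is the position of the line start; the value drops the line ending.
        yield offset, raw.rstrip(b"\r\n").decode("utf-8")
        offset += len(raw)


records = list(text_records(b"alpha\nbeta\ngamma"))
# records == [(0, "alpha"), (6, "beta"), (11, "gamma")]
```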

This blog post just scratches the surface of Hadoop file formats, but I hope that it provides a good introduction and explains the connection between two essential concepts, MapReduce and DFS. For further reference, the book Hadoop: The Definitive Guide goes into great detail.

Hadoop High Availability strategies

Scalability, availability, resilience: those are just common examples of computer system requirements which strongly shape the overall application architecture and have a direct impact on “indicators” such as customer satisfaction ratio, revenue, cost, etc. The weakest part of the system has the major impact on those parameters. The topic of this post, availability, is defined as the percentage of time that a system is capable of serving its intended function.
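
The definition translates directly into arithmetic. The helpers below are a sketch with invented names; `serial_availability` additionally assumes that the components fail independently and that the system needs all of them, which is the simplest way to see why the weakest part dominates.

```python
def availability(uptime_hours: float, total_hours: float) -> float:
    """Availability as the percentage of time the system could serve its function."""
    return 100.0 * uptime_hours / total_hours


def allowed_downtime_hours(availability_pct: float, period_hours: float = 24 * 365) -> float:
    """Downtime budget implied by an availability target over a period (default: a year)."""
    return period_hours * (1 - availability_pct / 100.0)


def serial_availability(*component_pcts: float) -> float:
    """Availability of a chain in which every component must be up (independence assumed)."""
    result = 1.0
    for pct in component_pcts:
        result *= pct / 100.0
    return 100.0 * result


budget = allowed_downtime_hours(99.9)      # about 8.76 hours per year
chain = serial_availability(99.9, 99.0)    # about 98.9 %, below the weakest component
```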

In the BigData era Apache Hadoop is a common component of nearly every solution. As system requirements shift from purely batch-oriented systems to near-real-time systems, this adds pressure on system availability. Clearly, if a system in batch mode runs every midnight, then a 2-hour downtime is not such a big deal, as opposed to near-real-time systems where a result delayed by 10 minutes is pointless.

In this post I will try to summarize the Hadoop high availability strategies, as complete and ready-to-use solutions, which I encountered during my research on this topic.

In Hadoop 1.x it is a well-known fact that the Name Node is a single point of failure, and as such all high availability strategies try to cope with that, i.e. to strengthen the weakest part of the system. Just to clarify a widespread myth: the Secondary Name Node isn’t a backup or recovery node by nature. It has different tasks than the Name Node, BUT with some changes the Secondary Name Node can be started in the role of the Name Node. However, this doesn’t work automatically, and it wasn’t the original role of the SNN.

High availability strategies can be categorized by the state of the standby: hot/warm standby or cold standby. This correlates directly with the failover (start-up) time. To give a rough idea (according to the documentation): for a cluster with 1500 nodes and PB capacity, the start-up time is close to one hour. Start-up consists of two major phases: restoring the metadata, and then every node in the HDFS cluster needs to report its block locations.

The typical solution for Hadoop 1.x makes use of NFS and a logical group of name nodes. Some resources claim that if NFS becomes unavailable, the name node process aborts, which would effectively stop the cluster. I couldn’t verify that in other sources of information, but I feel it is important to mention. Writing name node metadata to NFS needs to be exclusive to a single machine in order to keep the metadata consistent. To prevent collisions and possible data corruption, a fencing method needs to be defined. The fencing method assures that if the name node isn’t responsive, it is really down. In order to have real confidence, a sequence of fencing strategies can be defined, and they are executed in order. Strategies range from a simple ssh call to a power supply controlled over the network. This concept is sometimes called “shoot the other node in the head” (STONITH). The failover is usually manual but can be automated as well. This strategy works as a cold standby, and Hadoop providers typically include this solution in their High Availability Kits.
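
The "sequence of fencing strategies executed in order" can be sketched as a simple chain. Everything here is hypothetical: the strategies are stand-in callables (a real setup would issue an ssh command or talk to a network-controlled power switch), and `fence` and `fail_over` are names made up for this example.

```python
def fence(node, methods):
    """Try each fencing method in order; return the name of the first that succeeds."""
    for name, method in methods:
        try:
            if method(node):
                return name
        except Exception:
            pass  # a failing method is treated like an unsuccessful one; try the next
    return None


def fail_over(old_active, standby, methods):
    """Promote the standby only after the old active node is confirmed fenced."""
    if fence(old_active, methods) is None:
        raise RuntimeError(f"could not fence {old_active}; refusing to fail over")
    return standby


# Stand-in strategies: the ssh attempt fails (node unreachable), the power switch works.
def ssh_kill(node):
    raise ConnectionError("host unreachable")


def power_off(node):
    return True


methods = [("ssh", ssh_kill), ("power-switch", power_off)]
used = fence("namenode-1", methods)                        # "power-switch"
new_active = fail_over("namenode-1", "namenode-2", methods)  # "namenode-2"
```

Refusing to fail over when no method succeeds is the important design choice: promoting the standby while the old name node might still write to NFS is what could corrupt the metadata.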

Because of the relatively long start-up time of a backup name node, some companies (e.g. Facebook) developed their own solutions which provide a hot or warm standby. Facebook’s solution to this problem is called the avatar node. The idea behind it is relatively simple: every node is wrapped in a so-called avatar (no change to the original code base needed!). The primary avatar name node writes to a shared NFS filer. The standby avatar node consists of a secondary name node and a backup name node. This node continuously reads the HDFS transaction logs and keeps feeding those transactions to the encapsulated name node, which is kept in safe mode to prevent it from performing any active duties. This way all name node metadata is kept hot. The avatar in standby mode performs the duties of the secondary name node. Data nodes are wrapped in avatar data nodes, which send block reports to both the primary and the standby avatar node. The failover time is about a minute. More information can be found here.

Another attempt to create a Hadoop 1.x hot standby, coming from the China Mobile Research Institute, is based on running synchronization agents and a sync master. This solution raises further questions, and it seems to me that it isn’t as mature and clear as the previous one. Details can be found here.

The ultimate solution to high availability comes with Hadoop 2.x, which removes the single point of failure through a different architecture. On the processing side there is YARN (Yet Another Resource Negotiator), also called MapReduce 2. For HDFS there is another concept called the Quorum Journal Manager (QJM) for sharing the edit log between name nodes (shared NFS storage is the alternative), with ZooKeeper used for coordinating automatic failover. Those architectural changes provide the option of running two redundant NameNodes in the same cluster in an Active/Passive configuration with a hot standby.

This post just scratches the surface of Hadoop high availability and doesn’t go into detail daemon by daemon, but I hope that it is a good starting point. If any reader is aware of some other possibility, I am looking forward to seeing it in the comment section.