Apache Kafka: foundation of modern data stream processing

Working on my next project, again using the awesome Apache Kafka, I am again fighting a fundamental misunderstanding of the philosophy of this technology, which usually comes from previous experience with traditional messaging systems. This blog post aims to make the mindset switch as easy as possible and to explain where this technology fits in: what pitfalls to be aware of and how to avoid them. On the other hand, this article doesn't try to cover everything or go into much detail.

Apache Kafka is a system optimized for writes – essentially built to keep up with whatever rate or volume producers send. It can be configured to meet the required parameters. That is one of the motivations behind naming this technology after the famous writer Franz Kafka. If you want to understand the philosophy of this technology, you have to look at it with fresh eyes. Forget what you know from JMS, RabbitMQ, ZeroMQ, AMQP and others. Even though the usage patterns are similar, the internal workings are completely different – the opposite, in fact. The following table provides a quick comparison:

JMS, RabbitMQ, …                  | Apache Kafka
Push model                        | Pull model
Persistent message with TTL       | Retention policy
Guaranteed delivery               | Guaranteed "consumability"
Hard to scale                     | Scalable
Fault tolerance: active-passive   | Fault tolerance: ISR (in-sync replicas)

Core ideas in Apache Kafka come from the RDBMS world. I wouldn't describe Kafka as a messaging system but rather as a distributed database commit log which, in order to scale, can be partitioned. Once the information is written to the commit log, everybody interested can read it at their own pace and on their own responsibility. It is the consumer's responsibility to read it, not the responsibility of the system to deliver the information to the consumer. This is the fundamental twist. The information stays in the commit log for a limited time, given by the retention policy applied, and during this period it can be consumed multiple times by consumers. Because the system has a reduced set of responsibilities, it is much easier to scale. It is also really fast: a sequential read from disk is comparable to a read from random access memory, thanks to effective file system caching.
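The commit-log model can be sketched in a few lines of Python. This is a toy illustration of the idea, not Kafka's actual implementation: the log is an append-only list, and each consumer simply keeps its own offset into it.

```python
class CommitLog:
    """Toy append-only log: the broker stores messages, consumers track offsets."""
    def __init__(self):
        self.messages = []

    def append(self, message):
        self.messages.append(message)      # fast sequential write
        return len(self.messages) - 1      # offset of the newly written message

    def read(self, offset):
        return self.messages[offset:]      # consumer pulls from its own offset

log = CommitLog()
for m in ("m0", "m1", "m2"):
    log.append(m)

# Two independent consumers read at their own pace; the log is not
# responsible for "delivering" anything to either of them.
slow_offset, fast_offset = 0, 2
assert log.read(slow_offset) == ["m0", "m1", "m2"]
assert log.read(fast_offset) == ["m2"]
```

Note that reading never removes anything: the same message can be consumed many times until the retention policy expires it.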

[Figure: kafkaoffsets]

A topic partition is the basic unit of scalability when scaling out Kafka. A message in Kafka is a simple key-value pair, represented as byte arrays. When a producer sends a message to a Kafka topic, a client-side partitioner decides which topic partition the message is persisted to, based on the message key. It is a best practice that messages belonging to the same logical group are sent to the same partition, as that guarantees clear ordering. On the client side, the exact position of the client is maintained per topic partition for the assigned consumer group. Point-to-point communication is achieved by clients reading from a topic partition with exactly the same consumer group id, while publish-subscribe is achieved by using a distinct consumer group id for each client. The offset is maintained per consumer group id and topic partition, and it can be reset if needed.
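The key-to-partition mapping can be illustrated with a simple hash-based partitioner. This is a sketch: the real Kafka client partitioner uses a murmur2 hash, while here I use a stable hash from the standard library, but the principle is identical – the same key always lands in the same partition.

```python
import hashlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Deterministically map a message key to a partition number."""
    # Kafka's default partitioner uses murmur2; md5 here is only a
    # reproducible stand-in for the sketch.
    digest = int.from_bytes(hashlib.md5(key).digest()[:4], "big")
    return digest % num_partitions

# Messages with the same key always land in the same partition,
# which preserves their relative ordering for consumers.
p1 = partition_for(b"customer-42", 6)
p2 = partition_for(b"customer-42", 6)
assert p1 == p2
assert 0 <= p1 < 6
```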

[Figure: kafkacommunication]

Topic partitions can be replicated zero or n times and distributed across the Kafka cluster. Each topic partition has one leader and zero or n followers, depending on the replication factor. The leader maintains the set of so-called in-sync replicas (ISR): the replicas whose delay behind the partition leader is lower than replica.lag.time.max.ms. Apache ZooKeeper is used for keeping metadata and offsets.
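A simplified view of how a leader determines its ISR set from the lag threshold might look as follows. The lag numbers are made up for illustration; the real broker tracks how long each follower has been behind, not a single snapshot value.

```python
def in_sync_replicas(followers: dict, max_lag_ms: int) -> set:
    """Return the followers whose lag behind the leader is within the threshold."""
    return {name for name, lag_ms in followers.items() if lag_ms <= max_lag_ms}

# replica -> milliseconds behind the leader (illustrative numbers)
followers = {"replica-2": 120, "replica-3": 15_000}

# threshold plays the role of replica.lag.time.max.ms
isr = in_sync_replicas(followers, max_lag_ms=10_000)
assert isr == {"replica-2"}  # replica-3 fell too far behind and drops out
```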

[Figure: kafkacluster]

Kafka defines fault tolerance in the following terms:
  • acknowledged – the broker acknowledges the message write to the producer
  • committed – the message is written to all in-sync replicas and consumers can read it
When a producer sends messages to Kafka, it can require different levels of consistency:
  • 0 – the producer doesn't wait for any confirmation
  • 1 – wait for the acknowledgement from the leader
  • all – wait for the acknowledgement from all in-sync replicas, i.e. for the message commit
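The three consistency levels can be summarized in a small simulation of when the broker replies to the producer. This is a conceptual sketch of the semantics, not the wire protocol.

```python
def produce(acks, leader_ok=True, isr_acked=0, isr_size=2):
    """Return True when the producer considers the send successful."""
    if acks == "0":
        return True                       # fire and forget: no confirmation at all
    if acks == "1":
        return leader_ok                  # leader wrote it; followers may still lag
    if acks == "all":
        return leader_ok and isr_acked == isr_size  # committed: every ISR has it
    raise ValueError(f"unknown acks level: {acks}")

assert produce("0", leader_ok=False)                # "succeeds" even on failure
assert produce("1", leader_ok=True, isr_acked=0)    # leader alone is enough
assert not produce("all", isr_acked=1, isr_size=2)  # waits for the whole ISR
assert produce("all", isr_acked=2, isr_size=2)
```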

Apache Kafka is quite flexible in its configuration, and as such it can meet many different requirements in terms of throughput, consistency and scalability. Replication of a topic partition brings read scalability on the consumer side, but it also poses some risk, as it adds a level of complexity. If you are unaware of the corner cases, they may lead to nasty surprises, especially for newcomers. So let's take a closer look at the following scenario.

We have a topic partition with three replicas. The producer requires the highest consistency level, acks = all. Replica 1 is currently the leader. Message 10 is committed, hence available to clients. Message 11 is neither acknowledged nor committed, due to the failure of Replica 3. Replica 3 is then eliminated from the ISR (or goes offline), which causes message 11 to become acknowledged and committed.

[Figure: kafka_uc1]

Next we lose Replica 2: it is eliminated from the ISR, and the same situation repeats for messages 12 and 13.
[Figure: kafka_uc2]
The situation can still get a lot worse if the cluster loses the current partition leader: Replica 1 is down now.
[Figure: kafka_uc3]
What happens if Replica 2 or Replica 3 comes back online before Replica 1? One of them becomes the new partition leader, and messages 12 and 13 are lost for sure!
[Figure: kafka_uc4]
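The whole failure sequence can be replayed as a small simulation. This is a deliberately simplified model: each replica is just a list of message ids, and an "unclean" election promotes whichever stale replica comes back first.

```python
# Each replica's local copy of the partition log (message ids).
replicas = {
    "r1": [10, 11, 12, 13],  # leader: accepted 12 and 13 after the ISR shrank to itself
    "r2": [10, 11],          # failed before 12 and 13 were replicated
    "r3": [10],              # failed before 11 was replicated
}

# r1 dies; r2 comes back first and is elected leader (unclean election).
new_leader = "r2"
surviving = set(replicas[new_leader])

# Everything the old leader had beyond the new leader's copy is gone.
lost = set(replicas["r1"]) - surviving
assert lost == {12, 13}
```

This is exactly why the acknowledgement the producer received for messages 12 and 13 was worthless: it came from an ISR that had shrunk to the leader alone.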

Is that a problem? Well, the correct answer is: it depends. There are scenarios where this behavior is perfectly fine, for example collecting logs from all machines by sending them through Kafka. On the other hand, if we implement event sourcing and we just lost some events, we cannot recreate the application state correctly. Yes, we have a problem! Unfortunately, unless that has changed in the latest releases, this is the default configuration of a freshly installed Kafka cluster. It is a setup which favors availability and throughput over other factors. But Kafka also allows you to configure it to meet your consistency requirements, sacrificing some availability in order to achieve that (CAP theorem). To avoid the described scenario, use the following configuration:
  • the producer should require acknowledgement level all
  • do not allow Kafka to elect an out-of-sync replica as the new leader: unclean.leader.election.enable = false
  • use a replication factor of three: default.replication.factor = 3
  • require more than one replica to be in sync: min.insync.replicas = 2
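Put together, the durability-oriented settings might look like this. They are shown as plain dictionaries of broker and producer properties; the keys are real Kafka configuration names, and the values follow the recommendations above.

```python
broker_config = {
    "unclean.leader.election.enable": "false",  # never promote an out-of-sync replica
    "default.replication.factor": "3",          # three copies of every partition
    "min.insync.replicas": "2",                 # a write needs at least two replicas
}
producer_config = {
    "acks": "all",  # wait until the message is committed to the whole ISR
}

# With these settings, one broker can fail without the cluster
# accepting writes that could later be silently lost.
assert int(broker_config["min.insync.replicas"]) < int(broker_config["default.replication.factor"])
```

The trade-off: if the number of in-sync replicas drops below min.insync.replicas, producers requiring acks = all will get errors instead of risking data loss – that is the availability you pay for consistency.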

We already quickly touched on message delivery to the consumer. Kafka doesn't guarantee that a message was delivered to all consumers; it is the responsibility of the consumers to read messages. So there are no persistent-message semantics as known from traditional messaging systems. All messages sent to Kafka are persisted, meaning they are available for consumption by clients according to the retention policy. The retention policy essentially specifies how long a message will be available in Kafka. Currently there are two basic limits: the space used for keeping messages, and the time for which a message should at least be available. The one which gets violated first wins.

When Kafka needs to clean data (triggered by the retention policy), there are two options. The simplest one is to just delete the messages. The other is to compact them: compaction is a process which keeps, for each message key, just one message – usually the latest one. That is actually a second semantics of the key used in a message.
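Compaction can be sketched as "keep only the last value per key". This is a toy version of what the real background log cleaner does.

```python
def compact(log):
    """Keep only the latest value for each key (toy log compaction)."""
    latest = {}
    for key, value in log:
        latest[key] = value  # later writes overwrite earlier ones
    return list(latest.items())

# Three writes, two keys: the old value for "user-1" disappears.
log = [("user-1", "a"), ("user-2", "b"), ("user-1", "c")]
assert compact(log) == [("user-1", "c"), ("user-2", "b")]
```

This is why a compacted topic works well as a changelog: replaying it yields the latest state per key, not the full history.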

What features can you not find in Apache Kafka compared to traditional messaging technologies? Probably the most significant is the absence of any selector in combination with listening (wake me on receive). It can certainly be emulated via a correlation id, but the efficiency is on a completely different level: you have to read all messages, deserialize them, and filter, whereas a traditional selector uses a custom field in the message header, so you don't even need to deserialize the message payload. Monitoring Kafka in a production environment essentially concerns one elementary question: are the consumers fast enough? Hence, monitor consumer offsets with respect to the retention policy.
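The "are the consumers fast enough?" question boils down to consumer lag: the distance between the log end offset and the consumer's committed offset. The arithmetic is trivial; in a real deployment you would read these numbers from Kafka's metrics or the consumer-groups tooling rather than compute them yourself.

```python
def consumer_lag(log_end_offset: int, committed_offset: int) -> int:
    """Messages written but not yet consumed; should stay small and stable."""
    return log_end_offset - committed_offset

# If lag keeps growing toward the retention window,
# messages may expire before they are ever read.
assert consumer_lag(log_end_offset=1_500, committed_offset=1_480) == 20
assert consumer_lag(log_end_offset=1_500, committed_offset=1_500) == 0
```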

Kafka was created at LinkedIn to solve a specific problem of modern data-driven applications: to fill the gap in traditional ETL processes, which usually work with flat files and DB dumps. It is essentially an enterprise service bus for data, for software components that need to exchange data heavily. It unifies and decouples data exchange among components. Typical uses are in "big data" pipelines together with Hadoop and Spark, in lambda or kappa architectures. It lays down the foundations of modern data stream processing.

This post just scratches the surface of the basic concepts in Apache Kafka. If you are interested in details, I really suggest reading the following sources, which I found quite useful on my way when learning Kafka:


BPMS in production environment

I couldn't find a better topic than "BPMS (ActiveVOS 6.1) in production" to close up the whole series.
Although ActiveVOS is certainly a cool product, there is, as usual, space for future improvements. A production environment is something special, and as something special it should be treated. If the production environment is down, there is simply no business. Empowering the business is the main objective of a BPMS, isn't it? So the technology should be ready to cope with that kind of situation. To cut a long story short: every feature which supports maintainability, reliability, security and sustainability in day-to-day operations is highly appreciated.
During the development life cycle it can be hard (especially in the early stages of development) to foresee how the system will be maintained, what the standard procedures will look like, etc. The goal is to keep the probability of a process, human, or technical error as low as possible, while also taking the ease of problem detection into consideration.
The following pieces of functionality were found to be highly desirable. Some of the issues can be avoided, or their impact at least lowered, at design time; for the rest, some development effort needs to be taken into consideration.
  • Different modes of the Console – there are no distinct modes for development and production environments. A production mode would come in handy when you need to grant operations access for their day-to-day routine without letting them modify all server settings; for example, you may want to restrict permissions to deploying new processes and starting and stopping services.
  • Reliable failover – maybe this question lies more on the infrastructure side. As a BPMS lives fully in a DB, a typical solution consists of cloning the production DB to a backup DB instance, which is started in case of a failure. But if some kind of inconsistency gets into the DB during the crash of the main instance, it is immediately replicated to the backup instance. Does it then make sense to start the backup instance?
  • Lack of data archiving procedures – the solution itself doesn't offer any procedure for archiving completed processes. Because of legal restrictions specific to the business domain you are working in, you cannot simply delete completed processes. As your DB grows in size, the response time of the BPMS grows as well, and you can easily get into trouble with time-out policies. Data growth of 200 GB per month is feasible. You cannot simply work this problem out using advanced features of the underlying DB, such as partitioning, because you want processes which logically belong together to end up in one archive. You will struggle to find a partitioning criterion which can be used in practice and fulfills that requirement.
  • Process upgrade – one of the killer features, migration of already running processes to an upgraded version, works only for small changes to the process. Moreover, what if your process consumes an external WS which lives completely on its own? What if someone enhances that service and modifies its interface? Yes, versioning of interfaces comes to attention. Having a process upgrade feature without versioned interfaces is almost nonsense, or at least needs special attention during releases. Even with versioned interfaces it is not applicable in all situations, e.g. sending a new data field whose presence in the system is not guaranteed. In large companies this feature is a must; otherwise it is hard to manage and coordinate the releases of all connected applications.
  • Consider the product road map – actually, this item belongs to the project planning phase, where decisions are made about which technology to use. In some environments, like banking or insurance, there can be legal requirements that all products in the production environment be supported by the vendor. If the vendor's release strategy is a new major version every half a year, and the support scope is the current major version plus two major versions back, this can pose a problem for the product maintenance team during the product life cycle. Migration of all non-terminated processes may not be a trivial thing, and as such it represents an extra cost.