mridul's blog

WTFlink: Stream Processing Fundamentals

Author: @MridulDhiman11

What is Stream Processing?

Typically, an application stores all of its information in a database, which lives on disk and is retrieved at some later point in time. Using stream processing, we can instead process events in real time; this is also called real-time event processing.

Here, we have 2 systems: a producer and a consumer. The producer produces an event, and the consumer consumes that event. The producer can send the event to the consumer as soon as it is generated, and the consumer can consume it in real time. Now assume a scenario where we have multiple producers and consumers with M:N cardinality. In that scenario, every producer talks to every consumer, so we need to maintain roughly O(M×N) active connections (O(n^2) if both sides are of size n), which puts unnecessary load on the producer and consumer systems.

Here, the message broker comes into the picture. It acts as an intermediary between producers and consumers: it forwards all events from the producers, and the consumers process those events from the message broker. Additionally, it handles and maintains all the connections, so producers and consumers only have to deal with sending and receiving events. Introducing this third system reduces the number of connections from O(M×N) to O(M+N); for example, 10 producers and 10 consumers would need 100 direct connections, but only 20 connections through a broker.

Typical examples of message brokers are Kafka, RabbitMQ, AWS SQS etc., which we will discuss in detail.

Use cases of Message Broker

There are 3 main use cases of a message broker:

  1. Timing Windows
  2. Change Data Capture
  3. Event Sourcing

Timing Windows

Usage: real-time log processing or sensor data.

Let's introduce some new jargon here: Tumbling, Hopping and Sliding Windows.

Tumbling Window: It is basically the smallest timing window of a particular fixed time interval. Let's say the interval is 1 minute; then a new tumbling window is created every minute, and every event arriving during that interval is saved inside that window. Tumbling windows are fixed-size and do not overlap.
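
To make this concrete, here is a minimal sketch in plain Python (not any particular stream-processing framework's API) of bucketing events into 1-minute tumbling windows by their timestamps; the timestamps and payloads are made up for illustration.

```python
from collections import defaultdict

WINDOW_SIZE = 60  # 1-minute tumbling windows, in seconds

# window start time -> events that arrived during that window
tumbling_windows = defaultdict(list)

def assign_to_tumbling_window(event_timestamp, event):
    """Bucket an event into the fixed, non-overlapping window it belongs to."""
    window_start = (event_timestamp // WINDOW_SIZE) * WINDOW_SIZE
    tumbling_windows[window_start].append(event)

# example events: (epoch seconds, payload) -- small numbers for readability
assign_to_tumbling_window(5, {"user": "a", "action": "click"})
assign_to_tumbling_window(30, {"user": "b", "action": "view"})
assign_to_tumbling_window(65, {"user": "a", "action": "click"})  # falls into the next window

for start, events in sorted(tumbling_windows.items()):
    print(f"window starting at {start}s has {len(events)} event(s)")
```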

Hopping Window: It is a window over a range of time, e.g. 12:00 to 12:05 implies a hopping window of size 5 minutes. You can also think of it as being made up of 5 tumbling windows, so those tumbling windows can be unioned to combine all the events they contain and give an aggregated result. E.g., we can get all the events between a starting and an ending time slot using hopping windows.
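
A hopping window can then be computed by unioning consecutive buckets. The sketch below is again plain Python; the `buckets` dict stands in for the tumbling windows built in the previous sketch.

```python
def hopping_window(tumbling_buckets, start, size_in_windows, window_size=60):
    """Union `size_in_windows` consecutive tumbling windows starting at `start`."""
    events = []
    for i in range(size_in_windows):
        events.extend(tumbling_buckets.get(start + i * window_size, []))
    return events

# 1-minute buckets keyed by window start time (as built by the tumbling sketch above)
buckets = {0: ["e1", "e2"], 60: ["e3"], 120: [], 180: ["e4"], 240: ["e5", "e6"]}

# "12:00 to 12:05" == the union of five consecutive 1-minute buckets
print(hopping_window(buckets, start=0, size_in_windows=5))  # ['e1', 'e2', 'e3', 'e4', 'e5', 'e6']
```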

Sliding Windows: Let's say we only want to see the latest 10 logs; how can we do that? It can be done via sliding windows. They can be implemented either as a linked list or as a queue. If a new event gets enqueued and the size of the queue becomes greater than 10, the front element is popped. Similarly, with a linked list we can maintain head and tail pointers to perform both operations in O(1) constant time.
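
The "latest 10 logs" case maps naturally onto a bounded double-ended queue. Here is a minimal sketch using Python's collections.deque, which drops the oldest entry automatically once maxlen is reached, i.e. exactly the pop-from-the-front behaviour described above.

```python
from collections import deque

# keep only the 10 most recent log lines; older ones fall off the front
latest_logs = deque(maxlen=10)

def on_log_event(line):
    """Append in O(1); once the deque is full, the oldest line is evicted."""
    latest_logs.append(line)

for i in range(25):
    on_log_event(f"log line {i}")

print(list(latest_logs))  # log lines 15..24
```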

Change Data Capture

Let's understand it with an example. Consider our application storing instances of its state in multiple data stores: a database, a cache (e.g., Redis), and a search index (e.g., Elasticsearch). We treat the database as the active data store, the single source of truth of the application, so it always holds the latest state. We therefore need some mechanism to perform the corresponding writes to the passive data stores, whose data is derived from the database. Stream processing can help with that: whenever there are changes in the database, those changes are captured and sent as events through the message broker to the desired consumer, which could be the search index or the cache. For example, if a new post is added to the database, we should be able to search it via the search index (which is actually an inverted index; I will write about this in the future, so stay tuned 😊).
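
As a rough illustration of the idea, a consumer of database change events might look like the sketch below. Everything here is hypothetical (the event shape, the `search_index` and `cache` clients, the topic name); it is not the API of Debezium, Kafka Connect, or any specific CDC tool.

```python
def apply_change(event, search_index, cache):
    """Propagate one database change event to the derived data stores."""
    if event["op"] in ("insert", "update"):
        # make the new/updated post searchable and keep the cache warm
        search_index.index(doc_id=event["id"], document=event["row"])
        cache.set(event["id"], event["row"])
    elif event["op"] == "delete":
        search_index.delete(doc_id=event["id"])
        cache.delete(event["id"])

# `consume_changes`, `search_index` and `cache` are placeholders for whatever
# broker client, search client and cache client the application actually uses:
# for event in consume_changes(topic="db.posts.changes"):
#     apply_change(event, search_index, cache)
```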

Event Sourcing

The problem with Change Data Capture is that the events generated by the database are database-specific. So if, in the future, we want to migrate to a different database, the event format will also change, which could lead to problems. What we need are database-agnostic events. Here, rather than writing to the database first and then updating the derived data stores with the database's latest value in real time, we write to the message broker first and then propagate those changes in the form of database-agnostic events. The only problem is that we cannot guarantee the durability of the events; they may get lost, depending on the implementation of the message broker.
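
To illustrate what a database-agnostic event might look like, here is a small sketch under assumed names (the event fields, topic name, and `broker` client are placeholders, not a specific framework's API): the application publishes domain events to the broker first, and every store, including the database, derives its state from them.

```python
import json
import time
import uuid

def make_event(event_type, payload):
    """Build a database-agnostic domain event; nothing here is tied to any DB's change format."""
    return {
        "event_id": str(uuid.uuid4()),
        "type": event_type,          # e.g. "PostCreated", "PostDeleted"
        "occurred_at": time.time(),
        "payload": payload,
    }

def publish(broker, topic, event):
    """Write the event to the broker first; the database and derived stores consume it later."""
    broker.send(topic, json.dumps(event).encode("utf-8"))

# event = make_event("PostCreated", {"post_id": 42, "title": "hello"})
# publish(broker, "posts.events", event)   # `broker` is a placeholder client
```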

Message Processing in a Message Broker

  1. at least once.
  2. exactly once.

At least once

At-least-once delivery relies on fault tolerance and some sort of disk persistence using a write-ahead log: the broker keeps redelivering an event until the consumer acknowledges it, so no event is lost, but an event may be processed more than once.

Exactly once

There are 2 ways to implement exactly-once processing of messages from the message broker.

  1. 2-phase commit: Here, we need a coordinator service which continuously polls the message broker and the consumer to check whether the broker is ready to delete the event and whether the consumer is ready to consume it. Once both systems are ready, it initiates a commit on both sides, and the event gets processed by the consumer with a guarantee that it will not be processed again in the future. The only con is that it is very slow.
  2. Idempotency: it states that an event will only be processed once, and after that, duplicates will be ignored. The consumer can maintain an in-memory mapping or use Redis to check whether a particular event ID has already been processed. But if we have more than one consumer, we need to ensure the same event is always handled by the same consumer; otherwise it could be processed more than once (see the sketch right after this list).
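
Here is a minimal sketch of that idempotency check, using an in-memory set of processed event IDs. In production this set would typically live in Redis or another shared store so that it survives restarts and is visible to every consumer instance; `do_work` is just a placeholder for the real business logic.

```python
processed_ids = set()  # in production: a Redis set or similar shared store

def do_work(event):
    print("processing", event["event_id"])  # placeholder for the actual business logic

def handle_event(event):
    """Process each event at most once by remembering which IDs we have already seen."""
    event_id = event["event_id"]
    if event_id in processed_ids:
        return  # duplicate delivery: ignore
    do_work(event)
    processed_ids.add(event_id)

# duplicate deliveries of the same event get processed only once
handle_event({"event_id": "e1"})
handle_event({"event_id": "e1"})
```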

Types of Message Brokers:

Now that we have discussed message brokers and their usage in detail, let's discuss the different types of message brokers and how they differ in their implementations:

  1. In-memory Message Broker: In this type of message broker, all events are stored in memory and asynchronously flushed to a write-ahead log (i.e. on disk, to ensure some durability). Generally, we have a singly-linked-list-based implementation, where the consumer processes the head event from the list; that event is temporarily removed so that other events can also be processed, and once the consumer sends an acknowledgement that it has successfully processed the event, the event is fully deleted from memory (see the toy sketch after the examples below).

Advantages:

  1. As events are stored in memory, reads and writes become very fast, giving very high throughput.

Disadvantages:

  1. Events can get lost, as they are stored in memory. We can still get some persistence by flushing those events to disk asynchronously (to the write-ahead log).
  2. We cannot replay events if one of the consumers dies, as they are deleted after the consumer's successful acknowledgement.
  3. Event delivery can be out of order.

Examples: SQS, RabbitMQ, AmazonMQ.
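
To make the acknowledge-then-delete behaviour concrete, here is a toy sketch of an in-memory broker. It is purely illustrative and not how SQS or RabbitMQ are actually implemented: events sit in an in-memory queue, are handed to a consumer, and are only dropped for good once the consumer acknowledges them.

```python
from collections import deque

class InMemoryBroker:
    """Toy ack-based in-memory queue: an event is gone for good once acknowledged."""

    def __init__(self):
        self.queue = deque()   # events waiting to be delivered
        self.in_flight = {}    # delivered but not yet acknowledged

    def publish(self, event_id, payload):
        self.queue.append((event_id, payload))

    def poll(self):
        """Hand the head event to a consumer and park it until it is acked."""
        if not self.queue:
            return None
        event_id, payload = self.queue.popleft()
        self.in_flight[event_id] = payload
        return event_id, payload

    def ack(self, event_id):
        """Consumer confirmed processing: delete the event permanently."""
        self.in_flight.pop(event_id, None)

    def nack(self, event_id):
        """Consumer failed: put the event back so it can be redelivered."""
        if event_id in self.in_flight:
            self.queue.append((event_id, self.in_flight.pop(event_id)))

broker = InMemoryBroker()
broker.publish("e1", {"task": "send notification"})
event_id, payload = broker.poll()
broker.ack(event_id)   # once acked, "e1" can never be replayed
```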

  2. Log-based Message Broker: as we discussed earlier, with in-memory message brokers the events can get lost, and that is something we might not want in certain scenarios. Plus, if one of the consumers dies, we may also want all the events to be replayed. For these purposes, we have log-based message brokers. All events are written sequentially to disk, and a checkpoint is maintained for each consumer, i.e. up to which event in the log it has processed, so it can resume processing from there (see the sketch after the examples below).

Advantages:

  1. Events are persistent.
  2. Events can be replayed.
  3. Events are processed in order, as they are written sequentially to the disk.

Disadvantages:

  1. Comparatively slower, and thus lower throughput.

Examples: Kafka, AWS Kinesis
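
By contrast, a log-based broker can be pictured as an append-only list plus a per-consumer offset (the checkpoint). The sketch below is a simplification of the model Kafka and Kinesis expose, not their actual implementation.

```python
class LogBasedBroker:
    """Toy append-only log with per-consumer offsets (checkpoints)."""

    def __init__(self):
        self.log = []        # events are appended and never deleted here
        self.offsets = {}    # consumer name -> index of the next event to read

    def publish(self, event):
        self.log.append(event)

    def poll(self, consumer):
        """Return the next unread event for this consumer, if any."""
        offset = self.offsets.get(consumer, 0)
        return self.log[offset] if offset < len(self.log) else None

    def commit(self, consumer):
        """Advance the consumer's checkpoint after it has processed the event."""
        self.offsets[consumer] = self.offsets.get(consumer, 0) + 1

    def replay_from(self, consumer, offset=0):
        """Rewind a consumer (e.g. after a crash) so it re-processes old events."""
        self.offsets[consumer] = offset

broker = LogBasedBroker()
for e in ["create post 1", "update post 1", "delete post 1"]:
    broker.publish(e)

while (event := broker.poll("search-index")) is not None:
    print("search-index applies:", event)   # events arrive in write order
    broker.commit("search-index")
```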

So, the conclusion is that in-memory message brokers can be used when we want high-throughput delivery of events and the order of events does not matter, such as sending notifications to clients or transcoding videos in the background. In those cases, the order in which messages are processed does not matter. In contrast, log-based message brokers should be used in cases like Change Data Capture. In Change Data Capture, we are propagating our writes to all the derived data stores like the cache, search index, etc., and we want the writes applied in the same order they were written. E.g., first we create a new post and then delete it; we can't have the delete event arrive before the post is created. That's why the order of events plays an essential role. Additionally, all the writes from the DB need to be persisted on disk, which is only possible with log-based message brokers.

Popular stream processing frameworks include:

  1. Apache Kafka Streams
  2. Apache Flink
  3. Apache Spark Streaming
  4. Apache Storm
  5. Apache Samza
  6. Google Cloud Dataflow
  7. Amazon Kinesis Data Analytics

Comparison and Use Cases

Each framework has its strengths and is suited to different use cases. The choice often depends on factors such as existing infrastructure, required processing guarantees, latency requirements, and team expertise.