Intro
Batch processing assumes that the input is bounded, i.e. of a known and finite size, so the batch process knows when it has finished reading its input. In reality, a lot of data is unbounded because it arrives gradually over time, so there is never a point at which the input is complete. Batch processors must therefore artificially divide the data into chunks of fixed duration (for example, one day of data at a time). A stream processor instead handles the data incrementally as it becomes available.
Transmitting Event Streams
In batch processing, the inputs and outputs are files (chunks of data). In stream processing, the inputs are events.
An event is a small, self-contained, immutable object containing the details of something that happened at some point in time. An event usually contains a timestamp indicating when it happened according to a time-of-day clock.
In batch processing, a file is written once and then potentially read by multiple jobs, and a filename identifies a set of related records.
In streaming terminology, an event is generated once by a producer (also known as a publisher or sender), and then potentially processed by multiple consumers (subscribers or recipients). And related events are grouped together into a topic.
Messaging Systems
The most significant change in a stream processing system is that consumers are notified when new events appear, rather than polling for them.
A common approach for notifying consumers about new events is to use a messaging system: a producer sends a message containing the event, which is then pushed to consumers over a communication channel such as a TCP connection. However, a TCP connection links exactly one sender with one recipient, while stream processing needs one-to-many or many-to-many delivery. This is called the publish/subscribe model.
Problems
- What happens if the producers send messages faster than the consumers can process them?
  - drop messages
  - buffer messages in a queue → does the system crash if the queue no longer fits in memory, or does it write messages to disk?
  - backpressure (flow control)
- What happens if nodes crash or temporarily go offline: are any messages lost?
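The difference between dropping and backpressure can be sketched with a bounded queue from Python's standard library. This is only an illustration of the two strategies, not how any particular messaging system works; the helper names `send_or_drop`, `send_with_backpressure`, and `demo` are made up for this example.

```python
import queue
import threading


def send_or_drop(q, message):
    """Dropping strategy: discard the message if the buffer is full."""
    try:
        q.put_nowait(message)
        return True
    except queue.Full:
        return False


def send_with_backpressure(q, message):
    """Backpressure: block the producer until the consumer frees a slot."""
    q.put(message)  # blocks while the queue is full


def demo():
    q = queue.Queue(maxsize=2)  # bounded buffer with room for two messages

    # The third message does not fit and is dropped.
    accepted = [send_or_drop(q, m) for m in ("m1", "m2", "m3")]

    received = []

    def consumer():
        for _ in range(3):
            received.append(q.get())

    worker = threading.Thread(target=consumer)
    worker.start()
    # The queue may still be full here; put() waits until the consumer
    # has taken a message, so nothing is lost.
    send_with_backpressure(q, "m4")
    worker.join()
    return accepted, received
```

With dropping, the producer never waits but `m3` is lost; with backpressure, the producer is slowed down to the consumer's pace and `m4` is delivered.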
A nice property of batch processing systems is that they provide a strong reliability guarantee: failed tasks are automatically retried, and partial output from failed tasks is automatically discarded. This means the output is the same as if no failures had occurred, which helps simplify the programming model.
Direct messaging from producers to consumers
A number of messaging systems use direct network communication between producers and consumers without going via intermediary nodes. It’s pretty simple and straightforward.
However, these methods generally require the application code to be aware of the possibility of message loss. And the faults they can tolerate are quite limited: even if the protocols detect and retransmit packets that are lost in the network, they generally assume that producers and consumers are constantly online.
Message brokers / Message queue
A message broker (also known as a message queue) runs as a server, with producers and consumers connecting to it as clients. Producers write messages to the broker, and consumers receive them by reading them from the broker.
By centralizing the data in the broker, these systems can more easily tolerate clients that come and go (connect, disconnect, and crash), and the question of durability is moved to the broker instead (the broker can be configured for how much message loss to tolerate, so application code no longer has to deal with it directly).
Asynchronous → when a producer sends a message, it normally only waits for the broker to confirm that it has buffered the message and does not wait for the message to be processed by consumers. The delivery to consumers will happen at some undetermined future point in time.
Compared to databases
- Databases usually keep data until it is explicitly deleted, whereas most message brokers automatically delete a message when it has been successfully delivered to its consumers.
- Most message brokers assume that their working set is fairly small—i.e., the queues are short.
- Databases often support secondary indexes and various ways of searching for data, while message brokers often support some way of subscribing to a subset of topics matching some pattern.
- When querying a database, the result is typically based on a point-in-time snapshot of the data. By contrast, message brokers do not support arbitrary queries, but they do notify clients when data changes.
Multiple consumers

- Load balancing → each message is delivered to one of the consumers, so the consumers share the work of processing the messages in the topic
- Fan-out → each message is delivered to all of the consumers
And these two patterns can be combined: for example, two separate groups of consumers may each subscribe to a topic, such that each group collectively receives all messages, but within each group only one of the nodes receives each message.
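The combination of the two patterns can be sketched with a toy in-memory broker. The `ToyBroker` class and its round-robin assignment are assumptions made for illustration; real brokers choose the receiving consumer in their own ways.

```python
from collections import defaultdict
from itertools import cycle


class ToyBroker:
    """In-memory topic with consumer groups (hypothetical, for illustration)."""

    def __init__(self):
        self.groups = {}                # group name -> round-robin iterator
        self.inbox = defaultdict(list)  # consumer name -> received messages

    def subscribe(self, group, consumers):
        self.groups[group] = cycle(consumers)

    def publish(self, message):
        # Fan-out: every group sees the message.
        for members in self.groups.values():
            # Load balancing: only one member of the group receives it.
            self.inbox[next(members)].append(message)


broker = ToyBroker()
broker.subscribe("indexers", ["indexer1", "indexer2"])
broker.subscribe("auditors", ["auditor1"])
for m in ("m1", "m2", "m3", "m4"):
    broker.publish(m)
```

Here the two indexers split the messages between them, while the single auditor receives all four.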
Acknowledgments and redelivery
Consumers may crash at any time, so it could happen that a broker delivers a message to a consumer but the consumer never processes it, or only partially processes it before crashing. In order to ensure that the message is not lost, message brokers use acknowledgments: a client must explicitly tell the broker when it has finished processing a message so that the broker can remove it from the queue.

Reordering becomes a problem when load balancing is combined with redelivery. For example, if consumer 2 crashes while processing m3, the broker redelivers m3 to consumer 1, which has meanwhile processed m4. The final processing order becomes [m2, m4, m3, m5] rather than [m2, m3, m4, m5].
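The reordering example can be reproduced with a small simulation. It is a simplified model under stated assumptions: two consumers receive messages alternately, and the hypothetical function `process_with_redelivery` redelivers an unacknowledged message behind the one that is already in flight.

```python
from collections import deque


def process_with_redelivery(messages, crash_on):
    """Return the order in which messages end up acknowledged.

    Consumer 2 crashes while handling `crash_on`, so that message is never
    acknowledged and the broker redelivers it to the remaining consumer.
    """
    pending = deque(messages)
    consumers = deque(["consumer1", "consumer2"])
    acked = []
    while pending:
        message = pending.popleft()
        consumer = consumers[0]
        consumers.rotate(-1)  # next message goes to the other consumer
        if consumer == "consumer2" and message == crash_on:
            # No acknowledgment arrives: remove the dead consumer and
            # requeue the message behind the one already in flight.
            consumers.remove("consumer2")
            pending.insert(1, message)
            continue
        acked.append(message)
    return acked


order = process_with_redelivery(["m2", "m3", "m4", "m5"], crash_on="m3")
# order == ["m2", "m4", "m3", "m5"]
```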
Log-based Messaging
Difference between databases and traditional brokers
Everything that is written to a database is normally expected to be recorded durably, typically in a log first (WAL), and kept until it is explicitly deleted. Message brokers, by contrast, even those that durably write messages to disk, quickly delete them again after they have been delivered to consumers, because they are built around a transient messaging mindset.
If a new consumer is added to a messaging system, it can only read messages that were sent after it was registered, while in a database a new client can retrieve historical records.
This difference has a big impact on how derived data is created. A key feature of batch processes is that you can run them repeatedly. This is not the case with messaging: receiving a message is destructive if the acknowledgment causes it to be deleted from the broker, so you cannot run the same consumer again and expect to get the same result.
Solution
A log-based message broker is a hybrid, combining the durable storage approach of databases with the low-latency notification facilities of messaging. Because the log is durable and retains historical data, we can also run batch-style operations over it, for example, reading all messages from yesterday.
A log is simply an append-only sequence of records on disk.
A producer sends a message by appending it to the end of the log, and a consumer receives messages by reading the log sequentially. If a consumer reaches the end of the log, it waits for a notification that a new message has been appended.
For scalability, logs can be partitioned. A topic can then be defined as a group of partitions that all carry messages of the same type.

Within each partition, the broker assigns a monotonically increasing sequence number, or offset, to every message. Such a sequence number makes sense because a partition is append-only, so the messages within a partition are totally ordered. There is no ordering guarantee across different partitions.
Since partitioned logs typically preserve message ordering only within a single partition, all messages that need to be ordered consistently need to be routed to the same partition.
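A minimal sketch of a partitioned log, assuming messages are routed by hashing a key so that all messages with the same key land in the same partition and therefore keep their relative order. The `PartitionedLog` class is hypothetical, and the choice of `zlib.crc32` as the hash is just an assumption for the example.

```python
import zlib


class PartitionedLog:
    """Toy topic made of append-only partitions (illustration only)."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, key, value):
        """Append to the partition chosen by key; return (partition, offset)."""
        # crc32 is used instead of hash() because it is stable across runs.
        p = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        self.partitions[p].append((key, value))
        # The offset is simply the position within the partition.
        return p, len(self.partitions[p]) - 1

    def read(self, partition, offset):
        """Return all records in `partition` starting at `offset`."""
        return self.partitions[partition][offset:]


log = PartitionedLog(num_partitions=4)
first = log.append("user-42", "login")
second = log.append("user-42", "add-to-cart")
third = log.append("user-42", "checkout")
```

All three events for `user-42` end up in one partition with consecutive offsets, so a consumer of that partition sees them in the order they were written.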
Consumer offsets
Consuming a partition sequentially makes it easy to tell which messages have been processed: all messages with an offset less than a consumer’s current offset have already been processed, and all messages with a greater offset have not yet been seen. Thus, the broker doesn’t need to track an acknowledgment for every single message; it only needs to record the consumer offsets periodically.
For example, if the broker stores messages #1 to #10 and has recorded an acknowledgment for #8, it knows that #1 to #8 have already been consumed and only needs to deliver messages with an offset greater than 8. Unlike a traditional broker, it does not delete messages as soon as they are acknowledged; they stay in the log until their segment is removed.
If a consumer fails, another node can be assigned to consume its partition, starting at the last recorded offset. When the failed consumer recovers, it likewise resumes from the recorded offset.
This is similar to the log sequence number in single-leader replication, which allows a follower to reconnect to a leader after it has become disconnected, and resume replication without skipping any writes.
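Resuming from a recorded offset can be sketched as follows. The partition is modelled as a plain list, and `committed_offsets` is a hypothetical dictionary standing in for wherever the broker stores consumer offsets.

```python
def consume(partition, committed_offsets, name, handle, limit=None):
    """Process records starting at the committed offset for `name`.

    `limit` caps how many records are handled, which lets the example
    simulate a consumer that stops partway through.
    """
    start = committed_offsets.get(name, 0)
    end = len(partition)
    if limit is not None:
        end = min(end, start + limit)
    for offset in range(start, end):
        handle(partition[offset])
        # Record the next offset to read only after the record is handled.
        committed_offsets[name] = offset + 1
    return committed_offsets.get(name, 0)


partition = [f"m{i}" for i in range(10)]
offsets = {}

seen_by_a = []
consume(partition, offsets, "partition-0", seen_by_a.append, limit=4)
# Consumer A fails here; consumer B takes over the same partition.
seen_by_b = []
consume(partition, offsets, "partition-0", seen_by_b.append)
```

Because the offset is recorded after handling, a crash between the two steps would cause that one record to be processed again after restart; it would not be skipped.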
When consumers cannot keep up with producers
The log is actually divided into segments, and from time to time old segments are deleted or moved to archive storage. In effect, the log is a bounded-size buffer that discards old messages when it gets full, also known as a circular buffer or ring buffer. If a slow consumer cannot keep up with the rate of messages, and it falls so far behind that its consumer offset points to a deleted segment, it will miss some of the messages.
You can monitor how far a consumer is behind the head of the log, and raise an alert so that a human operator can fix the slow consumer before it starts missing messages.
The good news is that a slow consumer only affects itself; it doesn’t disrupt other consumers. When a consumer is shut down or crashes, it stops consuming resources, and the only thing that remains is its consumer offset.
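Monitoring consumer lag can be sketched as a comparison of three offsets. The function name, the status labels, and the threshold are assumptions for this example rather than part of any real broker's API.

```python
def check_consumer(log_start_offset, log_end_offset, consumer_offset, alert_threshold):
    """Classify a consumer as 'ok', 'lagging', or 'missed'.

    log_start_offset: oldest offset still retained in the log
    log_end_offset:   offset that the next appended message will get
    consumer_offset:  next offset the consumer wants to read
    """
    if consumer_offset < log_start_offset:
        # The segment holding this offset has already been deleted.
        return "missed"
    lag = log_end_offset - consumer_offset
    if lag > alert_threshold:
        return "lagging"  # time to alert a human operator
    return "ok"
```

A consumer reported as "lagging" still has time to catch up; one reported as "missed" has already lost messages.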
Databases and Streams
Log-based message brokers borrow ideas from databases and apply them to streaming. Conversely, databases can borrow ideas from stream systems.
Keeping Systems in Sync
If periodic full database dumps are too slow (for example, dumping all the data to rebuild a search index or cache), an alternative that is sometimes used is dual writes, in which the application code explicitly writes to each of the systems when data changes.
db.run_sql("SQL COMMAND")
Problems @ Dual Writes
- Concurrency → in the database, X is first set to A and then to B, while at the search index the writes arrive in the opposite order.

- Fault-tolerance → the write to the database succeeds, but the write to the search index or cache fails, leaving the systems inconsistent with each other.
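The concurrency problem can be made concrete by replaying one unlucky interleaving of two clients. The `apply_writes` helper and the system names are hypothetical; each system simply keeps the last value it received.

```python
def apply_writes(arrival_order):
    """Apply (client, system, value) writes in the order they arrive."""
    state = {"database": None, "search_index": None}
    for _client, system, value in arrival_order:
        state[system] = value  # last write wins in each system
    return state


# Client 1 sets X to A and client 2 sets X to B at about the same time.
# Each client writes to the database first and then to the search index,
# but the four requests interleave on the network.
interleaved = [
    ("client1", "database", "A"),
    ("client2", "database", "B"),
    ("client2", "search_index", "B"),
    ("client1", "search_index", "A"),
]
state = apply_writes(interleaved)
# state == {"database": "B", "search_index": "A"}
```

Neither write failed, yet the two systems now disagree permanently, and nothing in either system reveals that.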
Change Data Capture
The problem with most databases’ replication logs is that they have long been considered to be an internal implementation detail of the database, not a public API. Clients are supposed to query the database through its data model and query language, not parse the replication logs and try to extract data from them.
Change data capture (CDC) is the process of observing all data changes written to a database and extracting them in a form in which they can be replicated to other systems. CDC is especially interesting if changes are made available as a stream, immediately as they are written.

Here, a write to the database is the event, and the corresponding log entry on disk is the message. The search index and any other derived data systems are just consumers of the change stream, with the change log playing the role of the message broker.
Change data capture is a mechanism for ensuring that all changes made to the system of record are also reflected in the derived data systems so that the derived systems have an accurate copy of the data.
The system of record is the ground truth, and a derived data system is another representation of that ground truth. CDC means that whenever the ground truth changes, the change is mapped onto the other representations.
Implementing CDC
Essentially, change data capture makes one database the leader (the one from which the changes are captured), and turns the others into followers. A log-based message broker is well suited for transporting the change events from the source database to the derived systems, since it preserves the ordering of messages.
CDC sounds like a database TRIGGER, and triggers can be used to implement it, but they tend to be fragile and have significant performance overheads, because they run on the write path of the data itself. Log-based CDC instead reads the replication log rather than intercepting each write.
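The leader/follower arrangement can be sketched as a consumer that applies change events, in log order, to a derived store. The event shape (`op`, `key`, `value`) is an assumption for this example; real CDC tools each define their own formats.

```python
def apply_change(derived, event):
    """Apply one change event to a derived key-value store (e.g. a cache)."""
    if event["op"] == "delete":
        derived.pop(event["key"], None)
    else:  # "upsert"
        derived[event["key"]] = event["value"]


def replicate(change_log, derived, start=0):
    """Replay change events from `start`; return the next position to read."""
    for position in range(start, len(change_log)):
        apply_change(derived, change_log[position])
    return len(change_log)


change_log = [
    {"op": "upsert", "key": "A", "value": 1},
    {"op": "upsert", "key": "B", "value": 2},
    {"op": "delete", "key": "A"},
]
cache = {}
position = replicate(change_log, cache)

# Later, new changes arrive and only those are applied.
change_log.append({"op": "upsert", "key": "C", "value": 3})
position = replicate(change_log, cache, start=position)
```

Because every follower applies the same events in the same order, the race described for dual writes cannot occur.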
Snapshot and Log compaction
Database replication uses a log sequence number to record the position of a snapshot: if a replica crashes, it recovers from the snapshot and replays the log from that position rather than from the very beginning. It’s the same for CDC: once a derived system has been loaded from a snapshot, only the changes after the snapshot position need to be copied to it.
For example, if the system of record has the following records:
99  set A = 0
100 set A = 1   <- snapshot / LSN in database
101 set A = 2
102 set B = A
103 set B = B + 1
Then CDC can just use the records numbered greater than 100 to build derived data.
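The snapshot example can be replayed in a few lines. The `replay` function is a hypothetical helper, and each log record is modelled as a position plus a small function that mutates the state.

```python
def replay(snapshot_state, snapshot_position, log):
    """Rebuild state from a snapshot plus the log records written after it."""
    state = dict(snapshot_state)  # do not mutate the snapshot itself
    for position, apply in log:
        if position > snapshot_position:
            apply(state)
    return state


log = [
    (99, lambda s: s.update(A=0)),
    (100, lambda s: s.update(A=1)),  # the snapshot was taken here
    (101, lambda s: s.update(A=2)),
    (102, lambda s: s.update(B=s["A"])),
    (103, lambda s: s.update(B=s["B"] + 1)),
]
derived = replay({"A": 1}, 100, log)
# derived == {"A": 2, "B": 3}
```

Records 99 and 100 are skipped because their effect is already contained in the snapshot.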
In a key-value structured system, the log can be compacted by letting a newer value overwrite older values for the same key. For CDC, the compacted log keeps only the newest value for each key, and a derived system can be built from just these newest values.
For example, if the system of record has the following records:
99  set A = 0
100 set A = 1
101 set A = 2
102 set B = A
103 set B = B + 1
Then CDC can just keep the compacted log to build derived data:
set A = 2
set B = 3
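Log compaction over key-value records can be sketched as keeping the last write per key. The `compact` function is hypothetical, and treating a `None` value as a deletion marker (tombstone) is an assumption of this sketch.

```python
def compact(log):
    """Keep only the most recent value for every key.

    `log` is a list of (key, value) pairs in write order. A value of None
    is treated as a tombstone, meaning the key was deleted.
    """
    latest = {}
    for key, value in log:
        latest.pop(key, None)  # re-insert so the key moves to its newest position
        latest[key] = value
    return [(k, v) for k, v in latest.items() if v is not None]


# The records from the example, with each write resolved to a concrete value.
log = [("A", 0), ("A", 1), ("A", 2), ("B", 2), ("B", 3)]
compacted = compact(log)
# compacted == [("A", 2), ("B", 3)]
```

A new derived system can then be built by reading the compacted log from the start, without taking a separate snapshot.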
Processing Streams
Streams can work like a pipe: a stream processor consumes input streams in a read-only fashion and writes its output to a different location in an append-only fashion.
Partitioning, parallelization, and mapping operations such as transforming and filtering work the same way as in MapReduce (batch processing).
The one crucial difference to batch jobs is that a stream never ends:
- Sorting doesn’t make sense → an unbounded dataset cannot be sorted
- Fault tolerance mechanisms change → a failed batch job can simply be restarted from the beginning, but a stream job that has been running for a long time cannot
Uses of Stream Processing
- Complex event processing (CEP) → It is an approach developed for analyzing event streams, especially geared toward the kind of application that requires searching for certain event patterns, for example, to find invalid operations in a long-term transaction.
- Stream analytics → Usually used for aggregations and statistical metrics over a large number of events, for example, calculating statistics during a live TV show.
- Maintaining materialized views → A stream of changes to a database can be used to keep derived data systems, such as caches, search indexes, and data warehouses, up to date with a source database. Stream processing can maintain these materialized views for efficient calculations and queries.
Reasoning About Time
For batch processing, timing is not a huge problem. Batch jobs only process historical data, so the requirements look like “the average between 01/01/2023 and 12/31/2023”. There is no point in looking at the system clock of the machine running the batch process; it only uses the timestamps in the records.
For stream processing, however, the requirements look like “the average over the last 30 seconds”. This involves both the event timestamps and the system clock of the machine running the processor, so lag and clock drift can distort the result.
Event time Vs Processing time
Processing may be delayed by queueing, network faults, and so on, which can also lead to unpredictable ordering of messages.
Confusing event time and processing time leads to bad data. For example, say you have a stream processor that measures the rate of requests (counting the number of requests per second). If you redeploy the stream processor, it may be shut down for a minute and process the backlog of events when it comes back up. If you measure the rate based on the processing time, it will look as if there was a sudden anomalous spike of requests while processing the backlog, when in fact the real rate of requests was steady.
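The spike described above can be reproduced with synthetic data. In this sketch, one request occurs every 20 seconds, and the processor is assumed to be down between second 60 and second 180, handling the whole backlog when it comes back. All names and numbers are made up for the example.

```python
from collections import Counter


def requests_per_minute(events, time_field):
    """Count events per one-minute bucket of the chosen timestamp field."""
    return dict(Counter(event[time_field] // 60 for event in events))


def make_events():
    events = []
    for t in range(0, 240, 20):  # steady rate: 3 requests per minute
        # Events that occur during the outage are processed at t = 180.
        processed = t if t < 60 else max(t, 180)
        events.append({"event_time": t, "processing_time": processed})
    return events


events = make_events()
by_event_time = requests_per_minute(events, "event_time")
by_processing_time = requests_per_minute(events, "processing_time")
# by_event_time      == {0: 3, 1: 3, 2: 3, 3: 3}
# by_processing_time == {0: 3, 3: 9}
```

Measured by event time the rate is flat; measured by processing time, minutes 1 and 2 look empty and minute 3 shows a spike that never happened.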

Knowing when you’re ready
A tricky problem when defining windows in terms of event time is that you can never be sure when you have received all of the events for a particular window, or whether there are some events still to come.
For example, say you’re grouping events into one-minute windows so that you can count the number of requests per minute. You can time out and declare a window ready after you have not seen any new events for that window in a while. However, it could still happen that some events were buffered on another machine somewhere, delayed due to a network interruption. You need to be able to handle such straggler events that arrive after the window has already been declared complete.
- Ignore the straggler events, as they are probably a small percentage of events in normal circumstances. You can track the number of dropped events as a metric, and alert if you start dropping a significant amount of data.
- Publish a correction, an updated value for the window with stragglers included. You may also need to retract the previous output.
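Both ways of handling stragglers can be sketched with a per-minute counter. The `WindowCounter` class, its `on_late` option, and the explicit `close` call are assumptions for this example; real stream processors decide when a window is complete in their own ways.

```python
class WindowCounter:
    """Counts events per one-minute event-time window (illustration only)."""

    def __init__(self, on_late="ignore"):
        self.on_late = on_late  # "ignore" or "correct"
        self.counts = {}        # window start -> count
        self.closed = set()     # windows already declared complete
        self.dropped = 0        # metric: stragglers that were ignored
        self.corrections = []   # (window start, corrected count)

    def add(self, event_time):
        window = event_time - event_time % 60
        if window in self.closed and self.on_late == "ignore":
            self.dropped += 1
            return
        self.counts[window] = self.counts.get(window, 0) + 1
        if window in self.closed:
            # Publish an updated value for a window that was already emitted.
            self.corrections.append((window, self.counts[window]))

    def close(self, window):
        """Declare a window complete and return its count."""
        self.closed.add(window)
        return self.counts.get(window, 0)
```

For example, after events at seconds 5 and 20 the first window closes with a count of 2; a straggler at second 50 is then either counted in `dropped` or produces the correction `(0, 3)`.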
Whose clock are you using?
An event may be buffered on a mobile phone, a PC, a server, or some other device before it is sent. Different devices have different clocks, which may drift or be skewed. Whose clock is correct?
To adjust for incorrect device clocks, one approach is to log three timestamps:
- The time at which the event occurred, according to the device clock
- The time at which the event was sent to the server, according to the device clock
- The time at which the event was received by the server, according to the server clock
By subtracting the second timestamp from the third, you can estimate the offset between the device clock and the server clock. You can then apply that offset to the event timestamp, and thus estimate the true time at which the event actually occurred.
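The offset calculation is a single subtraction and addition. This sketch assumes that the network delay between sending and receiving is negligible and that the device clock did not change between the event and the send; the function name and the numbers are made up.

```python
def estimate_event_time(occurred_device, sent_device, received_server):
    """Estimate when an event really happened, in server-clock terms.

    All three arguments are timestamps in seconds. The first two come from
    the device clock, the third from the server clock.
    """
    offset = received_server - sent_device  # server clock minus device clock
    return occurred_device + offset


# A device whose clock is 120 s behind the server records an event at 1000,
# buffers it, and sends it at 1600; the server receives it at 1720.
estimated = estimate_event_time(1000, 1600, 1720)
# estimated == 1120
```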
Types of windows
Once you know how the timestamp of an event should be determined, the next step is to decide how windows over time periods should be defined. The window can then be used for aggregations, for example to count events, or to calculate the average of values within the window. Several types of windows are in common use:
- Tumbling window
A tumbling window has a fixed length, and every event belongs to exactly one window. For example, if you have a 1-minute tumbling window, all the events with timestamps between 10:03:00 and 10:03:59 are grouped into one window, events between 10:04:00 and 10:04:59 into the next window, and so on. You could implement a 1-minute tumbling window by taking each event timestamp and rounding it down to the nearest minute to determine the window that it belongs to.
- Hopping window
A hopping window also has a fixed length, but allows windows to overlap in order to provide some smoothing. For example, a 5-minute window with a hop size of 1 minute would contain the events between 10:03:00 and 10:07:59, then the next window would cover events between 10:04:00 and 10:08:59, and so on. You can implement this hopping window by first calculating 1-minute tumbling windows, and then aggregating over several adjacent windows.
- Sliding window
A sliding window contains all the events that occur within some interval of each other. For example, a 5-minute sliding window would cover events at 10:03:39 and 10:08:12, because they are less than 5 minutes apart (note that tumbling and hopping 5-minute windows would not have put these two events in the same window, as they use fixed boundaries). A sliding window can be implemented by keeping a buffer of events sorted by time and removing old events when they expire from the window.
- Session window
Unlike the other window types, a session window has no fixed duration. Instead, it is defined by grouping together all events for the same user that occur closely together in time, and the window ends when the user has been inactive for some time (for example, if there have been no events for 30 minutes). Sessionization is a common requirement for website analytics.
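The four window types can be sketched with integer timestamps in seconds. These helpers are hypothetical and simplified: the hopping window assumes the window size is a multiple of the hop, and the sliding window assumes events arrive in timestamp order.

```python
from collections import deque


def tumbling_window(ts, size):
    """Return the [start, end) window that contains ts."""
    start = ts - ts % size  # round down to the window boundary
    return (start, start + size)


def hopping_windows(ts, size, hop):
    """Return every [start, end) window of length `size` that contains ts."""
    last_start = ts - ts % hop
    first_start = last_start - size + hop
    return [(s, s + size) for s in range(first_start, last_start + 1, hop)]


class SlidingWindow:
    """Keeps the events that lie within `size` seconds of the newest one."""

    def __init__(self, size):
        self.size = size
        self.events = deque()

    def add(self, ts):
        self.events.append(ts)
        while self.events[0] <= ts - self.size:
            self.events.popleft()  # expired from the window
        return len(self.events)


def session_windows(timestamps, gap):
    """Split one user's timestamps into sessions separated by inactivity."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions
```

Counting seconds from 10:00:00, the events at 10:03:39 and 10:08:12 become 219 and 492: a `SlidingWindow(300)` holds both at once, whereas `tumbling_window(219, 300)` and `tumbling_window(492, 300)` return different windows.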
Stream JOINs
The fact that new events can appear at any time on a stream makes joins on streams more challenging than in batch jobs.
Stream-stream JOIN (window JOIN)
For example, a website has two streams:
- when a user searches for a keyword, record an event that contains the keyword and its results.
- when a user clicks a URL, record an event.
In order to calculate the click-through rate for each URL in the search results, you need to bring together the events for the search action and the click action, which are connected by having the same session ID.
Why not just record the URL when we click? Why JOIN? Because the click may never come if the user abandons their search, and even if it comes, the time between the search and the click may be highly variable.
For example, suppose the user searched 10 times but clicked only 5 times. If we only record the URL on each click, all we know is that the user searched and clicked 5 times. If we record both kinds of events and then JOIN them, we also learn that 5 of the 10 searches produced no click.
Note that embedding the details of the search in the click event is not equivalent to joining the events: doing so would only tell you about the cases where the user clicked a search result, not about the searches where the user did not click any of the results. In order to measure search quality, you need accurate click-through rates, for which you need both the search events and the click events.
To implement this type of join, a stream processor needs to maintain state: for example, all the events that occurred in the last hour, indexed by session ID. Whenever a search event or click event occurs, it is added to the appropriate index, and the stream processor also checks the other index to see if another event for the same session ID has already arrived. If there is a matching event, you emit an event saying which search result was clicked. If the search event expires without you seeing a matching click event, you emit an event saying which search results were not clicked.
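A simplified sketch of such a join is shown below. To stay short, it assumes that the search always arrives before its click and that each session has at most one open search, so it keeps only one index; the `SearchClickJoiner` class and its output tuples are made up for this example.

```python
class SearchClickJoiner:
    """Joins search and click events by session ID within a time window."""

    def __init__(self, window):
        self.window = window  # how long to wait for a click, in seconds
        self.searches = {}    # session ID -> (timestamp, query)
        self.output = []      # emitted join results

    def on_search(self, session_id, ts, query):
        self.expire(ts)
        self.searches[session_id] = (ts, query)

    def on_click(self, session_id, ts, url):
        self.expire(ts)
        if session_id in self.searches:
            _, query = self.searches.pop(session_id)
            self.output.append(("clicked", query, url))

    def expire(self, now):
        """Emit 'not_clicked' for searches whose window has passed."""
        for session_id, (ts, query) in list(self.searches.items()):
            if now - ts > self.window:
                del self.searches[session_id]
                self.output.append(("not_clicked", query, None))


joiner = SearchClickJoiner(window=3600)
joiner.on_search("s1", 0, "kafka")
joiner.on_search("s2", 10, "flink")
joiner.on_click("s1", 30, "https://example.com/kafka")
joiner.on_search("s3", 4000, "spark")  # by now the search in s2 has expired
```

The output contains one `clicked` result for the first search and one `not_clicked` result for the second, which is exactly the information needed for a click-through rate.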
Stream-table JOIN (stream enrichment)
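For example, a stream of user activity events carries only a user ID, while the user’s profile (username, address, payment method) lives in a database table. The join looks up each event’s user ID in the table and emits an enriched event that includes the profile fields. Because a user may continuously change their profile, the stream processor typically keeps a local copy of the table and updates it from the database’s changelog stream, so the table side of the join stays current.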
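Stream enrichment is usually described as a join between a stream of activity events and a table that is itself kept current by a changelog stream. A minimal sketch, with a hypothetical `Enricher` class and made-up field names:

```python
class Enricher:
    """Joins activity events with a local copy of the user profile table."""

    def __init__(self):
        self.profiles = {}  # user ID -> latest profile

    def on_profile_change(self, user_id, profile):
        # Changelog stream: keep the local copy of the table up to date.
        self.profiles[user_id] = profile

    def on_activity(self, event):
        # Activity stream: look up the profile and emit an enriched event.
        profile = self.profiles.get(event["user_id"], {})
        return {**event, "profile": profile}


enricher = Enricher()
enricher.on_profile_change(7, {"username": "alice", "city": "Oslo"})
first = enricher.on_activity({"user_id": 7, "action": "view"})
enricher.on_profile_change(7, {"username": "alice", "city": "Rome"})
second = enricher.on_activity({"user_id": 7, "action": "buy"})
```

Each activity event is joined with whatever the profile was when the event was processed, so the two events above carry different cities.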
Table-table JOIN (materialized view maintenance)
Just like a JOIN in a database, the goal is to keep a materialized view of the join up to date as stream events modify the underlying database tables.
Fault Tolerance
If a task in batch processing fails, it can simply be started again on another machine, and the output of the failed task is discarded.
It’s harder in stream processing because the task is endless.
Microbatching and checkpointing
Microbatching
Break the stream into small blocks, and treat each block like a miniature batch process. This is used in Spark Streaming. The batch size is typically around one second, which is the result of a performance compromise: smaller batches incur greater scheduling and coordination overhead, while larger batches mean a longer delay before results of the stream processor become visible.
Microbatching also implicitly provides a tumbling window equal to the batch size (windowed by processing time, not event timestamps); any jobs that require larger windows need to explicitly carry over state from one microbatch to the next.
Checkpoint
Another approach, used in Apache Flink, is to periodically generate rolling checkpoints of state and write them to durable storage. If a stream operator crashes, it can restart from its most recent checkpoint and discard any output generated between the last checkpoint and the crash. The checkpoints are triggered by barriers in the message stream, similar to the boundaries between microbatches, but without forcing a particular window size.
Disadvantage
Within the confines of the stream processing framework, the microbatching and checkpointing approaches provide the same exactly-once semantics as batch processing. However, as soon as output leaves the stream processor (for example, by writing to a database, sending messages to an external message broker, or sending emails), the framework is no longer able to discard the output of a failed batch. In this case, restarting a failed task causes the external side effect to happen twice, and microbatching or checkpointing alone is not sufficient to prevent this problem.
Atomic commit revisited
In order to give the appearance of exactly-once processing in the presence of faults, we need to ensure that all outputs and side effects of processing an event take effect if and only if the processing is successful.
Those things either all need to happen atomically, or none of them must happen, but they should not go out of sync with each other. Implementations of this approach do not attempt to provide transactions across heterogeneous technologies, but instead keep the transactions internal by managing both state changes and messaging within the stream processing framework. The overhead of the transaction protocol can be amortized by processing several input messages within a single transaction.
Idempotence
An idempotent operation is one that you can perform multiple times, and it has the same effect as if you performed it only once.
For example, SET X = 1 is idempotent, while X++ is not.
Even if an operation is not naturally idempotent, it can often be made idempotent with a bit of extra metadata.
For example, when consuming messages from Kafka, every message has a persistent, monotonically increasing offset. When writing a value to an external database, you can include the offset of the message that triggered the last write with the value. Thus, you can tell whether an update has already been applied, and avoid performing the same update again.
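The offset trick can be sketched for an increment, which is not idempotent on its own. The `store` dictionary stands in for the external database, and the sketch assumes that all updates to a key come from one partition in offset order; `apply_increment` is a hypothetical helper, not a Kafka API.

```python
def apply_increment(store, key, delta, offset):
    """Add `delta` to a counter unless this offset was already applied.

    `store` maps key -> (value, offset of the last applied message).
    Returns True if the update was applied, False if it was a duplicate.
    """
    value, last_offset = store.get(key, (0, -1))
    if offset <= last_offset:
        return False  # redelivered message: skip it
    # Value and offset are stored together, so both change in one write.
    store[key] = (value + delta, offset)
    return True


store = {}
apply_increment(store, "clicks", 1, offset=7)
apply_increment(store, "clicks", 1, offset=7)  # redelivery after a crash
apply_increment(store, "clicks", 1, offset=8)
# store["clicks"] == (2, 8)
```

The duplicate delivery of offset 7 leaves the counter unchanged, so processing the message twice has the same effect as processing it once.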
Rebuilding state after a failure
Any stream process that requires state—for example, any windowed aggregations (such as counters, averages, and histograms) and any tables and indexes used for joins—must ensure that this state can be recovered after a failure.
- Keep the state in a remote datastore and replicate it, although having to query a remote database for each individual message can be slow.
- Keep the state local to the stream processor, and replicate it periodically. Then, when the stream processor is recovering from a failure, the new task can read the replicated state and resume processing without data loss.