Systems of Record and Derived Data
- Systems of record
A system of record, also known as source of truth, holds the authoritative version of your data. When new data comes in, it is first written here. Each fact is represented exactly once (the representation is typically normalized). If there is any discrepancy between another system and the system of record, then the value in the system of record is the correct one.
- Derived data systems
Data in a derived system is the result of taking some existing data from another system and transforming or processing it in some way. If you lose derived data, you can recreate it from the original source. Denormalized values, indexes, and materialized views also fall into this category.
Derived data is redundant, in the sense that it duplicates existing information. However, it is often essential for getting good performance on read queries. It is commonly denormalized. You can derive several different datasets from a single source, enabling you to look at the data from different “points of view”.
Three Different Systems
- Services (online systems)
A service waits for a request or instruction from a client to arrive. When one is received, the service tries to handle it as quickly as possible and sends a response back. Response time is usually the primary measure of performance of a service, and availability is often very important.
- Batch processing systems (offline systems)
A batch processing system takes a large amount of input data, runs a job to process it, and produces some output data. Jobs often take a while, so there normally isn’t a user waiting for the job to finish. Instead, batch jobs are often scheduled to run periodically (for example, once a day). The primary performance measure of a batch job is usually throughput.
- Stream processing systems (near-real-time systems)
Stream processing is somewhere between online and offline/batch processing. Like a batch processing system, a stream processor consumes inputs and produces outputs. However, a stream job operates on events shortly after they happen, whereas a batch job operates on a fixed set of input data. This difference allows stream processing systems to have lower latency than the equivalent batch systems.
MapReduce and Distributed Filesystems
Hadoop Distributed File System (HDFS)
Unix tools use stdin and stdout as input and output, MapReduce jobs read and write files on a distributed filesystem, for example Hadoop Distributed File System (HDFS).
HDFS is based on the shared-nothing principle, in contrast to the shared-disk approach of Network Attached Storage (NAS). HDFS consists of a daemon process running on each machine, exposing a network service that allows other nodes to access files stored on that machine. A central server called the NameNode keeps track of which file blocks are stored on which machine. Thus, HDFS conceptually creates one big filesystem that can use the space on the disks of all machines running the daemon.
In order to tolerate machine and disk failures, file blocks are replicated on multiple machines.
MapReduce Execution
A standard MapReduce contains 4 parts:
- Input Parser → Read and parse the input file from HDFS, e.g. break it up to list of records.
- Mapper → Call the mapper function to extract a key and value from each input record.
- Shuffle → Distribute the key-value pairs by key to machines (pairs with similar keys will be assigned in the same machine).
- Reducer → Aggregate key-value pairs to generate output.
Distributed Execution
MapReduce can parallelize a computation across many machines, without you having to write code to explicitly handle the parallelism.

Each input file is typically hundreds of megabytes in size. The MapReduce scheduler (not shown in the diagram) tries to run each mapper on one of the machines that stores a replica of the input file, provided that machine has enough spare RAM and CPU resources to run the map task. This principle is known as putting the computation near the data: it saves copying the input file over the network, reducing network load and increasing locality.
The reduce side of the computation is also partitioned. While the number of map tasks is determined by the number of input file blocks, the number of reduce tasks is configured by the engineer.
In reality, the mapper and reducer don’t have the application code, so the application code needs two callback functions as mapper and reducer, then send their code to running machines, and wait for the output (or a signal to notify application to get final output).
Storage
The key-value pairs must be sorted, but the dataset is likely too large to be sorted with a conventional sorting algorithm on a single machine. Instead, the sorting is performed in stages. First, each map task partitions its output by reducer, based on the hash of the key. Each of these partitions is written to a sorted file on the mapper’s local disk via technique similar to SSTables and LSM-Trees.
Whenever a mapper finishes reading its input file and writing its sorted output files, the MapReduce scheduler notifies the reducers that they can start fetching the output files from that mapper. The reducers connect to each of the mappers and download the files of sorted key-value pairs for their partition. The process of partitioning by reducer, sorting, and copying data partitions from mappers to reducers is known as the shuffle.
For every reducer, it will iterate all mappers, and only download what they need. Because they download files rather than records, so mapper needs to put all related/similar records into one or close files, that’s why mapper needs SSTables and LSM-Trees technique.
The reduce task takes the files from the mappers and merges them together, preserving the sort order. Thus, if different mappers produced records with the same key, they will be adjacent in the merged reducer input.
The reducer can use arbitrary logic to process these records, and can generate any number of output records. These output records are written to a file on the distributed filesystem (usually, one copy on the local disk of the machine running the reducer, with replicas on other machines).
Workflows
It is very common for MapReduce jobs to be chained together into workflows, such that the output of one job becomes the input to the next job.
The Hadoop MapReduce framework does not have any particular support for workflows, so this chaining is done implicitly by directory name: the first job must be configured to write its output to a designated directory in HDFS, and the second job must be configured to use that same directory name for reading its input. From the MapReduce framework’s point of view, they are two independent jobs.
1 | def MapReduceJob(input_file, output_file): |
A batch job’s output is only considered valid when the job has completed successfully (MapReduce discards the partial output of a failed job). Therefore, one job in a workflow can only start when the prior jobs—that is, the jobs that produce its input directories—have completed successfully.
Reduce-Side Joins and Grouping
For batch processing, we always discuss bulk data processing, which means we use full table scan instead of index scan.

Imagine we have a task that needs to analyze the top 10 popular websites for every age stages. We have one records describing the things that logged-in users did on a website. And another records for user informations. We need to do JOIN to get the relations between user activities and their ages.
In order to achieve good throughput in a batch process, the computation must be (as much as possible) local to one machine. Making random-access requests over the network for every record you want to process is too slow. Moreover, querying a remote database would mean that the batch job becomes nondeterministic, because the data in the remote database might change while the job is running.
Thus, a better approach would be to take a copy of the user database (for example, extracted from a database backup using an ETL process) and to put it in the same distributed filesystem as the log of user activity events. You would then have the user database in one set of files in HDFS and the user activity records in another set of files, and you could use MapReduce to bring together all of the relevant records in the same place and process them efficiently.
Sort-Merge JOIN
The logic for sort-merge algorithm is: make every parts sorted, then merge them.
In leetcode, you may see problem that input are several unordered lists, and the output should be one sorted list containing all numbers from input lists. If we sort every input lists first then do merge, it’s sort-merge algorithm :).
1
2 input: [[9, 1, 0, 3], [0, 8, 5, 2]]
output: [0, 0, 1, 2, 3, 5, 8, 9]

- One set of mappers would go over the activity events (extracting the user ID as the key and the activity event as the value).
- One set of mappers would go over the user database (extracting the user ID as the key and the user’s date of birth as the value).
When the MapReduce framework partitions the mapper output by key and then sorts the key-value pairs, the effect is that all the activity events and the user record with the same user ID become adjacent to each other in the reducer input.
The reducer can then perform the actual join logic easily: the reducer function is called once for every user ID, it can get the date-of-birth record from the user database. The reducer stores the date of birth in a local variable and then iterates over the activity events with the same user ID.
For MapReduce, we can use mapper to generate different dicts for different properties with the same key (e.g.
{user_id: age}and{user_id: [activity]}). Then we JOIN these properties via the the same key (age JOIN activities USING user_id).
Handling skew/unbalance
The pattern of “bringing all records with the same key to the same place” breaks down if there is a very large amount of data related to a single key. Such disproportionately active database records are known as hot keys. Since a MapReduce job is only complete when all of its mappers and reducers have completed, any subsequent jobs must wait for the slowest reducer to complete before they can start.
Skewed Join → Sample data first to determine which keys are hot, spreads the work of handling the hot key over several reducers. In shuffle phase, the hot keys will be assgined to one of these reducers randomly. For the other input to the join, records relating to the hot key need to be replicated to all reducers handling that key.
Map-Side Joins
The reduce-side approach has the advantage that you do not need to make any assumptions about the input data: whatever its properties and structure, the mappers can prepare the data to be ready for joining. However, the downside is that all that sorting, copying to reducers, and merging of reducer inputs can be quite expensive.
Map-side join uses a cutdown MapReduce job in which there are no reducers and no sorting. Instead, each mapper simply reads one input file block from the distributed filesystem and writes one output file to the filesystem—that is all.
Broadcast hash JOIN
The simplest way of performing a map-side join applies in the case where a large dataset is joined with a small dataset. In particular, the small dataset needs to be small enough that it can be loaded entirely into memory in each of the mappers.
For the task we described before, let’s assume the user information (user_id with their age) is small enough. In this case, when a mapper starts up, it can first read the user database from the distributed filesystem into an in-memory hash table. Once this is done, the mapper can scan over the user activity events and simply look up the user ID for each event in the hash table.
For another very large input file, we can have several map tasks, every task just contain small part of the large input file. That is called broadcast hash join.
The word broadcast reflects the fact that each mapper for a partition of the large input reads the entirety of the small input (so the small input is effectively “broadcast” to all partitions of the large input), and the word hash reflects its use of a hash table.
Partitioned hash JOIN
This approach only works if both of the join’s inputs have the same number of partitions, with records assigned to partitions based on the same key and the same hash function. For example, the hash function uses last digit of user_id, so there are 10 partitions (0~9) for both user age and user activities.
That’s why it’s also known as bucketed map joins → partition join’s datasets into buckets, then only join buckets with the same bucket id. It requires for both datasets, they have the same partition key and the same number of buckets (which implies have the same hash function).
If the partitioning is done correctly, you can be sure that all the records you might want to join are located in the same numbered partition, and so it is sufficient for each mapper to only read one partition from each of the input datasets. This has the advantage that each mapper can load a smaller amount of data into its hash table.
For example, mapper i only needs to load and join age and activities for user_id ends with number i.
Merge JOIN
If the input datasets are not only partitioned in the same way, but also sorted based on the same key, it does not matter whether the inputs are small enough to fit in memory, because a mapper can perform the same merging operation that would normally be done by a reducer.
Map-side Vs Reduce-side Joins
- The output of a reduce-side join is partitioned and sorted by the join key.
- The output of a map-side join is partitioned and sorted in the same way as the large input.
Because the map-side join needs some assumptions about the input data (e.g. sort, size, partition, etc), so it always be used in the middle or rear of the chain, which means the input data is actually some reducer’s output.
Hadoop Vs Distributed Database
The biggest difference is that distributed databases focus on parallel execution of SQL queries on a cluster of machines, while the combination of MapReduce and a distributed filesystem provides something much more like a general-purpose operating system that can run arbitrary programs.
Diversity of storage
Databases require you to structure data according to a particular model, whereas files in a distributed filesystem are just byte sequences.
In practice, it appears that simply making data available quickly—even if it is in a quirky, difficult-to-use, raw format—is often more valuable than trying to decide on the ideal data model up front. This idea is similar to data warehouse—dump data from various sources locally then do joins or aggregations.
Hadoop has often been used for implementing ETL processes:
- Data from transaction processing systems is dumped into the distributed filesystem in some raw form.
- MapReduce jobs are written to clean up that data, transform it into a relational form, and import it into an MPP data warehouse for analytic purposes.
Data modeling still happens, but it is in a separate step, decoupled from the data collection. This decoupling is possible because a distributed filesystem supports data encoded in any format.
Diversity of processing models
MPP databases are monolithic, tightly integrated pieces of software that take care of storage layout on disk, query planning, scheduling, and execution. Moreover, the SQL is also deliberately designed for performance, e.g. predication pushdown, indexing, etc.
On the other hand, not all kinds of processing can be sensibly expressed as SQL queries. These kinds of processing are often very specific to a particular application, so they inevitably require writing code (mapper & reducer), not just queries.
Designing for frequent faults
If a node crashes while a query is executing, most MPP databases abort the entire query, and either let the user resubmit the query or automatically run it again. MPP databases also prefer to keep as much data as possible in memory to avoid the cost of reading from disk.
On the other hand, MapReduce can tolerate the failure of a map or reduce task without it affecting the job as a whole by retrying work at the granularity of an individual task. It is also very eager to write data to disk, partly for fault tolerance, and partly on the assumption that the dataset will be too big to fit in memory anyway.
The reason for why MapReduce is designed to tolerate frequent unexpected task termination: it’s not because the hardware is particularly unreliable, it’s because the freedom to arbitrarily terminate processes enables better resource utilization in a computing cluster.
Beyond MapReduce
Materialization of Intermediate State
As discussed previously, every MapReduce job is independent from every other job. The main contact points of a job with the rest of the world are its input and output directories on the distributed filesystem. If you want the output of one job to become the input to a second job, you need to configure the second job’s input directory to be the same as the first job’s output directory, and an external workflow scheduler must start the second job only once the first job has completed.
In many cases, you know that the output of one job is only ever used as input to one other job, the files on the distributed filesystem are simply intermediate state: a means of passing data from one job to the next. The process of writing out this intermediate state to files is called materialization.
Downsides
- A MapReduce job can only start when all tasks in the preceding jobs have completed. Skew or varying load on different machines means that a job often has a few straggler tasks that take much longer to complete than the others. Having to wait until all of the preceding job’s tasks have completed slows down the execution of the workflow as a whole.
- Mappers are often redundant: they just read back the same file that was just written by a reducer, and prepare it for the next stage of partitioning and sorting.
- Storing intermediate state in a distributed filesystem means those files are replicated across several nodes.
Solutions - Execution Engines
In order to fix these problems with MapReduce, several new execution engines for distributed batch computations were developed, the most well known of which are Spark and Flink. There are various differences in the way they are designed, but they have one thing in common: they handle an entire workflow as one job, rather than breaking it up into independent subjobs.
Raw MapReduce is a chain of independent modules, the tunnel between modules is file (reducer write result to a file, which read by next mapper). Execution Engine manages this chain (these modules) to make it efficient.
In these execution engines, the mapper and reducer are called operators, and the execution engine provides several different options for connecting one operator’s output to another’s input (arrange the operators in a job as a directed acyclic graph (DAG)).
It offers several advantages compared to the MapReduce model:
- Expensive work such as sorting need only be performed in places where it is actually required, rather than always happening by default between every map and reduce stage.
- There are no unnecessary map tasks, since the work done by a mapper can often be incorporated into the preceding reduce operator.
- Because all joins and data dependencies in a workflow are explicitly declared, the scheduler has an overview of what data is required where, so it can make locality optimizations.
- It is usually sufficient for intermediate state between operators to be kept in memory or written to local disk, which requires less I/O than writing it to HDFS (where it must be replicated to several machines and written to disk on each replica).
- Operators can start executing as soon as their input is ready; there is no need to wait for the entire preceding stage to finish before the next one starts.
Fault Tolerance
An advantage of fully materializing intermediate state to a distributed filesystem is that it is durable, which makes fault tolerance fairly easy in MapReduce: if a task fails, it can just be restarted on another machine and read the same input again from the filesystem.
But materizalizing intermediate state is too expensive, so some take a different approach: if a machine fails and the intermediate state on that machine is lost, it is recomputed from other data that is still available (a prior intermediary stage if possible, or otherwise the original input data, which is normally on HDFS). To enable this recomputation, the framework must keep track of how a given piece of data was computed—which input partitions it used, and which operators were applied to it (e.g. Resilient Distributed Dataset (RDD) in Spark).
When recomputing data, it is important to know whether the computation is deterministic: that is, given the same input data, do the operators always produce the same output? This question matters if some of the lost data has already been sent to downstream operators. The solution in the case of nondeterministic operators is normally to kill the downstream operators as well, and run them again on the new data.
High-level APIs
Raw mapper and reducer function in MapReduce is not human-friendly, so some high-level APIs occur to solve these problems.
These dataflow APIs generally use relational-style building blocks to express a computation: joining datasets on the value of some field; grouping tuples by key; filtering by some condition; and aggregating tuples by counting, summing, or other functions. Internally, these operations are implemented using the various join and grouping algorithms that we discussed earlier in this chapter.
These high-level interfaces not only make the humans using the system more productive, but they also improve the job execution efficiency at a machine level.
The move toward declarative query languages
MapReduce and its dataflow successors are very different from the fully declarative query model of SQL. MapReduce was built around the idea of function callbacks: which means programmer can assign any functions, packages, modules and classes to mapper and reducer. They have clear instructions about how to run the code step-by-step.
- declarative language → what should be done, like SQL, only give the purpose;
- imperative language → how to do, like Python, C++, give the step-by-step code.
However, declarative query language has its own advantages: the framework can analyze the properties of the inputs and commands, and automatically decide what is the best algorithm to optimize the processing.
For example, in SQL JOIN, execution engine can apply “predication pushdown”, or iter the smaller table first then iter the bigger one.