A MapReduce architecture is useful in situations with massive datasets where user queries involve large proportions of the data. The insight behind MapReduce is that if a 100 kilobyte program needs to process 500 gigabytes of data, it is more efficient to send the program to the servers containing the data than to send the data to the program.
MapReduce architectures divide processing into two phases:
- Map: processing that occurs locally on the data
- Reduce: a global aggregation of the results of the map operations
Problem
Some applications involve extraordinary amounts of data:
- They have large files (e.g., video, high-resolution images)
- They involve massive volumes (e.g., sensor readings, stock market transactions)
- They serve millions of users (e.g., global social networks)
- They combine several of the above
Since no single server can hold enough disk capacity for such applications, it is necessary to combine many servers into a large storage cluster.
Query processing will be time-consuming if each query requires the transmission of all the data in a cluster. For example, suppose we are in a company with a large cluster storing 500 terabytes of video files. If the marketing department asks, “what’s the total duration of all videos?” then a worst-case scenario would involve sending 500 terabytes of data to be processed. Amazon Web Services would charge almost $50,000 for the data transfer alone. It would take more than a month on a gigabit connection to complete the transfer.
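To see where these figures come from, here is a back-of-the-envelope check; the roughly $0.09 per gigabyte price is an assumption in line with typical AWS internet data-transfer (egress) rates:

// Back-of-the-envelope check (the $0.09/GB egress price is an assumption)
const gigabytes = 500 * 1000;           // 500 TB expressed in gigabytes
const costUSD = gigabytes * 0.09;       // 45,000 — "almost $50,000"
const bits = gigabytes * 8e9;           // 4e15 bits to transfer in total
const seconds = bits / 1e9;             // 4e6 seconds on a 1 Gb/s link
const days = seconds / (60 * 60 * 24);  // ≈ 46 days — "more than a month"
console.log(costUSD, days);             // 45000 46.296...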
Solution
Rather than sending large amounts of data, it may be easier to transmit code. For example, a small script can be sent to each server in the storage cluster to calculate the total length of the stored video. Rather than sending all the video data, each server needs only send a single number: the total duration of video stored locally. The results are combined by addition, to produce a grand total across the entire storage cluster.
MapReduce is an effective way to model this problem:
- Map: Each site performs a local transformation of the data. For example, the system ‘maps’ the input videos into their lengths.
- Reduce: The MapReduce cluster aggregates the mapped results to produce a final value. For example, the system ‘reduces’ the video lengths into a final value by adding them together (see the sketch below).
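In single-machine JavaScript terms, the video example looks like this (videos is an in-memory array, and getLengthOf is a hypothetical helper that returns one video's duration):

// Map: transform each video into its length (getLengthOf is hypothetical)
let lengths = videos.map(video => getLengthOf(video));
// Reduce: aggregate the individual lengths into a grand total
let total = lengths.reduce((acc, curr) => acc + curr, 0);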
Origin of MapReduce
The name MapReduce comes from the functional programming terms map and reduce. In functional programming, a map applies a transformation function to each element of a container (such as an array). A reduce or fold takes a list or tree structure and recursively applies an aggregation function to parts of the structure, gradually building a final result.
For example, given the list [2,3,8,9], applying map with the transformation x => x * x will create a new list with each element multiplied by itself:
$ node
Welcome to Node.js
Type ".help" for more information.
> let items = [2,3,8,9]
undefined
> items.map(x => x * x)
[ 4, 9, 64, 81 ]
>
On an array, reduce requires an initial value and an aggregation function that accepts an accumulator and a current value. Starting with the initial value, the aggregation function is applied repeatedly to update the accumulator with each new value. For example, the aggregation (accumulator, currentValue) => accumulator + currentValue will calculate the total of a list, while (accumulator, currentValue) => accumulator * 10 + currentValue will build a decimal number from individual digits.
$ node
Welcome to Node.js
Type ".help" for more information.
> let items = [2,3,8,9]
undefined
> items.reduce((acc, curr) => acc + curr, 0)
22
> items.reduce((acc, curr) => acc * 10 + curr, 0)
2389
>
You can combine both map and reduce in one step:
$ node
Welcome to Node.js
Type ".help" for more information.
> let items = [2,3,8,9]
undefined
> items.map(x => x * x).reduce((acc, curr) => acc + curr, 0)
158
>
In Node.js, an Array's map and reduce run on a single computer. A MapReduce architecture expands these concepts to a cluster.
MapReduce in a cluster
The transformation and aggregation functions execute across the cluster. A high-level framework manages the cluster and is responsible for the following tasks:
- Monitoring active servers in the cluster
- Distributing data across separate servers in the cluster
- Sending code between servers
- Running map jobs on each server
- Transmitting results between servers
- Aggregating results across servers, using reducers
- Returning the final value to the original querying server
Implementation
Apache Hadoop and Apache Spark are well-known open-source frameworks for MapReduce and similar distributed computing tasks. The frameworks principally support development in Java and Scala. However, they can run any script or executable using Hadoop Streaming or piping.
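For example, a map and a reduce step can be written as two small Node.js scripts and passed to the Hadoop Streaming jar. The sketch below is illustrative rather than a complete job: it assumes each input line has the form category<TAB>durationInSeconds, and the file and HDFS paths are placeholders.

// mapper.js (a separate file): read input lines from stdin and emit
// tab-separated key/value pairs; Hadoop groups and sorts these by key
const readline = require('readline');
readline.createInterface({ input: process.stdin }).on('line', line => {
  const [category, seconds] = line.split('\t');
  console.log(`${category}\t${seconds}`);
});

// reducer.js (a separate file): input arrives sorted by key, so a running
// total can be flushed each time the key changes
const readline = require('readline');
let currentKey = null;
let total = 0;
const rl = readline.createInterface({ input: process.stdin });
rl.on('line', line => {
  const [key, value] = line.split('\t');
  if (currentKey !== null && key !== currentKey) {
    console.log(`${currentKey}\t${total}`);
    total = 0;
  }
  currentKey = key;
  total += Number(value);
});
rl.on('close', () => {
  if (currentKey !== null) console.log(`${currentKey}\t${total}`);
});

A job might then be launched with something like the following (the jar location and input/output paths vary by installation):

hadoop jar /path/to/hadoop-streaming.jar \
  -files mapper.js,reducer.js \
  -mapper 'node mapper.js' \
  -reducer 'node reducer.js' \
  -input /videos/metadata \
  -output /videos/categoryTotals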
MongoDB has built-in MapReduce support. The following query performs a MapReduce over a db.videos collection in MongoDB (in fact, it computes totals, grouped by video category):
db.videos.mapReduce(
  // Map
  function () {
    // Some computation goes here
    let result = ... getLengthOf(this.video) ...;
    // Output the transformation (i.e., for the map)
    emit(this.category, result);
  },
  // Reduce
  function (key, values) {
    // Add each of the lengths returned by the map
    return Array.sum(values);
  },
  // Options: set the output collection
  { out: "categoryTotals" }
)
MongoDB creates a new collection (db.categoryTotals) to store the results of the MapReduce operation.
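The results can then be queried like any other collection. Each output document pairs a grouping key (_id) with its reduced value; the categories and totals shown here are purely illustrative:

> db.categoryTotals.find()
{ "_id" : "documentaries", "value" : 523021 }
{ "_id" : "tutorials", "value" : 98310 }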
You can use MapReduce principles in normal Node.js development. A complex problem, such as totaling the lengths of videos stored across many servers, can be decomposed into map and reduce phases.
A domain layer (or business process layer) can invoke a transformation (mapping) function on each server, and then reduce the results to produce a total.
Assuming that each site computes its total length via a /processVideos endpoint, a simplified form of the domain or business process layer code is listed below:
const express = require('express');
const app = express();
const port = 3000;
const fetch = require('node-fetch');
// The list of servers in the cluster
let servers = [
'server1',
'server2',
'server3'
];
// Helper to perform GET /processVideos on a server in the cluster
async function processRemoteVideos(server) {
let result = await fetch(`http://${server}/processVideos`);
let json = await result.json();
return json.duration;
}
// Compute the total duration of videos in the cluster
app.get('/total', async (req, res) => {
// Start all requests simultaneously
// and wait until they all resolve
let result = await Promise.all(
servers.map(server => processRemoteVideos(server))
);
// Aggregate each of the individual results into a total
let total = result.reduce((acc, curr) => acc + curr, 0);
// Return the total
res.json({ total });
});
app.listen(port, () => console.log(`The API server is running on http://localhost:${port}/total`));
This code lacks timeouts and error handling but illustrates the basic idea of distributing an identical workload across multiple servers.
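Assuming the three servers are reachable and each /processVideos call returns a duration, querying the endpoint might look like this (the total shown is illustrative):

$ curl http://localhost:3000/total
{"total":123456}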