In computing, streams and pipelines refer to sequential chains of data processing stages.
Problem
Often data needs to go through multiple stages of processing.
For example, when you upload a video to YouTube, it might go through the following processing steps: [1]
- Check the video is a valid file
- Split the video into smaller ‘chunks’ for processing
- Compress chunks for streaming to mobile devices
- Detect unlicensed music use
- Detect unlicensed video use
- Detect inappropriate content
- Get video statistics (e.g., length, resolution, quality)
- Generate thumbnails
Similarly, when you upload a post to Facebook, it may also have a sequence of validation steps: [2]
- Detect if it contains abusive speech (e.g., hate speech)
- Check any embedded links for viruses or malware
- Detect faces in attached images
- Match faces to known friends
- Resize and compress images for mobile users
It is easy to imagine that these chains will keep growing in complexity. YouTube and Facebook have thousands of developers. Perhaps one developer will add a feature to detect whether the video or post is child-friendly. Another might want to warn users if they’ve accidentally included passwords or other confidential information. Yet another developer might want to add color adjustments for white-balance so that images or video captured indoors do not look yellow. Finally, another developer might want to gather logging or statistical data.
Some problems can arise in managing all of these stages:
- Developers must carefully trace through code to see how the processing is connected.
- When adding new steps, it can be difficult to identify the existing steps.
- If something fails in a later stage, it may be necessary to reverse or redo earlier stages.
- If the steps are invoked one-after-the-other, the entire series of steps may take a long time to complete.
Solution
The idea behind pipelines or streams is to treat these stages like a configurable assembly line. Each stage performs a small amount of work. A pipeline architecture connects the stages and manages the flow of data from one stage to the next.
The goal of a pipeline architecture is to break up processing into simple, small, self-contained units of work.
For example, ordinary video processing code might have one complex function that splits a video file into chunks and encodes those chunks (all at once). In stream processing, there are two simple stages: one function to split a video file into chunks, another that encodes the chunks.
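To make this concrete, here is a minimal sketch of the two stages (the job object, the fixed chunk size and the base64 ‘encoding’ are illustrative placeholders, not a real video encoder):

// Stage 1: split the raw data into fixed-size chunks
// (job.data is assumed to be a Buffer containing the uploaded video)
const splitIntoChunksStage = job => {
  const chunkSize = 1024 * 1024; // 1 MB per chunk (arbitrary for this sketch)
  job.chunks = [];
  for (let i = 0; i < job.data.length; i += chunkSize) {
    job.chunks.push(job.data.slice(i, i + chunkSize));
  }
};

// Stage 2: encode each chunk (a placeholder transformation, not real video encoding)
const encodeChunksStage = job => {
  job.encoded = job.chunks.map(chunk => chunk.toString('base64'));
};

Each stage does one small, self-contained piece of work, so either stage can be tested or replaced on its own.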
Implementation
While the team shopping list does not yet need complex pipelines, creating a new item triggers two actions. Consider the following excerpt from the constructor in src/server/item.js:
// Notify the team about the new item
notificationService.notifyNewItem(this.description);
// Generate an icon
this.icon = iconService.generateIcon(this.description);
A basic pipeline architecture uses a consistent structure for each stage. In the example below, each stage is a function that takes an item:
const notifyNewItemStage = item => {
  notificationService.notifyNewItem(item.description);
};

const generateIconStage = item => {
  item.icon = iconService.generateIcon(item.description);
};
Then, we can define a helper function to build a pipeline:
function createPipeline(stages) {
  // Return a function that will apply each stage to the item
  return item => {
    for (let stage of stages)
      stage(item);
  };
}
The pipeline can be configured and used as follows:
// Build a pipeline from each stage
const newItemPipeline = createPipeline([
  notifyNewItemStage,
  generateIconStage
]);

class Item {
  constructor(description, quantity) {
    ...
    // Start the pipeline on this object
    newItemPipeline(this);
  }
  ...
}
When this application becomes more complex, adding stages is as simple as adding lines to the newItemPipeline definition. [3]
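For example, a hypothetical stage that validates the description (validateDescriptionStage is an invented name, not part of the existing code) could be added by defining one more small function and listing it in the pipeline:

// Hypothetical validation stage
const validateDescriptionStage = item => {
  if (!item.description || item.description.trim() === '') {
    throw new Error('Item description must not be empty');
  }
};

const newItemPipeline = createPipeline([
  validateDescriptionStage,
  notifyNewItemStage,
  generateIconStage
]);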
Reactive programming and RxJS
RxJS is a popular reactive programming framework for pipeline and stream programming, and it is a core part of Angular.
In RxJS, Observables are objects that generate notifications. In the previous section, I noted that RxJS Observables can be used in a publish/subscribe architecture.
RxJS observables can also implement pipelines. The output of an Observable can be modified using Operators. These operators may be chained together in a pipeline to create more complex behaviors. [4]
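As a brief sketch of this chaining style (the values and operators here are illustrative and unrelated to the shopping-list application), operators are passed to an Observable’s pipe method:

const { of } = require('rxjs');
const { map, filter } = require('rxjs/operators');

// A three-stage pipeline: emit values, double each one, keep only the large results
of(1, 2, 3, 4)
  .pipe(
    map(n => n * 2),
    filter(n => n > 5)
  )
  .subscribe(n => console.log(n)); // logs 6 and 8

Each operator is a small, self-contained stage, and the pipe call connects them in order, much like the createPipeline helper above.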
In Angular templates, Pipes and the pipe (|) symbol can combine formatting operators to build a pipeline for sophisticated output formatting. Angular has pipes that are ready to use (uppercase, lowercase, date, currency and json). Also, the @Pipe decorator makes it possible to define custom stages in a pipeline.
I/O Streams
Stream processing is a common approach to handling input and output (I/O). Files, keyboard input and network connections are all streams of bytes or characters. Many programming languages provide a way of building more complex streams by chaining simple streams.
For example, a pipeline to send data over a network might consist of five stages: [5]
- Convert Unicode text into UTF-8 bytes
- Compress the bytes
- Encrypt the bytes
- Buffer the bytes (so data is queued-up while waiting for the network)
- Send the bytes over the network connection
Node.js provides I/O streams in the built-in stream module. Consider, as a starting point, the following code to write to a file in Node.js:
const fs = require('fs');

let results = fs.createWriteStream('results.txt');
results.write('Hello, world!',
  () => results.end()
);
Suppose the file should be compressed. With streams, this is a very simple change to create a short pipeline:
const fs = require('fs');
const zlib = require('zlib');
const stream = require('stream');

let raw = fs.createWriteStream('results.txt.gz');
let results = zlib.createGzip();

// Chain the Zlib compressed stream into the raw file
results.pipe(raw);
results.write('Hello, compressed world!',
  () => results.end()
);
In this example, results.pipe(raw) establishes a single-step pipeline between the written text and the output file. The invocation of results.write(…) is unchanged. Node.js streams manage all the details of compressing the data and forwarding the compressed output to the underlying file.
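The same approach extends to longer chains. As a sketch (the randomly generated key and initialization vector are placeholders purely for illustration, not a recipe for real encryption code), an encryption stage could be inserted between the compression and the file:

const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');

// Placeholder key and IV, generated fresh each run purely for this example
const key = crypto.randomBytes(32);
const iv = crypto.randomBytes(16);

let raw = fs.createWriteStream('results.txt.gz.enc');
let encrypt = crypto.createCipheriv('aes-256-cbc', key, iv);
let results = zlib.createGzip();

// Compress, then encrypt, then write to the underlying file
results.pipe(encrypt).pipe(raw);
results.write('Hello, compressed and encrypted world!',
  () => results.end()
);

Each stage is still a stream with the same interface, so stages can be added, removed or reordered without changing the code that writes the data.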