Skip to content

Node.js Streams

Streams process data piece by piece rather than loading everything into memory at once. Essential for handling large files, HTTP responses, or real-time data.

Without streams, reading a 1GB file loads the entire file into memory before processing. With streams, you process chunks as they arrive — memory usage stays constant.

Without streams: read 1GB → hold in memory → process → write
With streams: read chunk → process chunk → write chunk → repeat
TypeDescriptionExample
ReadableSource of datafs.createReadStream, http.IncomingMessage
WritableDestination for datafs.createWriteStream, http.ServerResponse
TransformRead + write with transformationzlib.createGzip, crypto.createCipher
DuplexRead + write independentlynet.Socket
import { createReadStream } from 'fs';
const stream = createReadStream('./large-file.csv', { encoding: 'utf-8' });
stream.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes`);
});
stream.on('end', () => {
console.log('File fully read');
});
stream.on('error', (err) => {
console.error('Stream error:', err);
});

.pipe() connects a readable to a writable and handles backpressure automatically:

import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';
// Compress a file: read → compress → write
createReadStream('./large.csv')
.pipe(createGzip())
.pipe(createWriteStream('./large.csv.gz'));

pipeline properly handles errors and cleanup:

import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';
await pipeline(
createReadStream('./large.csv'),
createGzip(),
createWriteStream('./large.csv.gz'),
);

Transform chunks as they flow through:

import { Transform } from 'stream';
const upperCaseTransform = new Transform({
transform(chunk, encoding, callback) {
callback(null, chunk.toString().toUpperCase());
},
});
await pipeline(
createReadStream('./input.txt'),
upperCaseTransform,
createWriteStream('./output.txt'),
);
import { createReadStream } from 'fs';
import { createInterface } from 'readline';
async function processCSV(filePath: string) {
const fileStream = createReadStream(filePath);
const rl = createInterface({ input: fileStream, crlfDelay: Infinity });
let lineCount = 0;
for await (const line of rl) {
lineCount++;
const [name, email, age] = line.split(',');
await saveUser({ name, email, age: Number(age) });
}
console.log(`Processed ${lineCount} lines`);
}

Streams are used implicitly in HTTP:

import http from 'http';
import { createReadStream } from 'fs';
// Stream a file directly to HTTP response
const server = http.createServer((req, res) => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
createReadStream('./large.txt').pipe(res);
});

Sometimes you need the entire content:

async function streamToString(readable: Readable): Promise<string> {
const chunks: Buffer[] = [];
for await (const chunk of readable) {
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
}
return Buffer.concat(chunks).toString('utf-8');
}
import { Readable } from 'stream';
function numberStream(max: number): Readable {
let current = 0;
return new Readable({
objectMode: true,
read() {
if (current <= max) {
this.push(current++);
} else {
this.push(null); // signals end of stream
}
},
});
}
for await (const num of numberStream(5)) {
console.log(num); // 0, 1, 2, 3, 4, 5
}

Backpressure prevents a fast producer from overwhelming a slow consumer. pipe() and pipeline() handle this automatically. When writing manually:

const writable = createWriteStream('./output.txt');
for (const chunk of largeDataSource) {
const canContinue = writable.write(chunk);
if (!canContinue) {
// Wait for drain before writing more
await new Promise(resolve => writable.once('drain', resolve));
}
}
writable.end();