Node.js Streams
Streams process data piece by piece instead of loading everything into memory at once. They are essential for handling large files, HTTP request and response bodies, and real-time data.
Why Streams
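Without streams, reading a 1 GB file loads the entire file into memory before any processing starts. With streams, you process chunks as they arrive, so memory usage stays roughly constant. It is bounded by the size of the stream's internal buffer (its `highWaterMark`), not by the size of the file.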
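For `fs` read streams, the `highWaterMark` option caps how many bytes each read returns (64 KiB by default). The sketch below uses a hypothetical `measureChunks` helper and a throwaway temp file. It reads 100 KiB in 16 KiB chunks and reports the largest chunk it saw, which stays at or below the configured size however large the file is.

```typescript
import { createReadStream, mkdtempSync, writeFileSync } from 'fs';
import { tmpdir } from 'os';
import { join } from 'path';

// Hypothetical helper: streams a file and reports its total size and the
// largest chunk received. Memory use depends on chunkSize, not on file size.
async function measureChunks(
  path: string,
  chunkSize: number,
): Promise<{ total: number; largestChunk: number }> {
  let total = 0;
  let largestChunk = 0;
  // highWaterMark sets the maximum number of bytes returned by each read
  for await (const chunk of createReadStream(path, { highWaterMark: chunkSize })) {
    const size = (chunk as Buffer).length; // no encoding set, so chunks are Buffers
    total += size;
    largestChunk = Math.max(largestChunk, size);
  }
  return { total, largestChunk };
}

// Usage sketch: a 100 KiB temp file read in 16 KiB chunks.
const demoFile = join(mkdtempSync(join(tmpdir(), 'streams-')), 'demo.bin');
writeFileSync(demoFile, Buffer.alloc(100 * 1024));
void measureChunks(demoFile, 16 * 1024).then(({ total, largestChunk }) => {
  console.log(total); // 102400
  console.log(largestChunk); // at most 16384
});
```

Raising `chunkSize` trades more memory per read for fewer reads. Lowering it does the opposite.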
```text
Without streams: read 1GB → hold in memory → process → write
With streams:    read chunk → process chunk → write chunk → repeat
```

Stream Types
| Type | Description | Example |
|---|---|---|
| Readable | Source of data | `fs.createReadStream`, `http.IncomingMessage` |
| Writable | Destination for data | `fs.createWriteStream`, `http.ServerResponse` |
| Transform | Read + write, where the output is computed from the input | `zlib.createGzip`, `crypto.createCipheriv` |
| Duplex | Read + write independently | `net.Socket` |
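Here is a minimal sketch that makes the Duplex row concrete. It builds a duplex stream whose two sides are unrelated: the writable side records what it receives, and the readable side emits its own queued messages. `createRecordingDuplex` is a hypothetical helper for illustration. In a real duplex such as `net.Socket`, the two sides carry outgoing and incoming network data instead.

```typescript
import { Duplex } from 'stream';

// Hypothetical helper: the writable side stores incoming chunks, while the
// readable side independently emits `outgoing` in order, then ends.
function createRecordingDuplex(outgoing: string[]): { duplex: Duplex; received: string[] } {
  const received: string[] = [];
  const queue = [...outgoing];

  const duplex = new Duplex({
    objectMode: true, // both sides carry JavaScript values instead of bytes
    write(chunk, _encoding, callback) {
      received.push(String(chunk)); // writable side: record, then acknowledge
      callback();
    },
    read() {
      // readable side: emit the next queued message, or null to end
      const next = queue.shift();
      this.push(next ?? null);
    },
  });

  return { duplex, received };
}

// Usage sketch: writing 'ping' does not change what is read back.
const { duplex, received } = createRecordingDuplex(['hello', 'world']);
duplex.on('data', (msg) => console.log('read:', msg));
duplex.on('end', () => console.log('received:', received));
duplex.end('ping');
```

In a Transform, each readable chunk would be derived from a written one. Here the two directions never interact.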
Reading a File as a Stream
```typescript
import { createReadStream } from 'fs';

const stream = createReadStream('./large-file.csv', { encoding: 'utf-8' });

stream.on('data', (chunk) => {
  // With an encoding set, chunks are strings, so measure their size in bytes explicitly
  console.log(`Received ${Buffer.byteLength(chunk)} bytes`);
});

stream.on('end', () => {
  console.log('File fully read');
});

stream.on('error', (err) => {
  console.error('Stream error:', err);
});
```

Piping Streams
`.pipe()` connects a readable stream to a writable stream and handles backpressure automatically. It returns the destination stream, so calls can be chained:
```typescript
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

// Compress a file: read → compress → write
createReadStream('./large.csv')
  .pipe(createGzip())
  .pipe(createWriteStream('./large.csv.gz'));
```

pipeline (Recommended over pipe)
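Unlike `.pipe()`, which does not forward errors between streams, `pipeline` handles errors from every stream in the chain and destroys the remaining streams when one fails, so file handles are not leaked. The `stream/promises` version returns a promise that rejects with the error: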
```typescript
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

// Top-level await requires an ES module (otherwise, wrap this in an async function)
await pipeline(
  createReadStream('./large.csv'),
  createGzip(),
  createWriteStream('./large.csv.gz'),
);
```

Transform Streams
A `Transform` stream modifies chunks as they flow through. Implement `transform(chunk, encoding, callback)` and pass the result to `callback(error, output)`:
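```typescript
import { Transform } from 'stream';
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';

const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    // chunk arrives as a Buffer; hand the transformed data to the callback
    callback(null, chunk.toString().toUpperCase());
  },
});

await pipeline(
  // Decoding on the read side keeps multi-byte UTF-8 characters from being split across chunks
  createReadStream('./input.txt', { encoding: 'utf-8' }),
  upperCaseTransform,
  createWriteStream('./output.txt'),
);
```

Reading Streams with async for…of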
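Readable streams, and the `readline` interface built on top of them, are async iterables, so they can be consumed with `for await...of`. This example reads a CSV file one line at a time:

```typescript
import { createReadStream } from 'fs';
import { createInterface } from 'readline';

// Hypothetical: stands in for your own persistence logic
declare function saveUser(user: { name: string; email: string; age: number }): Promise<void>;

async function processCSV(filePath: string) {
  const fileStream = createReadStream(filePath);
  // crlfDelay: Infinity treats \r\n as a single line break
  const rl = createInterface({ input: fileStream, crlfDelay: Infinity });

  let lineCount = 0;
  for await (const line of rl) {
    lineCount++;
    const [name, email, age] = line.split(',');
    // Awaiting inside the loop handles one line at a time
    await saveUser({ name, email, age: Number(age) });
  }

  console.log(`Processed ${lineCount} lines`);
}
```

HTTP with Streams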
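HTTP in Node.js is built on streams. The incoming request (`http.IncomingMessage`) is readable and the response (`http.ServerResponse`) is writable. This server streams a file straight into each response: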
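```typescript
import http from 'http';
import { createReadStream } from 'fs';
import { pipeline } from 'stream';

// Stream a file directly to the HTTP response
const server = http.createServer((req, res) => {
  res.writeHead(200, { 'Content-Type': 'text/plain' });
  // pipeline destroys both streams if either fails (e.g. missing file, client disconnect)
  pipeline(createReadStream('./large.txt'), res, (err) => {
    if (err) console.error('Streaming failed:', err);
  });
});

server.listen(3000); // example port
```

Collecting Stream Data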
Sometimes you need the entire content at once, for example to parse a small JSON payload. This holds the whole stream in memory, so reserve it for inputs of bounded size:
```typescript
import { Readable } from 'stream';

async function streamToString(readable: Readable): Promise<string> {
  const chunks: Buffer[] = [];
  for await (const chunk of readable) {
    // Chunks may be Buffers or strings, depending on the stream's encoding
    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
  }
  return Buffer.concat(chunks).toString('utf-8');
}
```

Custom Readable Stream
Implement `read()` and call `this.push()` to supply data. Pushing `null` ends the stream:

```typescript
import { Readable } from 'stream';

function numberStream(max: number): Readable {
  let current = 0;
  return new Readable({
    objectMode: true, // push JavaScript values instead of Buffers or strings
    read() {
      if (current <= max) {
        this.push(current++);
      } else {
        this.push(null); // signals end of stream
      }
    },
  });
}

for await (const num of numberStream(5)) {
  console.log(num); // 0, 1, 2, 3, 4, 5
}
```

Backpressure
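Backpressure prevents a fast producer from overwhelming a slow consumer. `pipe()` and `pipeline()` handle it automatically. When you call `write()` yourself, check its return value. `false` means the internal buffer has reached its `highWaterMark`, and you should wait for the `'drain'` event before writing more: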
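```typescript
import { createWriteStream } from 'fs';
import { once } from 'events';

// Hypothetical placeholder: any iterable of strings or Buffers
declare const largeDataSource: Iterable<string | Buffer>;

const writable = createWriteStream('./output.txt');

for (const chunk of largeDataSource) {
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    // Wait for drain before writing more; once() rejects if 'error' fires instead
    await once(writable, 'drain');
  }
}

writable.end();
await once(writable, 'finish'); // all data has been flushed
```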
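The sketch below makes backpressure visible. It writes to a deliberately slow `Writable` whose `highWaterMark` is only 4 bytes and which acknowledges each chunk on a later event-loop turn (via `setImmediate`). `write()` starts returning `false` once 4 bytes are queued, and the loop then pauses on `'drain'` until the consumer catches up. `createSlowWritable` and `writeAll` are hypothetical names for illustration, not Node.js APIs.

```typescript
import { Writable } from 'stream';
import { finished } from 'stream/promises';
import { once } from 'events';

// Hypothetical slow consumer: records each chunk and acknowledges it later.
function createSlowWritable(received: string[]): Writable {
  return new Writable({
    highWaterMark: 4, // write() returns false once 4 or more bytes are pending
    write(chunk: Buffer, _encoding, callback) {
      received.push(chunk.toString());
      setImmediate(() => callback()); // simulate slow I/O
    },
  });
}

// Writes every chunk, pausing on 'drain' whenever write() signals backpressure.
// Returns how many times it had to pause.
async function writeAll(writable: Writable, chunks: string[]): Promise<number> {
  let pauses = 0;
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      pauses++;
      await once(writable, 'drain'); // rejects instead of hanging if 'error' fires
    }
  }
  writable.end();
  await finished(writable); // resolves after every chunk has been processed
  return pauses;
}

// Usage sketch
const received: string[] = [];
void writeAll(createSlowWritable(received), ['ab', 'cd', 'ef', 'gh']).then((pauses) => {
  console.log(received.join('')); // abcdefgh
  console.log(`paused ${pauses} time(s)`);
});
```

Without the `'drain'` wait, every chunk would pile up in the writable's buffer. With a fast source and a slow destination, that buffer grows without limit.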