From harness-claude
Process large data efficiently using Node.js streams (Readable, Writable, Transform) with pipeline for error handling and backpressure.
Slash command: `/harness-claude:node-streams-pattern`
> Process large data efficiently using Node.js Readable, Writable, and Transform streams
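```javascript
import { createReadStream } from 'node:fs';

// Iterate over a large file chunk by chunk instead of loading it all with readFile.
// With an encoding set, each chunk is a string and multi-byte characters are never split.
const readable = createReadStream('large-file.csv', { encoding: 'utf-8' });
for await (const chunk of readable) {
  processChunk(chunk); // processChunk: your own per-chunk handler
}
```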
```javascript
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
import { pipeline } from 'node:stream/promises';

// Read -> compress -> write; the promise rejects if any stage fails
await pipeline(createReadStream('input.json'), createGzip(), createWriteStream('output.json.gz'));
```
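```javascript
import { createReadStream, createWriteStream } from 'node:fs';
import { Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// A Transform receives each chunk, pushes the transformed output, then signals completion
const uppercase = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  },
});

// Reading with an encoding keeps multi-byte characters intact across chunk boundaries
await pipeline(createReadStream('input.txt', { encoding: 'utf-8' }), uppercase, createWriteStream('output.txt'));
```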
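```javascript
import { Transform } from 'node:stream';

// objectMode: chunks are arbitrary JS values (here, user objects) rather than Buffers or strings
const filterActive = new Transform({
  objectMode: true,
  transform(user, encoding, callback) {
    if (user.isActive) this.push(user); // inactive users are dropped by simply not pushing them
    callback();
  },
});
```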
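```javascript
import { Readable } from 'node:stream';

// The async generator produces records lazily; Readable.from wraps it in an object-mode
// Readable that pulls records on demand (at most highWaterMark ahead of the consumer)
async function* generateRecords() {
  for (let i = 0; i < 1_000_000; i++) {
    yield { id: i, value: Math.random() };
  }
}

const readable = Readable.from(generateRecords());
```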
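```javascript
import { open } from 'node:fs/promises';
import { pipeline } from 'node:stream/promises';

// Assumes `app` is an Express application
app.get('/download', async (req, res) => {
  // Open first so a missing file can still get a 500 before any bytes are sent
  const file = await open('large-file.zip').catch(() => null);
  if (!file) return res.status(500).end('Error reading file');
  res.setHeader('Content-Type', 'application/octet-stream');
  try {
    // pipeline respects backpressure and destroys both streams if either side fails
    await pipeline(file.createReadStream(), res);
  } catch (err) {
    console.error('Download failed mid-stream:', err); // headers already sent; res is destroyed
  }
});
```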
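```javascript
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';

// readline splits the byte stream into lines; crlfDelay: Infinity treats \r\n as one line break
const rl = createInterface({
  input: createReadStream('data.csv'),
  crlfDelay: Infinity,
});

for await (const line of rl) {
  // Naive split: fine for simple data, but does not handle quoted fields containing commas
  const [name, email] = line.split(',');
  await processUser({ name, email }); // processUser: your own async handler
}
```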
Use `pipeline` instead of `.pipe()`: `pipeline` propagates errors and cleans up every stream in the chain.

```javascript
// Bad: errors are not propagated, and streams are not cleaned up on failure
readable.pipe(transform).pipe(writable);

// Good: the first error rejects the promise, and every stream is destroyed
await pipeline(readable, transform, writable);
```
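Streams process data in chunks without loading the entire dataset into memory, so memory use depends on the buffer size rather than the data size: a 10GB file can be processed with a 64KB buffer (the default `highWaterMark` for `fs.createReadStream`).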
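To make the constant-memory claim concrete, here is a minimal sketch that computes a SHA-256 digest of a file by feeding each chunk to `crypto.createHash`, so the whole file is never held in memory at once. The helper name `sha256OfFile` and the throwaway demo file in the OS temp directory are illustrative assumptions, not part of the skill.

```javascript
import { createHash } from 'node:crypto';
import { createReadStream } from 'node:fs';
import { rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

// Hypothetical helper: memory use is bounded by the stream's buffer, not the file size
async function sha256OfFile(path) {
  const hash = createHash('sha256');
  for await (const chunk of createReadStream(path)) {
    hash.update(chunk); // incremental update; earlier chunks can be garbage-collected
  }
  return hash.digest('hex');
}

// Self-contained demo: a 1 MB file in the temp directory (hypothetical file name)
const path = join(tmpdir(), 'stream-hash-demo.bin');
const data = Buffer.alloc(1024 * 1024, 'a');
await writeFile(path, data);

const streamed = await sha256OfFile(path);
const inMemory = createHash('sha256').update(data).digest('hex'); // same digest, whole buffer at once
await rm(path);
console.log(streamed === inMemory);
```

The in-memory digest is only there to check the streamed one; for a genuinely large file you would keep just the streaming path.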
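Stream types:
- Readable: a source of data (e.g. `fs.createReadStream`, `process.stdin`)
- Writable: a destination for data (e.g. `fs.createWriteStream`, an HTTP response)
- Duplex: readable and writable, with independent sides (e.g. a TCP socket)
- Transform: a Duplex whose output is computed from its input (e.g. `zlib.createGzip()`)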
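A minimal sketch wiring three of Node's four stream types together with `pipeline`: an object-mode Readable built with `Readable.from`, a Transform that keeps active users (mirroring the `filterActive` example), and a custom Writable sink. The sample users and the array-collecting sink are hypothetical; a real sink would typically write to a file or database. Duplex is not shown.

```javascript
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Readable: an object-mode source built from an in-memory array (hypothetical sample data)
const users = Readable.from([
  { name: 'ada', isActive: true },
  { name: 'bob', isActive: false },
  { name: 'cy', isActive: true },
]);

// Transform: passes active users through and drops the rest
const activeOnly = new Transform({
  objectMode: true,
  transform(user, encoding, callback) {
    if (user.isActive) this.push(user);
    callback();
  },
});

// Writable: a sink that records each user's name
const names = [];
const collect = new Writable({
  objectMode: true,
  write(user, encoding, callback) {
    names.push(user.name);
    callback(); // signal that this chunk is handled and the next one can be written
  },
});

await pipeline(users, activeOnly, collect);
console.log(names); // [ 'ada', 'cy' ]
```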
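Backpressure: When a writable stream cannot consume data as fast as the readable produces it, `write()` starts returning `false` once the writable's buffer reaches its `highWaterMark`, and both `.pipe()` and `pipeline` pause the readable until the writable emits `'drain'`. This prevents memory exhaustion. The difference is error handling: `pipeline` also forwards errors and destroys every stream on failure, while `.pipe()` does not. Code that calls `write()` directly must check the return value itself.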
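When you write to a Writable yourself instead of using `pipeline`, backpressure has to be honoured by hand. In this minimal sketch, the hypothetical helper `writeAll` stops whenever `write()` returns `false` and waits for `'drain'` using `events.once`. The tiny `highWaterMark` and the 1 ms delay in the sink are assumptions chosen only to make a slow consumer visible.

```javascript
import { once } from 'node:events';
import { Writable } from 'node:stream';

// A deliberately slow sink: tiny buffer, and each write completes after 1 ms
const received = [];
const slowSink = new Writable({
  highWaterMark: 4, // bytes, so almost every write() fills the buffer
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setTimeout(callback, 1);
  },
});

let pauses = 0;

// Hypothetical helper: write items one by one, pausing whenever the sink signals backpressure
async function writeAll(writable, items) {
  for (const item of items) {
    if (!writable.write(item)) {
      pauses++;
      await once(writable, 'drain'); // resume only after the buffer has emptied
    }
  }
  writable.end();
  await once(writable, 'finish'); // all buffered writes have completed
}

await writeAll(slowSink, ['alpha', 'beta', 'gamma']);
console.log(received, pauses);
```

Ignoring the `false` return would still deliver the data, but the writable's buffer would grow without bound when the producer is faster than the consumer.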
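highWaterMark: Sets the internal buffer threshold at which backpressure kicks in (a threshold, not a hard limit). Defaults are 16 objects in object mode and, for byte streams, 16KiB before Node.js 22 and 64KiB from Node.js 22; `fs.createReadStream` uses 64KiB. Lower values reduce memory use but mean more, smaller reads and writes.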
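A small sketch of that trade-off, assuming a throwaway 10,000-byte file in the OS temp directory (the file name and the `chunkSizes` helper are hypothetical): reading it with a smaller `highWaterMark` yields more, smaller chunks. It also shows the object-mode default through `readableHighWaterMark`.

```javascript
import { createReadStream } from 'node:fs';
import { rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { Readable } from 'node:stream';

// Object-mode streams count objects, not bytes
const objects = new Readable({ objectMode: true, read() {} });
console.log(objects.readableHighWaterMark); // 16

// Self-contained demo file (hypothetical name)
const path = join(tmpdir(), 'hwm-demo.bin');
await writeFile(path, Buffer.alloc(10_000));

// Hypothetical helper: record the size of every 'data' chunk for a given highWaterMark
function chunkSizes(highWaterMark) {
  return new Promise((resolve, reject) => {
    const sizes = [];
    createReadStream(path, { highWaterMark })
      .on('data', (chunk) => sizes.push(chunk.length))
      .on('end', () => resolve(sizes))
      .on('error', reject);
  });
}

const small = await chunkSizes(1024); // more, smaller reads
const large = await chunkSizes(8192); // fewer, larger reads
await rm(path);
console.log(small.length, large.length);
```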
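Trade-offs:
- Streams vs. `readFile`: streams keep memory bounded regardless of input size, while `readFile` is simpler but loads the whole file into memory.
- `pipeline` handles errors and cleanup, but requires every stream in the chain to implement `destroy` correctly.

Reference: https://nodejs.org/api/stream.html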
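The `destroy` trade-off is easiest to see with a custom stream that owns a resource. In this minimal sketch, `resource` is a hypothetical stand-in for something like a database cursor or file handle: the Readable releases it in its `destroy` implementation, and a sink that fails on the fourth record shows `pipeline` tearing the chain down and running that hook before its promise rejects.

```javascript
import { Readable, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Hypothetical resource that must be released when the stream goes away
const resource = { closed: false, close() { this.closed = true; } };

let next = 0;
const source = new Readable({
  objectMode: true,
  read() {
    this.push({ n: next++ }); // endless source; only an error stops it
  },
  // _destroy runs when the stream is torn down, including when pipeline destroys the chain
  destroy(err, callback) {
    resource.close();
    callback(err);
  },
});

// A sink that fails on the fourth record (n === 3)
const failingSink = new Writable({
  objectMode: true,
  write(record, encoding, callback) {
    callback(record.n === 3 ? new Error('sink failed') : null);
  },
});

let caught;
try {
  await pipeline(source, failingSink);
} catch (err) {
  caught = err;
}
console.log(caught.message, resource.closed, source.destroyed);
```

A custom stream that skips this hook (or never calls its callback) would leak the resource even though `pipeline` reports the error correctly.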
Install with: `npx claudepluginhub intense-visions/harness-engineering --plugin harness-claude`