diff --git a/index.js b/index.js index 7c434e5..2eda90c 100644 --- a/index.js +++ b/index.js @@ -7,11 +7,10 @@ const { pipeline, compose } = require('node:stream') const fp = require('fastify-plugin') const encodingNegotiator = require('@fastify/accept-negotiator') const mimedb = require('mime-db') -const peek = require('peek-stream') const { Minipass } = require('minipass') const { Readable } = require('readable-stream') -const { isStream, isGzip, isDeflate, intoAsyncIterator, isWebReadableStream, isFetchResponse, webStreamToNodeReadable } = require('./lib/utils') +const { isStream, isGzip, isDeflate, intoAsyncIterator, isWebReadableStream, isFetchResponse, webStreamToNodeReadable, createLazyTransform } = require('./lib/utils') const InvalidRequestEncodingError = createError('FST_CP_ERR_INVALID_CONTENT_ENCODING', 'Unsupported Content-Encoding: %s', 415) const InvalidRequestCompressedPayloadError = createError('FST_CP_ERR_INVALID_CONTENT', 'Could not decompress the request payload using the provided encoding', 400) @@ -557,26 +556,26 @@ function maybeUnzip (payload, serialize) { } function zipStream (deflate, encoding) { - return peek({ newline: false, maxBuffer: 10 }, function (data, swap) { + return createLazyTransform(function (data) { switch (isCompressed(data)) { - case 1: return swap(null, new Minipass()) - case 2: return swap(null, new Minipass()) + case 1: return new Minipass() + case 2: return new Minipass() } - return swap(null, deflate[encoding]()) + return deflate[encoding]() }) } function unzipStream (inflate, maxRecursion) { if (!(maxRecursion >= 0)) maxRecursion = 3 - return peek({ newline: false, maxBuffer: 10 }, function (data, swap) { + return createLazyTransform(function (data) { // This path is never taken, when `maxRecursion` < 0 it is automatically set back to 3 /* c8 ignore next */ - if (maxRecursion < 0) return swap(new Error('Maximum recursion reached')) + if (maxRecursion < 0) throw new Error('Maximum recursion reached') switch (isCompressed(data)) { - case 1: return swap(null, compose(inflate.gzip(), unzipStream(inflate, maxRecursion - 1))) - case 2: return swap(null, compose(inflate.deflate(), unzipStream(inflate, maxRecursion - 1))) + case 1: return compose(inflate.gzip(), unzipStream(inflate, maxRecursion - 1)) + case 2: return compose(inflate.deflate(), unzipStream(inflate, maxRecursion - 1)) } - return swap(null, new Minipass()) + return new Minipass() }) } diff --git a/lib/utils.js b/lib/utils.js index 70854f4..f21397d 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -1,6 +1,6 @@ 'use strict' -const { Readable: NodeReadable } = require('node:stream') +const { Readable: NodeReadable, Transform } = require('node:stream') // https://datatracker.ietf.org/doc/html/rfc8878#section-3.1.1 function isZstd (buffer) { @@ -104,4 +104,61 @@ async function * intoAsyncIterator (payload) { yield payload } -module.exports = { isZstd, isGzip, isDeflate, isStream, intoAsyncIterator, isWebReadableStream, isFetchResponse, webStreamToNodeReadable } +/** + * Creates a Transform that lazily selects an inner transform by peeking + * at the first `maxBuffer` bytes. + * + * @param {(data: Buffer) => import('stream').Transform} choose + * Called once with the peeked bytes. Must return the transform stream + * that all data (including the peeked bytes) will be piped through. + * @param {number} [maxBuffer=10] - Number of bytes to peek before deciding. + * @returns {import('stream').Transform} + */ +function createLazyTransform (choose, maxBuffer = 10) { + let chunks = [] + let chunksLength = 0 + let dest = null + + return new Transform({ + transform (chunk, encoding, cb) { + if (dest) { + return dest.write(chunk, encoding, cb) + } + + chunks.push(chunk) + chunksLength += chunk.length + + if (chunksLength < maxBuffer) return cb() + + const buf = Buffer.concat(chunks) + chunks = null + + dest = choose(buf.subarray(0, maxBuffer)) + dest.on('data', (d) => this.push(d)) + dest.on('end', () => this.push(null)) + dest.write(buf, cb) + }, + + flush (cb) { + if (!dest) { + const buf = Buffer.concat(chunks) + chunks = null + + dest = choose(buf) + dest.on('data', (d) => this.push(d)) + dest.on('end', () => { this.push(null); cb() }) + if (buf.length > 0) { + dest.end(buf) + } else { + dest.end() + } + return + } + + dest.on('end', cb) + dest.end() + } + }) +} + +module.exports = { isZstd, isGzip, isDeflate, isStream, intoAsyncIterator, isWebReadableStream, isFetchResponse, webStreamToNodeReadable, createLazyTransform } diff --git a/package.json b/package.json index 83a4d22..1c72e5e 100644 --- a/package.json +++ b/package.json @@ -10,7 +10,6 @@ "fastify-plugin": "^5.0.0", "mime-db": "^1.52.0", "minipass": "^7.0.4", - "peek-stream": "^1.1.3", "readable-stream": "^4.5.2" }, "devDependencies": { diff --git a/test/utils.test.js b/test/utils.test.js index cc71569..bf77a7e 100644 --- a/test/utils.test.js +++ b/test/utils.test.js @@ -3,8 +3,9 @@ const { createReadStream } = require('node:fs') const { Socket } = require('node:net') const { Duplex, PassThrough, Readable, Stream, Transform, Writable } = require('node:stream') +const { pipeline } = require('node:stream/promises') const { test } = require('node:test') -const { isStream, isZstd, isDeflate, isGzip, intoAsyncIterator } = require('../lib/utils') +const { isStream, isZstd, isDeflate, isGzip, intoAsyncIterator, createLazyTransform } = require('../lib/utils') test('isStream() utility should be able to detect Streams', async (t) => { t.plan(12) @@ -124,3 +125,123 @@ test('intoAsyncIterator() utility should handle different data', async (t) => { equal(chunk, obj) } }) + +test('createLazyTransform() should peek at first N bytes and pipe through selected stream', async (t) => { + const input = Buffer.from('hello world') + const chunks = [] + + const peekStream = createLazyTransform((data) => { + t.assert.equal(data.length, 5) + t.assert.equal(data.toString(), 'hello') + return new PassThrough() + }, 5) + + peekStream.on('data', (chunk) => chunks.push(chunk)) + + await new Promise((resolve, reject) => { + peekStream.on('end', resolve) + peekStream.on('error', reject) + peekStream.write(input) + peekStream.end() + }) + + t.assert.equal(Buffer.concat(chunks).toString(), 'hello world') +}) + +test('createLazyTransform() should work when data arrives in small chunks', async (t) => { + const chunks = [] + + const peekStream = createLazyTransform((data) => { + t.assert.equal(data.length, 4) + t.assert.equal(data.toString(), 'abcd') + return new PassThrough() + }, 4) + + peekStream.on('data', (chunk) => chunks.push(chunk)) + + await new Promise((resolve, reject) => { + peekStream.on('end', resolve) + peekStream.on('error', reject) + peekStream.write(Buffer.from('ab')) + peekStream.write(Buffer.from('cd')) + peekStream.write(Buffer.from('ef')) + peekStream.end() + }) + + t.assert.equal(Buffer.concat(chunks).toString(), 'abcdef') +}) + +test('createLazyTransform() should handle stream ending before maxBuffer is reached', async (t) => { + const chunks = [] + + const peekStream = createLazyTransform((data) => { + t.assert.equal(data.toString(), 'hi') + return new PassThrough() + }) + + peekStream.on('data', (chunk) => chunks.push(chunk)) + + await new Promise((resolve, reject) => { + peekStream.on('end', resolve) + peekStream.on('error', reject) + peekStream.write(Buffer.from('hi')) + peekStream.end() + }) + + t.assert.equal(Buffer.concat(chunks).toString(), 'hi') +}) + +test('createLazyTransform() should propagate errors from choose function', async (t) => { + const peekStream = createLazyTransform(() => { + throw new Error('test error') + }, 2) + + await t.assert.rejects( + pipeline(Readable.from(Buffer.from('hello')), peekStream, new PassThrough()), + { message: 'test error' } + ) +}) + +test('createLazyTransform() should handle empty stream', async (t) => { + const chunks = [] + + const peekStream = createLazyTransform((data) => { + t.assert.equal(data.length, 0) + return new PassThrough() + }) + + peekStream.on('data', (chunk) => chunks.push(chunk)) + + await new Promise((resolve, reject) => { + peekStream.on('end', resolve) + peekStream.on('error', reject) + peekStream.end() + }) + + t.assert.equal(Buffer.concat(chunks).length, 0) +}) + +test('createLazyTransform() should work with a transform stream as destination', async (t) => { + const chunks = [] + + const upper = new Transform({ + transform (chunk, enc, cb) { + cb(null, chunk.toString().toUpperCase()) + } + }) + + const peekStream = createLazyTransform(() => { + return upper + }, 3) + + peekStream.on('data', (chunk) => chunks.push(chunk)) + + await new Promise((resolve, reject) => { + peekStream.on('end', resolve) + peekStream.on('error', reject) + peekStream.write(Buffer.from('hello')) + peekStream.end() + }) + + t.assert.equal(Buffer.concat(chunks).toString(), 'HELLO') +})