index.js: 10 additions & 11 deletions

@@ -7,11 +7,10 @@ const { pipeline, compose } = require('node:stream')
 const fp = require('fastify-plugin')
 const encodingNegotiator = require('@fastify/accept-negotiator')
 const mimedb = require('mime-db')
-const peek = require('peek-stream')
 const { Minipass } = require('minipass')
 const { Readable } = require('readable-stream')

-const { isStream, isGzip, isDeflate, intoAsyncIterator, isWebReadableStream, isFetchResponse, webStreamToNodeReadable } = require('./lib/utils')
+const { isStream, isGzip, isDeflate, intoAsyncIterator, isWebReadableStream, isFetchResponse, webStreamToNodeReadable, createLazyTransform } = require('./lib/utils')

 const InvalidRequestEncodingError = createError('FST_CP_ERR_INVALID_CONTENT_ENCODING', 'Unsupported Content-Encoding: %s', 415)
 const InvalidRequestCompressedPayloadError = createError('FST_CP_ERR_INVALID_CONTENT', 'Could not decompress the request payload using the provided encoding', 400)
@@ -557,26 +556,26 @@ function maybeUnzip (payload, serialize) {
 }

 function zipStream (deflate, encoding) {
-  return peek({ newline: false, maxBuffer: 10 }, function (data, swap) {
+  return createLazyTransform(function (data) {
     switch (isCompressed(data)) {
-      case 1: return swap(null, new Minipass())
-      case 2: return swap(null, new Minipass())
+      case 1: return new Minipass()
+      case 2: return new Minipass()
     }
-    return swap(null, deflate[encoding]())
+    return deflate[encoding]()
   })
 }

 function unzipStream (inflate, maxRecursion) {
   if (!(maxRecursion >= 0)) maxRecursion = 3
-  return peek({ newline: false, maxBuffer: 10 }, function (data, swap) {
+  return createLazyTransform(function (data) {
     // This path is never taken: when `maxRecursion` < 0 it is automatically set back to 3
     /* c8 ignore next */
-    if (maxRecursion < 0) return swap(new Error('Maximum recursion reached'))
+    if (maxRecursion < 0) throw new Error('Maximum recursion reached')
     switch (isCompressed(data)) {
-      case 1: return swap(null, compose(inflate.gzip(), unzipStream(inflate, maxRecursion - 1)))
-      case 2: return swap(null, compose(inflate.deflate(), unzipStream(inflate, maxRecursion - 1)))
+      case 1: return compose(inflate.gzip(), unzipStream(inflate, maxRecursion - 1))
+      case 2: return compose(inflate.deflate(), unzipStream(inflate, maxRecursion - 1))
     }
-    return swap(null, new Minipass())
+    return new Minipass()
   })
 }
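For orientation: `isCompressed` is unchanged by this PR and not shown in the diff, but from the `case 1`/`case 2` branches above (gzip and deflate respectively, given `inflate.gzip()` and `inflate.deflate()`) it evidently classifies the peeked bytes by their magic numbers. A hypothetical sketch of such a check, not the plugin's actual code:

// Hypothetical sketch: gzip members begin with 0x1f 0x8b (RFC 1952);
// zlib-wrapped deflate streams begin with 0x78 (RFC 1950).
function isCompressedSketch (buffer) {
  if (buffer[0] === 0x1f && buffer[1] === 0x8b) return 1 // gzip
  if (buffer[0] === 0x78) return 2 // deflate (zlib)
  return 0 // neither
}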
lib/utils.js: 59 additions & 2 deletions

@@ -1,6 +1,6 @@
 'use strict'

-const { Readable: NodeReadable } = require('node:stream')
+const { Readable: NodeReadable, Transform } = require('node:stream')

 // https://datatracker.ietf.org/doc/html/rfc8878#section-3.1.1
 function isZstd (buffer) {

@@ -104,4 +104,61 @@ async function * intoAsyncIterator (payload) {
   yield payload
 }

-module.exports = { isZstd, isGzip, isDeflate, isStream, intoAsyncIterator, isWebReadableStream, isFetchResponse, webStreamToNodeReadable }
+/**
+ * Creates a Transform that lazily selects an inner transform by peeking
+ * at the first `maxBuffer` bytes.
+ *
+ * @param {(data: Buffer) => import('stream').Transform} choose
+ *   Called once with the peeked bytes. Must return the transform stream
+ *   that all data (including the peeked bytes) will be piped through.
+ * @param {number} [maxBuffer=10] - Number of bytes to peek before deciding.
+ * @returns {import('stream').Transform}
+ */
+function createLazyTransform (choose, maxBuffer = 10) {
+  let chunks = []
+  let chunksLength = 0
+  let dest = null
+
+  return new Transform({
+    transform (chunk, encoding, cb) {
+      if (dest) {
+        return dest.write(chunk, encoding, cb)
+      }
+
+      chunks.push(chunk)
+      chunksLength += chunk.length
+
+      if (chunksLength < maxBuffer) return cb()
+
+      const buf = Buffer.concat(chunks)
+      chunks = null
+
+      dest = choose(buf.subarray(0, maxBuffer))
+      dest.on('data', (d) => this.push(d))
+      dest.on('end', () => this.push(null))
+      dest.write(buf, cb)
+    },
+
+    flush (cb) {
+      if (!dest) {
+        const buf = Buffer.concat(chunks)
+        chunks = null
+
+        dest = choose(buf)
+        dest.on('data', (d) => this.push(d))
+        dest.on('end', () => { this.push(null); cb() })
+        if (buf.length > 0) {
+          dest.end(buf)
+        } else {
+          dest.end()
+        }
+        return
+      }
+
+      dest.on('end', cb)
+      dest.end()
+    }
+  })
+}
+
+module.exports = { isZstd, isGzip, isDeflate, isStream, intoAsyncIterator, isWebReadableStream, isFetchResponse, webStreamToNodeReadable, createLazyTransform }
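For context, a minimal usage sketch of the new `createLazyTransform` outside the plugin, based on the JSDoc above (the gzip check, the 2-byte peek, and `lazyGunzip` are illustrative, not part of this PR):

const zlib = require('node:zlib')
const { PassThrough, pipeline } = require('node:stream')
const { createLazyTransform } = require('./lib/utils')

// Gunzip only when the payload starts with the gzip magic bytes;
// otherwise pass the data through untouched. All bytes, including
// the two peeked ones, flow through the chosen stream.
const lazyGunzip = createLazyTransform((peeked) => {
  if (peeked[0] === 0x1f && peeked[1] === 0x8b) return zlib.createGunzip()
  return new PassThrough()
}, 2)

pipeline(process.stdin, lazyGunzip, process.stdout, (err) => {
  if (err) console.error(err)
})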
package.json: 1 deletion

@@ -10,7 +10,6 @@
     "fastify-plugin": "^5.0.0",
     "mime-db": "^1.52.0",
     "minipass": "^7.0.4",
-    "peek-stream": "^1.1.3",
     "readable-stream": "^4.5.2"
   },
   "devDependencies": {
test/utils.test.js: 122 additions & 1 deletion

@@ -3,8 +3,9 @@
 const { createReadStream } = require('node:fs')
 const { Socket } = require('node:net')
 const { Duplex, PassThrough, Readable, Stream, Transform, Writable } = require('node:stream')
+const { pipeline } = require('node:stream/promises')
 const { test } = require('node:test')
-const { isStream, isZstd, isDeflate, isGzip, intoAsyncIterator } = require('../lib/utils')
+const { isStream, isZstd, isDeflate, isGzip, intoAsyncIterator, createLazyTransform } = require('../lib/utils')

 test('isStream() utility should be able to detect Streams', async (t) => {
   t.plan(12)

@@ -124,3 +125,123 @@ test('intoAsyncIterator() utility should handle different data', async (t) => {
     equal(chunk, obj)
   }
 })
+
+test('createLazyTransform() should peek at first N bytes and pipe through selected stream', async (t) => {
+  const input = Buffer.from('hello world')
+  const chunks = []
+
+  const peekStream = createLazyTransform((data) => {
+    t.assert.equal(data.length, 5)
+    t.assert.equal(data.toString(), 'hello')
+    return new PassThrough()
+  }, 5)
+
+  peekStream.on('data', (chunk) => chunks.push(chunk))
+
+  await new Promise((resolve, reject) => {
+    peekStream.on('end', resolve)
+    peekStream.on('error', reject)
+    peekStream.write(input)
+    peekStream.end()
+  })
+
+  t.assert.equal(Buffer.concat(chunks).toString(), 'hello world')
+})
+
+test('createLazyTransform() should work when data arrives in small chunks', async (t) => {
+  const chunks = []
+
+  const peekStream = createLazyTransform((data) => {
+    t.assert.equal(data.length, 4)
+    t.assert.equal(data.toString(), 'abcd')
+    return new PassThrough()
+  }, 4)
+
+  peekStream.on('data', (chunk) => chunks.push(chunk))
+
+  await new Promise((resolve, reject) => {
+    peekStream.on('end', resolve)
+    peekStream.on('error', reject)
+    peekStream.write(Buffer.from('ab'))
+    peekStream.write(Buffer.from('cd'))
+    peekStream.write(Buffer.from('ef'))
+    peekStream.end()
+  })
+
+  t.assert.equal(Buffer.concat(chunks).toString(), 'abcdef')
+})
+
+test('createLazyTransform() should handle stream ending before maxBuffer is reached', async (t) => {
+  const chunks = []
+
+  const peekStream = createLazyTransform((data) => {
+    t.assert.equal(data.toString(), 'hi')
+    return new PassThrough()
+  })
+
+  peekStream.on('data', (chunk) => chunks.push(chunk))
+
+  await new Promise((resolve, reject) => {
+    peekStream.on('end', resolve)
+    peekStream.on('error', reject)
+    peekStream.write(Buffer.from('hi'))
+    peekStream.end()
+  })
+
+  t.assert.equal(Buffer.concat(chunks).toString(), 'hi')
+})
+
+test('createLazyTransform() should propagate errors from choose function', async (t) => {
+  const peekStream = createLazyTransform(() => {
+    throw new Error('test error')
+  }, 2)
+
+  await t.assert.rejects(
+    pipeline(Readable.from(Buffer.from('hello')), peekStream, new PassThrough()),
+    { message: 'test error' }
+  )
+})
+
+test('createLazyTransform() should handle empty stream', async (t) => {
+  const chunks = []
+
+  const peekStream = createLazyTransform((data) => {
+    t.assert.equal(data.length, 0)
+    return new PassThrough()
+  })
+
+  peekStream.on('data', (chunk) => chunks.push(chunk))
+
+  await new Promise((resolve, reject) => {
+    peekStream.on('end', resolve)
+    peekStream.on('error', reject)
+    peekStream.end()
+  })
+
+  t.assert.equal(Buffer.concat(chunks).length, 0)
+})
+
+test('createLazyTransform() should work with a transform stream as destination', async (t) => {
+  const chunks = []
+
+  const upper = new Transform({
+    transform (chunk, enc, cb) {
+      cb(null, chunk.toString().toUpperCase())
+    }
+  })
+
+  const peekStream = createLazyTransform(() => {
+    return upper
+  }, 3)
+
+  peekStream.on('data', (chunk) => chunks.push(chunk))
+
+  await new Promise((resolve, reject) => {
+    peekStream.on('end', resolve)
+    peekStream.on('error', reject)
+    peekStream.write(Buffer.from('hello'))
+    peekStream.end()
+  })
+
+  t.assert.equal(Buffer.concat(chunks).toString(), 'HELLO')
+})