From 48fcc26a89d31b6ffd3eb2e1019f1a6266b60109 Mon Sep 17 00:00:00 2001 From: Forceres Date: Wed, 1 Apr 2026 17:13:35 +0400 Subject: [PATCH 1/2] feat(microservices): add return buffers option for binary data --- packages/microservices/client/client-redis.ts | 31 ++++++- .../test/client/client-redis.spec.ts | 82 +++++++++++++++++++ 2 files changed, 110 insertions(+), 3 deletions(-) diff --git a/packages/microservices/client/client-redis.ts b/packages/microservices/client/client-redis.ts index 0296508b280..e7c4b56bac9 100644 --- a/packages/microservices/client/client-redis.ts +++ b/packages/microservices/client/client-redis.ts @@ -16,6 +16,10 @@ import { ClientProxy } from './client-proxy'; // type Redis = import('ioredis').Redis; type Redis = any; +type RedisOutputOptions = { + returnBuffers?: boolean; +}; + let redisPackage = {} as any; /** @@ -34,7 +38,10 @@ export class ClientRedis extends ClientProxy { callback: RedisEvents[keyof RedisEvents]; }> = []; - constructor(protected readonly options: Required['options']) { + constructor( + protected readonly options: Required['options'] & + RedisOutputOptions, + ) { super(); redisPackage = loadPackage('ioredis', ClientRedis.name, () => @@ -139,7 +146,10 @@ export class ClientRedis extends ClientProxy { if (!this.wasInitialConnectionSuccessful) { this.wasInitialConnectionSuccessful = true; - this.subClient.on('message', this.createResponseCallback()); + this.subClient.on( + this.options.returnBuffers ? 'messageBuffer' : 'message', + this.createResponseCallback(), + ); } }); } @@ -223,12 +233,27 @@ export class ClientRedis extends ClientProxy { buffer: string, ) => Promise { return async (channel: string, buffer: string) => { - const packet = JSON.parse(buffer); + let packet: any; + try { + packet = JSON.parse(buffer); + } catch (err) { + this.logger.debug( + 'Redis response packet is not in json format, bypassing...', + ); + packet = buffer; + } const { err, response, isDisposed, id } = await this.deserializer.deserialize(packet); const callback = this.routingMap.get(id); if (!callback) { + if (Buffer.isBuffer(buffer)) + this.logger.debug( + 'You have to parse your buffer on your own to get id from it, because it is not in json format', + ); + this.logger.debug( + 'No matching callback found for Redis response packet with id: ' + id, + ); return; } if (isDisposed || err) { diff --git a/packages/microservices/test/client/client-redis.spec.ts b/packages/microservices/test/client/client-redis.spec.ts index 99d92ded5c2..de12e857a98 100644 --- a/packages/microservices/test/client/client-redis.spec.ts +++ b/packages/microservices/test/client/client-redis.spec.ts @@ -179,6 +179,32 @@ describe('ClientRedis', () => { expect(callback.called).to.be.false; }); }); + describe('custom binary format (not json)', () => { + it('should use buffer directly without parsing it as json', async () => { + const clientWithBuffers = new ClientRedis({ returnBuffers: true }); + const callback = sinon.spy(); + const str = `${responseMessage.id}|${responseMessage.response}`; + const bufferMessage = Buffer.from(str); + sinon + .stub(Reflect.get(clientWithBuffers, 'deserializer'), 'deserialize') + .resolves({ + ...responseMessage, + response: bufferMessage, + }); + const subscription = clientWithBuffers.createResponseCallback(); + + clientWithBuffers['routingMap'].set(responseMessage.id, callback); + await subscription('channel', bufferMessage as any); + + expect(callback.called).to.be.true; + expect( + callback.calledWith({ + err: undefined, + response: bufferMessage, + }), + ).to.be.true; + }); + }); }); describe('close', () => { const untypedClient = client as any; @@ -303,6 +329,62 @@ describe('ClientRedis', () => { client.registerReadyListener(emitter as any); expect(callback.getCall(0).args[0]).to.be.eql(RedisEventsMap.READY); }); + it('should register "message" event when returnBuffers is not set', () => { + const onSpy = sinon.spy(); + const client = new ClientRedis({}); + const untypedClient = client as any; + const emitter = { + on: onSpy, + }; + + untypedClient.wasInitialConnectionSuccessful = false; + untypedClient.subClient = emitter; + + client.registerReadyListener(emitter as any); + const readyHandler = onSpy.getCall(0).args[1]; + readyHandler(); + + expect(onSpy.calledTwice).to.be.true; + expect(onSpy.getCall(1).args[0]).to.equal('message'); + }); + it('should register "message" event when returnBuffers is false', () => { + const onSpy = sinon.spy(); + const client = new ClientRedis({ returnBuffers: false }); + const untypedClient = client as any; + + const emitter = { + on: onSpy, + }; + + untypedClient.wasInitialConnectionSuccessful = false; + untypedClient.subClient = emitter; + + client.registerReadyListener(emitter as any); + const readyHandler = onSpy.getCall(0).args[1]; + readyHandler(); + + expect(onSpy.calledTwice).to.be.true; + expect(onSpy.getCall(1).args[0]).to.equal('message'); + }); + it('should register "messageBuffer" event when returnBuffers is true', () => { + const onSpy = sinon.spy(); + const clientWithBuffers = new ClientRedis({ returnBuffers: true }); + const untypedClientWithBuffers = clientWithBuffers as any; + + const emitter = { + on: onSpy, + }; + + untypedClientWithBuffers.wasInitialConnectionSuccessful = false; + untypedClientWithBuffers.subClient = emitter; + + clientWithBuffers.registerReadyListener(emitter as any); + const readyHandler = onSpy.getCall(0).args[1]; + readyHandler(); + + expect(onSpy.calledTwice).to.be.true; + expect(onSpy.getCall(1).args[0]).to.equal('messageBuffer'); + }); }); describe('registerReconnectListener', () => { it('should bind reconnect event handler', () => { From d3c80854330dc320e4bb41a0a9801c04011932d1 Mon Sep 17 00:00:00 2001 From: Forceres Date: Fri, 3 Apr 2026 23:02:19 +0400 Subject: [PATCH 2/2] test(microservices): add integration tests for return buffers option --- .../microservices/e2e/binary-redis.spec.ts | 116 ++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 integration/microservices/e2e/binary-redis.spec.ts diff --git a/integration/microservices/e2e/binary-redis.spec.ts b/integration/microservices/e2e/binary-redis.spec.ts new file mode 100644 index 00000000000..558acd16247 --- /dev/null +++ b/integration/microservices/e2e/binary-redis.spec.ts @@ -0,0 +1,116 @@ +import { INestApplication } from '@nestjs/common'; +import { + ClientRedis, + ClientsModule, + Deserializer, + IncomingResponse, + Transport, +} from '@nestjs/microservices'; +import { Test } from '@nestjs/testing'; +import { expect } from 'chai'; +import { lastValueFrom } from 'rxjs'; +import Redis from 'ioredis'; + +class BinaryDeserializer implements Deserializer { + deserialize(value: Buffer): IncomingResponse { + const firstSeparatorIndex = value.indexOf(':'); + const secondSeparatorIndex = value.indexOf(':', firstSeparatorIndex + 1); + + return { + id: value.subarray(0, firstSeparatorIndex).toString(), + isDisposed: true, + err: null, + response: value.subarray(secondSeparatorIndex + 1), + }; + } +} + +describe('REDIS transport', () => { + let app: INestApplication; + let client: ClientRedis; + let pub: Redis; + let sub: Redis; + + beforeEach(async () => { + const module = await Test.createTestingModule({ + imports: [ + ClientsModule.register({ + clients: [ + { + name: 'REDIS_SERVICE', + transport: Transport.REDIS, + options: { + returnBuffers: true, + host: '0.0.0.0', + port: 6379, + deserializer: new BinaryDeserializer(), + }, + }, + ], + }), + ], + }).compile(); + + app = module.createNestApplication(); + + pub = new Redis(); + sub = new Redis(); + + await sub.subscribe('binary', (_, __) => {}); + + sub.on('message', async (channel, message) => { + const data = JSON.parse(message); + const delay = data.data === 'slow' ? 25 : 0; + const responseBody = + data.data === 'bytes' + ? Buffer.from([0, 1, 2, 3, 255]) + : Buffer.from(`${data.data}-replied`); + const response = Buffer.concat([ + Buffer.from(`${data.id}:${channel}:`), + responseBody, + ]); + + if (delay > 0) { + await new Promise(resolve => setTimeout(resolve, delay)); + } + + await pub.publish(`${channel}.reply`, response); + }); + + client = app.get('REDIS_SERVICE'); + + await client.connect(); + + await app.init(); + }); + + it('should return a raw binary payload', async () => { + const data = await lastValueFrom(client.send('binary', 'data')); + expect(Buffer.isBuffer(data)).to.be.true; + expect(data).to.deep.equal(Buffer.from('data-replied')); + }); + + it('should route concurrent raw binary replies to the matching request', async () => { + const [slowResponse, fastResponse] = await Promise.all([ + lastValueFrom(client.send('binary', 'slow')), + lastValueFrom(client.send('binary', 'fast')), + ]); + expect(Buffer.isBuffer(slowResponse)).to.be.true; + expect(Buffer.isBuffer(fastResponse)).to.be.true; + expect(slowResponse).to.deep.equal(Buffer.from('slow-replied')); + expect(fastResponse).to.deep.equal(Buffer.from('fast-replied')); + }); + + it('should preserve non-utf8 bytes in the raw binary payload', async () => { + const data = await lastValueFrom(client.send('binary', 'bytes')); + expect(Buffer.isBuffer(data)).to.be.true; + expect(data).to.deep.equal(Buffer.from([0, 1, 2, 3, 255])); + }); + + afterEach(async () => { + await app.close(); + await client.close(); + await pub.quit(); + await sub.quit(); + }); +});