From fb6823df0b5d9701f56beff20146a8f8327c6216 Mon Sep 17 00:00:00 2001 From: Tristan Tarrant Date: Thu, 7 May 2026 11:00:17 +0200 Subject: [PATCH 1/2] [#148] Continuous query support --- .../topics/proc_continuous_queries.adoc | 53 +++++ lib/codec.js | 39 ++- lib/infinispan.js | 52 ++++ lib/listeners.js | 100 +++++++- lib/protocols.js | 23 +- spec/codec_spec.js | 86 +++++++ spec/infinispan_cq_spec.js | 223 ++++++++++++++++++ types/index.d.ts | 28 +++ 8 files changed, 596 insertions(+), 8 deletions(-) create mode 100644 documentation/asciidoc/topics/proc_continuous_queries.adoc create mode 100644 spec/infinispan_cq_spec.js diff --git a/documentation/asciidoc/topics/proc_continuous_queries.adoc b/documentation/asciidoc/topics/proc_continuous_queries.adoc new file mode 100644 index 0000000..bf00a22 --- /dev/null +++ b/documentation/asciidoc/topics/proc_continuous_queries.adoc @@ -0,0 +1,53 @@ +[id='continuous-queries_{context}'] += Continuous queries + +Continuous queries let you receive real-time notifications when cache entries join, leave, or are updated relative to an Ickle query. +Unlike regular listeners, continuous queries filter events on the server side, so your client only receives events for entries that match the query. + +== Registering a continuous query + +Register a continuous query with `addContinuousQuery()`, providing an Ickle query string and optional named parameters: + +[source,javascript,options="nowrap",subs=attributes+] +---- +const cq = await client.addContinuousQuery( + 'FROM tutorial.InstaPost p WHERE p.user = :userName', + {params: {userName: 'belen_esteban'}} +); +---- + +== Handling events + +Subscribe to the three event types on the returned handle: + +[source,javascript,options="nowrap",subs=attributes+] +---- +cq.on('joining', function(key, value, projection) { + console.log('Entry joined the result set'); +}); + +cq.on('leaving', function(key, value, projection) { + console.log('Entry left the result set'); +}); + +cq.on('updated', function(key, value, projection) { + console.log('Matching entry was updated'); +}); +---- + +* **joining** -- fired when a new entry matches the query, or an existing entry is modified to match. +* **leaving** -- fired when an entry no longer matches the query, either because it was removed or modified. +* **updated** -- fired when an entry that already matches the query is modified but still matches. + +== Removing a continuous query + +[source,javascript,options="nowrap",subs=attributes+] +---- +await client.removeContinuousQuery(cq); +---- + +== Requirements + +* The cache must use Protostream (`application/x-protostream`) encoding. +* Protobuf schemas must be registered on the server before registering the continuous query. +* The client must call `registerProtostreamRoot()` and `registerProtostreamType()` for the types used in the query. diff --git a/lib/codec.js b/lib/codec.js index bf5c334..25c7297 100644 --- a/lib/codec.js +++ b/lib/codec.js @@ -821,11 +821,48 @@ protobuf.loadSync(path.join(`${__dirname}/protostream/message-wrapping.proto`),root); //loaded the wrappedMessage.proto to the root var QueryRequest = root.lookupType('org.infinispan.query.remote.client.QueryRequest'); var QueryResponse = root.lookupType('org.infinispan.query.remote.client.QueryResponse'); + var ContinuousQueryResult = root.lookupType('org.infinispan.query.remote.client.ContinuousQueryResult'); return { QueryRequest, - QueryResponse + QueryResponse, + ContinuousQueryResult }; }()); + var CQ_RESULT_TYPES = ['leaving', 'joining', 'updated']; + + /** + * Wrap a scalar value as a WrappedMessage byte array. + * @param {string|number|boolean} value - The value to wrap. + * @returns {Buffer} The encoded WrappedMessage bytes. + */ + exports.wrapScalar = function(value) { + return WrappedMessage.encode(createWrappedMessage(value, _.identity)).finish(); + }; + + /** + * Decode a ContinuousQueryResult from raw protobuf bytes. + * @param {Buffer} bytes - The raw ContinuousQueryResult bytes. + * @returns {Object} Decoded result with resultType, key, value, projection. + */ + exports.decodeContinuousQueryResult = function(bytes) { + var msg = Query.ContinuousQueryResult.decode(bytes); + return { + resultType: CQ_RESULT_TYPES[msg.resultType] || 'leaving', + key: msg.key, + value: msg.value, + projection: msg.projection + }; + }; + + /** + * Decode a WrappedMessage from raw bytes. + * @param {Buffer} bytes - The raw WrappedMessage bytes. + * @returns {Object} The decoded WrappedMessage object. + */ + exports.decodeWrappedMessage = function(bytes) { + return WrappedMessage.decode(bytes); + }; + }.call(this)); diff --git a/lib/infinispan.js b/lib/infinispan.js index 26c34bf..6e1b715 100644 --- a/lib/infinispan.js +++ b/lib/infinispan.js @@ -824,6 +824,58 @@ return false; }); }, + /** + * Continuous query options. + * + * @typedef {Object} ContinuousQueryOptions + * @property {Object} [params] - Named parameter bindings for the Ickle query. + * @since 0.16 + */ + /** + * Register a continuous query that watches for cache changes + * matching the given Ickle query. + * + * @param {String} queryString Ickle query string. + * @param {ContinuousQueryOptions=} opts Optional CQ options. + * @returns {Promise} + * A promise completed with a ContinuousQuery handle. + * Use cq.on('joining', fn), cq.on('leaving', fn), cq.on('updated', fn) + * to receive events. + * @memberof Client# + * @since 0.16 + */ + addContinuousQuery: function(queryString, opts) { + var ctx = transport.context(SMALL); + logger.debugf('Invoke addContinuousQuery(msgId=%d,query=%s)', ctx.id, queryString); + return listen.addContinuousQueryListener(transport, ctx, queryString, opts); + }, + /** + * Remove a continuous query. + * + * @param {Object} cq ContinuousQuery handle returned by addContinuousQuery. + * @returns {Promise} + * A promise completed when the continuous query has been removed. + * @memberof Client# + * @since 0.16 + */ + removeContinuousQuery: function(cq) { + var listenerId = cq.getListenerId(); + var ctx = transport.context(SMALL); + logger.debugf('Invoke removeContinuousQuery(msgId=%d,listenerId=%s)', ctx.id, listenerId); + var conn = listen.findConnectionListener(listenerId); + if (!f.existy(conn)) + return Promise.reject( + new Error(`No server connection for CQ listener (listenerId=${ listenerId })`)); + + var remote = futurePinned(ctx, 0x27, p.encodeListenerId(listenerId), p.complete(p.hasSuccess), conn); + return remote.then(function (success) { + if (success) { + listen.removeListeners(listenerId); + return true; + } + return false; + }); + }, /** * Add script to server(s). * diff --git a/lib/listeners.js b/lib/listeners.js index 3184d4d..3b09a5b 100644 --- a/lib/listeners.js +++ b/lib/listeners.js @@ -10,6 +10,8 @@ var events = require('events'); + var CQ_FACTORY = 'continuous-query-filter-converter-factory'; + module.exports = listeners; /** @@ -132,8 +134,10 @@ dispatchEvent: function(event, listenerId, bytebuf, emitFunc) { return function() { var l = listeners.get(listenerId); - if (f.existy(l)) - return emitFunc(event, l.emitter, bytebuf, listenerId); + if (f.existy(l)) { + var emit = l.cqEmit || emitFunc; + return emit(event, l.emitter, bytebuf, listenerId); + } logger.error('No emitter exists for listener %s', listenerId); return true; @@ -152,7 +156,99 @@ setProtocol: function(newProtocol) { protocol = newProtocol; }, + /** + * Register a continuous query listener. + * @param {Object} transport Transport instance. + * @param {Object} ctx Request context. + * @param {string} queryString Ickle query string. + * @param {Object} [opts] Options with optional params map. + * @returns {Promise} ContinuousQuery handle. + */ + addContinuousQueryListener: function(transport, ctx, queryString, opts) { + opts = opts || {}; + var listenerId = _.uniqueId('cq_'); + logger.debugl(function() { + return ['Invoke addContinuousQueryListener(msgId=%d,query=%s,listenerId=%s)', + ctx.id, queryString, listenerId]; }); + + var cqParams = buildCQParams(queryString, opts.params); + + var listenerOpts = { + includeState: true, + useRawData: true, + filterFactory: { name: CQ_FACTORY, params: cqParams }, + converterFactory: { name: CQ_FACTORY, params: cqParams } + }; + + var encodeListenerAddCommon = protocol.encodeListenerAdd(listenerId, listenerOpts)(); + var encodeListenerAddInterests = protocol.encodeListenerInterests(listenerOpts); + var encodeListenerAdd = function() { + return f.cat(encodeListenerAddCommon, encodeListenerAddInterests); + }; + + var cqEmitFn = function(event, emitter, bytebuf) { + var payloadLength = codec.decodeVariableBytes()(bytebuf); + if (!f.existy(payloadLength) || !f.existy(payloadLength.answer)) + return false; + var payload = payloadLength.answer; + var wrapped = codec.decodeWrappedMessage(payload); + if (!f.existy(wrapped) || !f.existy(wrapped.wrappedMessage)) + return true; + var cqResult = codec.decodeContinuousQueryResult(wrapped.wrappedMessage); + emitter.emit(cqResult.resultType, cqResult.key, cqResult.value, cqResult.projection); + return true; + }; + + var preWrite = function(conn) { + var emitter = new events.EventEmitter(); + listeners.put(listenerId, { + id: listenerId, + emitter: emitter, + conn: conn, + cqEmit: cqEmitFn + }); + }; + + var remote = futurePreWrite(transport, ctx, 0x25 + , encodeListenerAdd, protocol.complete(protocol.hasSuccess) + , listenerOpts, preWrite); + + return remote + .then(function(success) { + if (success) { + var l = listeners.get(listenerId); + var emitter = l.emitter; + return { + on: function(event, callback) { emitter.on(event, callback); return this; }, + getListenerId: function() { return listenerId; } + }; + } + listen.removeListeners(listenerId); + return undefined; + }) + .catch(function() { + listen.removeListeners(listenerId); + }); + }, }; + + /** + * Build CQ factory params from query and named params. + * @param {string} queryString Ickle query. + * @param {Object} [params] Named parameter map. + * @returns {Buffer[]} Array of wrapped param byte arrays. + */ + function buildCQParams(queryString, params) { + var result = [codec.wrapScalar(queryString)]; + if (f.existy(params)) { + _.each(params, function(value, name) { + result.push(codec.wrapScalar(name)); + result.push(codec.wrapScalar(value)); + }); + } + return result; + } + return listen; } diff --git a/lib/protocols.js b/lib/protocols.js index 537deca..c9e9af9 100644 --- a/lib/protocols.js +++ b/lib/protocols.js @@ -699,18 +699,31 @@ var steps = [ codec.encodeString(listenerId), // listener id - codec.encodeUByte(includeState), // include state - codec.encodeUByte(0) // TODO filter factory name + codec.encodeUByte(includeState) // include state ]; + // Filter factory + if (_.has(opts, 'filterFactory') && _.has(opts.filterFactory, 'name')) { + steps.push(codec.encodeString(opts.filterFactory.name)); + var filterParams = opts.filterFactory.params || []; + steps.push(codec.encodeUByte(filterParams.length)); + filterParams.forEach(function(p) { steps.push(codec.encodeBytesWithLength(p)); }); + } else { + steps.push(codec.encodeUByte(0)); + } + + // Converter factory if (_.has(opts, 'converterFactory') && _.has(opts.converterFactory, 'name')) { steps.push(codec.encodeString(opts.converterFactory.name)); - steps.push(codec.encodeUByte(0)); // TODO add converter parameter support + var converterParams = opts.converterFactory.params || []; + steps.push(codec.encodeUByte(converterParams.length)); + converterParams.forEach(function(p) { steps.push(codec.encodeBytesWithLength(p)); }); } else { - steps.push(codec.encodeUByte(0)); // no converter + steps.push(codec.encodeUByte(0)); } - steps.push(codec.encodeUByte(0)); // raw data disabled + // Raw data + steps.push(codec.encodeUByte(hasOpt(opts, 'useRawData') ? 1 : 0)); return function() { return steps; diff --git a/spec/codec_spec.js b/spec/codec_spec.js index 1f090b3..286161e 100644 --- a/spec/codec_spec.js +++ b/spec/codec_spec.js @@ -254,3 +254,89 @@ function encodeDecode(size, encoder, decoder, bufferSize) { var dec = f.actions(_.isArray(decoder) ? decoder : [decoder], codec.allDecoded(decoder.length)); return dec({buf: bytebuf.buf, offset: 0}); } + +var protobuf = require('protobufjs'); +var path = require('path'); + +describe('wrapScalar', function() { + var WrappedMessage; + beforeAll(function() { + var root = protobuf.loadSync(path.join(__dirname, '..', 'lib', 'protostream', 'message-wrapping.proto')); + WrappedMessage = root.lookupType('org.infinispan.protostream.WrappedMessage'); + }); + + it('wraps a string value', function() { + var bytes = codec.wrapScalar('hello'); + expect(bytes).toBeDefined(); + expect(bytes.length).toBeGreaterThan(0); + var decoded = WrappedMessage.decode(bytes); + expect(decoded.wrappedString).toBe('hello'); + }); + + it('wraps a number value', function() { + var bytes = codec.wrapScalar(42.5); + expect(bytes).toBeDefined(); + var decoded = WrappedMessage.decode(bytes); + expect(decoded.wrappedDouble).toBe(42.5); + }); + + it('wraps a boolean value', function() { + var bytes = codec.wrapScalar(true); + expect(bytes).toBeDefined(); + var decoded = WrappedMessage.decode(bytes); + expect(decoded.wrappedBool).toBe(true); + }); +}); + +describe('decodeContinuousQueryResult', function() { + var ContinuousQueryResult; + beforeAll(function() { + var root = protobuf.loadSync(path.join(__dirname, '..', 'lib', 'protostream', 'query.proto')); + protobuf.loadSync(path.join(__dirname, '..', 'lib', 'protostream', 'message-wrapping.proto'), root); + ContinuousQueryResult = root.lookupType('org.infinispan.query.remote.client.ContinuousQueryResult'); + }); + + it('decodes a JOINING result', function() { + var keyBytes = Buffer.from('my-key'); + var valueBytes = Buffer.from('my-value'); + var msg = ContinuousQueryResult.create({resultType: 1, key: keyBytes, value: valueBytes}); + var encoded = ContinuousQueryResult.encode(msg).finish(); + var result = codec.decodeContinuousQueryResult(encoded); + expect(result.resultType).toBe('joining'); + expect(Buffer.from(result.key)).toEqual(keyBytes); + expect(Buffer.from(result.value)).toEqual(valueBytes); + }); + + it('decodes a LEAVING result', function() { + var keyBytes = Buffer.from('my-key'); + var msg = ContinuousQueryResult.create({resultType: 0, key: keyBytes}); + var encoded = ContinuousQueryResult.encode(msg).finish(); + var result = codec.decodeContinuousQueryResult(encoded); + expect(result.resultType).toBe('leaving'); + expect(Buffer.from(result.key)).toEqual(keyBytes); + }); + + it('decodes an UPDATED result', function() { + var keyBytes = Buffer.from('my-key'); + var valueBytes = Buffer.from('new-value'); + var msg = ContinuousQueryResult.create({resultType: 2, key: keyBytes, value: valueBytes}); + var encoded = ContinuousQueryResult.encode(msg).finish(); + var result = codec.decodeContinuousQueryResult(encoded); + expect(result.resultType).toBe('updated'); + expect(Buffer.from(result.key)).toEqual(keyBytes); + expect(Buffer.from(result.value)).toEqual(valueBytes); + }); +}); + +describe('decodeWrappedMessage', function() { + it('decodes a WrappedMessage with wrappedMessage field', function() { + var inner = Buffer.from([0x0a, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f]); + var wmRoot = protobuf.loadSync(path.join(__dirname, '..', 'lib', 'protostream', 'message-wrapping.proto')); + var WM = wmRoot.lookupType('org.infinispan.protostream.WrappedMessage'); + var msg = WM.create({wrappedMessage: inner, wrappedTypeId: 4403}); + var encoded = WM.encode(msg).finish(); + var decoded = codec.decodeWrappedMessage(encoded); + expect(decoded.wrappedTypeId).toBe(4403); + expect(Buffer.from(decoded.wrappedMessage)).toEqual(inner); + }); +}); diff --git a/spec/infinispan_cq_spec.js b/spec/infinispan_cq_spec.js new file mode 100644 index 0000000..5736938 --- /dev/null +++ b/spec/infinispan_cq_spec.js @@ -0,0 +1,223 @@ +'use strict'; + +var t = require('./utils/testing'); +var ispn = require('../lib/infinispan'); +var protobuf = require('protobufjs'); + +var personProto = 'package tutorial;\n' + + 'syntax = "proto3";\n' + + '/**\n' + + ' * @TypeId(1000042)\n' + + ' */\n' + + 'message Person {\n' + + ' string firstName = 1;\n' + + ' string lastName = 2;\n' + + ' int32 bornYear = 3;\n' + + ' string bornIn = 4;\n' + + '}\n'; + +var root = protobuf.parse(personProto).root; +var Person = root.lookupType('.tutorial.Person'); + +var protoOpts = { + authentication: t.authOpts.authentication, + cacheName: 'protoStreamCache', + dataFormat: { + keyType: 'application/x-protostream', + valueType: 'application/x-protostream' + } +}; + +/** + * @param {number} ms Milliseconds to sleep. + * @returns {Promise} Resolves after the delay. + */ +function sleep(ms) { + return new Promise(function(resolve) { setTimeout(resolve, ms); }); +} + +/** + * Register the schema, connect a protostream client, run the body, then clean up. + * @param {Function} body Async function receiving the connected client. + * @returns {Promise} Resolves when the test is complete. + */ +async function withCQClient(body) { + var metaClient = await ispn.client(t.local, { + authentication: t.authOpts.authentication, + cacheName: '___protobuf_metadata', + dataFormat: {keyType: 'text/plain', valueType: 'text/plain'} + }); + try { + await metaClient.put('tutorial/Person.proto', personProto); + } finally { + await metaClient.disconnect(); + } + + var client = await t.client(t.local, protoOpts); + try { + client.registerProtostreamRoot(root); + client.registerProtostreamType('.tutorial.Person', 1000042); + await client.clear(); + await body(client); + } finally { + await client.clear(); + await client.disconnect(); + } + + var cleanup = await ispn.client(t.local, { + authentication: t.authOpts.authentication, + cacheName: '___protobuf_metadata', + dataFormat: {keyType: 'text/plain', valueType: 'text/plain'} + }); + try { + await cleanup.remove('tutorial/Person.proto'); + } finally { + await cleanup.disconnect(); + } +} + +describe('Continuous query', function() { + + it('receives joining events for matching entries', async function() { + await withCQClient(async function(client) { + var events = []; + var cq = await client.addContinuousQuery( + 'FROM tutorial.Person p WHERE p.bornIn = \'London\'' + ); + cq.on('joining', function(key, value) { + events.push({type: 'joining', key: key, value: value}); + }); + + await client.put(0, Person.create({firstName: 'Hermione', lastName: 'Granger', bornYear: 1979, bornIn: 'London'})); + await client.put(1, Person.create({firstName: 'Harry', lastName: 'Potter', bornYear: 1980, bornIn: 'Godrics Hollow'})); + await client.put(2, Person.create({firstName: 'Ron', lastName: 'Weasley', bornYear: 1980, bornIn: 'London'})); + await sleep(1000); + + expect(events.length).toBe(2); + events.forEach(function(e) { + expect(e.type).toBe('joining'); + expect(e.key).toBeDefined(); + expect(e.value).toBeDefined(); + }); + + await client.removeContinuousQuery(cq); + }); + }); + + it('receives leaving events when entries no longer match', async function() { + await withCQClient(async function(client) { + var joining = []; + var leaving = []; + var cq = await client.addContinuousQuery( + 'FROM tutorial.Person p WHERE p.bornIn = \'London\'' + ); + cq.on('joining', function(key, value) { + joining.push({key: key, value: value}); + }); + cq.on('leaving', function(key) { + leaving.push({key: key}); + }); + + await client.put(0, Person.create({firstName: 'Hermione', lastName: 'Granger', bornYear: 1979, bornIn: 'London'})); + await sleep(500); + expect(joining.length).toBe(1); + + await client.put(0, Person.create({firstName: 'Hermione', lastName: 'Granger', bornYear: 1979, bornIn: 'Paris'})); + await sleep(500); + expect(leaving.length).toBe(1); + + await client.removeContinuousQuery(cq); + }); + }); + + it('receives no events after removal', async function() { + await withCQClient(async function(client) { + var events = []; + var cq = await client.addContinuousQuery( + 'FROM tutorial.Person p WHERE p.bornIn = \'London\'' + ); + cq.on('joining', function() { + events.push('joining'); + }); + + await client.put(0, Person.create({firstName: 'Hermione', lastName: 'Granger', bornYear: 1979, bornIn: 'London'})); + await sleep(500); + expect(events.length).toBe(1); + + await client.removeContinuousQuery(cq); + + await client.put(1, Person.create({firstName: 'Ron', lastName: 'Weasley', bornYear: 1980, bornIn: 'London'})); + await sleep(500); + expect(events.length).toBe(1); + }); + }); + + it('supports named parameters', async function() { + await withCQClient(async function(client) { + var events = []; + var cq = await client.addContinuousQuery( + 'FROM tutorial.Person p WHERE p.bornIn = :city', + {params: {city: 'London'}} + ); + cq.on('joining', function(key, value) { + events.push({key: key, value: value}); + }); + + await client.put(0, Person.create({firstName: 'Hermione', lastName: 'Granger', bornYear: 1979, bornIn: 'London'})); + await client.put(1, Person.create({firstName: 'Harry', lastName: 'Potter', bornYear: 1980, bornIn: 'Godrics Hollow'})); + await sleep(1000); + + expect(events.length).toBe(1); + + await client.removeContinuousQuery(cq); + }); + }); + + it('can register multiple continuous queries simultaneously', async function() { + await withCQClient(async function(client) { + var londonEvents = []; + var hollowEvents = []; + + var cq1 = await client.addContinuousQuery( + 'FROM tutorial.Person p WHERE p.bornIn = \'London\'' + ); + cq1.on('joining', function() { londonEvents.push('joining'); }); + + var cq2 = await client.addContinuousQuery( + 'FROM tutorial.Person p WHERE p.bornIn = \'Godrics Hollow\'' + ); + cq2.on('joining', function() { hollowEvents.push('joining'); }); + + await client.put(0, Person.create({firstName: 'Hermione', lastName: 'Granger', bornYear: 1979, bornIn: 'London'})); + await client.put(1, Person.create({firstName: 'Harry', lastName: 'Potter', bornYear: 1980, bornIn: 'Godrics Hollow'})); + await client.put(2, Person.create({firstName: 'Ron', lastName: 'Weasley', bornYear: 1980, bornIn: 'London'})); + await sleep(1000); + + expect(londonEvents.length).toBe(2); + expect(hollowEvents.length).toBe(1); + + await client.removeContinuousQuery(cq1); + await client.removeContinuousQuery(cq2); + }); + }); + + it('receives joining events for initial state when entries already exist', async function() { + await withCQClient(async function(client) { + await client.put(0, Person.create({firstName: 'Hermione', lastName: 'Granger', bornYear: 1979, bornIn: 'London'})); + await client.put(1, Person.create({firstName: 'Harry', lastName: 'Potter', bornYear: 1980, bornIn: 'Godrics Hollow'})); + + var events = []; + var cq = await client.addContinuousQuery( + 'FROM tutorial.Person p WHERE p.bornIn = \'London\'' + ); + cq.on('joining', function(key, value) { + events.push({key: key, value: value}); + }); + await sleep(1000); + + expect(events.length).toBe(1); + + await client.removeContinuousQuery(cq); + }); + }); +}); diff --git a/types/index.d.ts b/types/index.d.ts index d73e844..a717b69 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -668,6 +668,34 @@ export function client(args: { * @since 0.3 */ removeListener: (listenerId: string) => Promise; + /** + * Register a continuous query that watches for cache changes + * matching the given Ickle query. + * + * @param queryString Ickle query string. + * @param opts Optional CQ options. + * @returns A promise completed with a ContinuousQuery handle. + * @memberof Client# + * @since 0.16 + */ + addContinuousQuery: (queryString: string, opts?: { + /** Named parameter bindings for the Ickle query. */ + params?: { [name: string]: string | number | boolean }; + }) => Promise<{ + /** Register a callback for continuous query events. */ + on(event: 'joining' | 'leaving' | 'updated', callback: (key: Buffer, value: Buffer, projection?: any[]) => void): any; + /** Get the listener ID for this continuous query. */ + getListenerId(): string; + }>; + /** + * Remove a continuous query. + * + * @param cq ContinuousQuery handle returned by addContinuousQuery. + * @returns A promise completed when the continuous query has been removed. + * @memberof Client# + * @since 0.16 + */ + removeContinuousQuery: (cq: { getListenerId(): string }) => Promise; /** * Create a distributed counter. * From b601ab2d71bc6718129f78a00c3d49f161a37ebc Mon Sep 17 00:00:00 2001 From: Tristan Tarrant Date: Thu, 14 May 2026 10:27:51 +0200 Subject: [PATCH 2/2] [#148] Fix CQ proto syntax ordering and event buffering MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move 'syntax = "proto3"' before 'package' declaration — Protostream requires the syntax statement to be the first non-comment line. Buffer CQ state events during listener registration so that includeState events are not lost before the caller attaches .on() handlers. Also propagate errors from addContinuousQuery instead of swallowing them. --- lib/listeners.js | 24 +++++++++++++++++++++--- spec/infinispan_cq_spec.js | 4 ++-- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/lib/listeners.js b/lib/listeners.js index 3b09a5b..443ef82 100644 --- a/lib/listeners.js +++ b/lib/listeners.js @@ -186,6 +186,9 @@ return f.cat(encodeListenerAddCommon, encodeListenerAddInterests); }; + var bufferedEvents = []; + var buffering = true; + var cqEmitFn = function(event, emitter, bytebuf) { var payloadLength = codec.decodeVariableBytes()(bytebuf); if (!f.existy(payloadLength) || !f.existy(payloadLength.answer)) @@ -195,7 +198,11 @@ if (!f.existy(wrapped) || !f.existy(wrapped.wrappedMessage)) return true; var cqResult = codec.decodeContinuousQueryResult(wrapped.wrappedMessage); - emitter.emit(cqResult.resultType, cqResult.key, cqResult.value, cqResult.projection); + if (buffering) { + bufferedEvents.push(cqResult); + } else { + emitter.emit(cqResult.resultType, cqResult.key, cqResult.value, cqResult.projection); + } return true; }; @@ -219,15 +226,26 @@ var l = listeners.get(listenerId); var emitter = l.emitter; return { - on: function(event, callback) { emitter.on(event, callback); return this; }, + on: function(event, callback) { + emitter.on(event, callback); + if (buffering) { + buffering = false; + bufferedEvents.forEach(function(ev) { + emitter.emit(ev.resultType, ev.key, ev.value, ev.projection); + }); + bufferedEvents = []; + } + return this; + }, getListenerId: function() { return listenerId; } }; } listen.removeListeners(listenerId); return undefined; }) - .catch(function() { + .catch(function(err) { listen.removeListeners(listenerId); + throw err; }); }, }; diff --git a/spec/infinispan_cq_spec.js b/spec/infinispan_cq_spec.js index 5736938..ee1405c 100644 --- a/spec/infinispan_cq_spec.js +++ b/spec/infinispan_cq_spec.js @@ -4,8 +4,8 @@ var t = require('./utils/testing'); var ispn = require('../lib/infinispan'); var protobuf = require('protobufjs'); -var personProto = 'package tutorial;\n' - + 'syntax = "proto3";\n' +var personProto = 'syntax = "proto3";\n' + + 'package tutorial;\n' + '/**\n' + ' * @TypeId(1000042)\n' + ' */\n'