Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions documentation/asciidoc/topics/proc_continuous_queries.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
[id='continuous-queries_{context}']
= Continuous queries

Continuous queries let you receive real-time notifications when cache entries join, leave, or are updated relative to an Ickle query.
Unlike regular listeners, continuous queries filter events on the server side, so your client only receives events for entries that match the query.

== Registering a continuous query

Register a continuous query with `addContinuousQuery()`, providing an Ickle query string and optional named parameters:

[source,javascript,options="nowrap",subs=attributes+]
----
const cq = await client.addContinuousQuery(
'FROM tutorial.InstaPost p WHERE p.user = :userName',
{params: {userName: 'belen_esteban'}}
);
----

== Handling events

Subscribe to the three event types on the returned handle:

[source,javascript,options="nowrap",subs=attributes+]
----
cq.on('joining', function(key, value, projection) {
console.log('Entry joined the result set');
});

cq.on('leaving', function(key, value, projection) {
console.log('Entry left the result set');
});

cq.on('updated', function(key, value, projection) {
console.log('Matching entry was updated');
});
----

* **joining** -- fired when a new entry matches the query, or an existing entry is modified to match.
* **leaving** -- fired when an entry no longer matches the query, either because it was removed or modified.
* **updated** -- fired when an entry that already matches the query is modified but still matches.

== Removing a continuous query

[source,javascript,options="nowrap",subs=attributes+]
----
await client.removeContinuousQuery(cq);
----

== Requirements

* The cache must use Protostream (`application/x-protostream`) encoding.
* Protobuf schemas must be registered on the server before registering the continuous query.
* The client must call `registerProtostreamRoot()` and `registerProtostreamType()` for the types used in the query.
39 changes: 38 additions & 1 deletion lib/codec.js
Original file line number Diff line number Diff line change
Expand Up @@ -821,11 +821,48 @@
protobuf.loadSync(path.join(`${__dirname}/protostream/message-wrapping.proto`),root); //loaded the wrappedMessage.proto to the root
var QueryRequest = root.lookupType('org.infinispan.query.remote.client.QueryRequest');
var QueryResponse = root.lookupType('org.infinispan.query.remote.client.QueryResponse');
var ContinuousQueryResult = root.lookupType('org.infinispan.query.remote.client.ContinuousQueryResult');

return {
QueryRequest,
QueryResponse
QueryResponse,
ContinuousQueryResult
};
}());

var CQ_RESULT_TYPES = ['leaving', 'joining', 'updated'];

/**
* Wrap a scalar value as a WrappedMessage byte array.
* @param {string|number|boolean} value - The value to wrap.
* @returns {Buffer} The encoded WrappedMessage bytes.
*/
exports.wrapScalar = function(value) {
return WrappedMessage.encode(createWrappedMessage(value, _.identity)).finish();
};

/**
* Decode a ContinuousQueryResult from raw protobuf bytes.
* @param {Buffer} bytes - The raw ContinuousQueryResult bytes.
* @returns {Object} Decoded result with resultType, key, value, projection.
*/
exports.decodeContinuousQueryResult = function(bytes) {
var msg = Query.ContinuousQueryResult.decode(bytes);
return {
resultType: CQ_RESULT_TYPES[msg.resultType] || 'leaving',
key: msg.key,
value: msg.value,
projection: msg.projection
};
};

/**
* Decode a WrappedMessage from raw bytes.
* @param {Buffer} bytes - The raw WrappedMessage bytes.
* @returns {Object} The decoded WrappedMessage object.
*/
exports.decodeWrappedMessage = function(bytes) {
return WrappedMessage.decode(bytes);
};

}.call(this));
52 changes: 52 additions & 0 deletions lib/infinispan.js
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,58 @@
return false;
});
},
/**
* Continuous query options.
*
* @typedef {Object} ContinuousQueryOptions
* @property {Object} [params] - Named parameter bindings for the Ickle query.
* @since 0.16
*/
/**
* Register a continuous query that watches for cache changes
* matching the given Ickle query.
*
* @param {String} queryString Ickle query string.
* @param {ContinuousQueryOptions=} opts Optional CQ options.
* @returns {Promise<Object>}
* A promise completed with a ContinuousQuery handle.
* Use cq.on('joining', fn), cq.on('leaving', fn), cq.on('updated', fn)
* to receive events.
* @memberof Client#
* @since 0.16
*/
addContinuousQuery: function(queryString, opts) {
var ctx = transport.context(SMALL);
logger.debugf('Invoke addContinuousQuery(msgId=%d,query=%s)', ctx.id, queryString);
return listen.addContinuousQueryListener(transport, ctx, queryString, opts);
},
/**
* Remove a continuous query.
*
* @param {Object} cq ContinuousQuery handle returned by addContinuousQuery.
* @returns {Promise}
* A promise completed when the continuous query has been removed.
* @memberof Client#
* @since 0.16
*/
removeContinuousQuery: function(cq) {
var listenerId = cq.getListenerId();
var ctx = transport.context(SMALL);
logger.debugf('Invoke removeContinuousQuery(msgId=%d,listenerId=%s)', ctx.id, listenerId);
var conn = listen.findConnectionListener(listenerId);
if (!f.existy(conn))
return Promise.reject(
new Error(`No server connection for CQ listener (listenerId=${ listenerId })`));

var remote = futurePinned(ctx, 0x27, p.encodeListenerId(listenerId), p.complete(p.hasSuccess), conn);
return remote.then(function (success) {
if (success) {
listen.removeListeners(listenerId);
return true;
}
return false;
});
},
/**
* Add script to server(s).
*
Expand Down
118 changes: 116 additions & 2 deletions lib/listeners.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

var events = require('events');

var CQ_FACTORY = 'continuous-query-filter-converter-factory';

module.exports = listeners;

/**
Expand Down Expand Up @@ -132,8 +134,10 @@
dispatchEvent: function(event, listenerId, bytebuf, emitFunc) {
return function() {
var l = listeners.get(listenerId);
if (f.existy(l))
return emitFunc(event, l.emitter, bytebuf, listenerId);
if (f.existy(l)) {
var emit = l.cqEmit || emitFunc;
return emit(event, l.emitter, bytebuf, listenerId);
}

logger.error('No emitter exists for listener %s', listenerId);
return true;
Expand All @@ -152,7 +156,117 @@
setProtocol: function(newProtocol) {
protocol = newProtocol;
},
/**
* Register a continuous query listener.
* @param {Object} transport Transport instance.
* @param {Object} ctx Request context.
* @param {string} queryString Ickle query string.
* @param {Object} [opts] Options with optional params map.
* @returns {Promise<Object>} ContinuousQuery handle.
*/
addContinuousQueryListener: function(transport, ctx, queryString, opts) {
opts = opts || {};
var listenerId = _.uniqueId('cq_');
logger.debugl(function() {
return ['Invoke addContinuousQueryListener(msgId=%d,query=%s,listenerId=%s)',
ctx.id, queryString, listenerId]; });

var cqParams = buildCQParams(queryString, opts.params);

var listenerOpts = {
includeState: true,
useRawData: true,
filterFactory: { name: CQ_FACTORY, params: cqParams },
converterFactory: { name: CQ_FACTORY, params: cqParams }
};

var encodeListenerAddCommon = protocol.encodeListenerAdd(listenerId, listenerOpts)();
var encodeListenerAddInterests = protocol.encodeListenerInterests(listenerOpts);
var encodeListenerAdd = function() {
return f.cat(encodeListenerAddCommon, encodeListenerAddInterests);
};

var bufferedEvents = [];
var buffering = true;

var cqEmitFn = function(event, emitter, bytebuf) {
var payloadLength = codec.decodeVariableBytes()(bytebuf);
if (!f.existy(payloadLength) || !f.existy(payloadLength.answer))
return false;
var payload = payloadLength.answer;
var wrapped = codec.decodeWrappedMessage(payload);
if (!f.existy(wrapped) || !f.existy(wrapped.wrappedMessage))
return true;
var cqResult = codec.decodeContinuousQueryResult(wrapped.wrappedMessage);
if (buffering) {
bufferedEvents.push(cqResult);
} else {
emitter.emit(cqResult.resultType, cqResult.key, cqResult.value, cqResult.projection);
}
return true;
};

var preWrite = function(conn) {
var emitter = new events.EventEmitter();
listeners.put(listenerId, {
id: listenerId,
emitter: emitter,
conn: conn,
cqEmit: cqEmitFn
});
};

var remote = futurePreWrite(transport, ctx, 0x25
, encodeListenerAdd, protocol.complete(protocol.hasSuccess)
, listenerOpts, preWrite);

return remote
.then(function(success) {
if (success) {
var l = listeners.get(listenerId);
var emitter = l.emitter;
return {
on: function(event, callback) {
emitter.on(event, callback);
if (buffering) {
buffering = false;
bufferedEvents.forEach(function(ev) {
emitter.emit(ev.resultType, ev.key, ev.value, ev.projection);
});
bufferedEvents = [];
}
return this;
},
getListenerId: function() { return listenerId; }
};
}
listen.removeListeners(listenerId);
return undefined;
})
.catch(function(err) {
listen.removeListeners(listenerId);
throw err;
});
},
};

/**
* Build CQ factory params from query and named params.
* @param {string} queryString Ickle query.
* @param {Object} [params] Named parameter map.
* @returns {Buffer[]} Array of wrapped param byte arrays.
*/
function buildCQParams(queryString, params) {
var result = [codec.wrapScalar(queryString)];
if (f.existy(params)) {
_.each(params, function(value, name) {
result.push(codec.wrapScalar(name));
result.push(codec.wrapScalar(value));
});
}
return result;
}

return listen;
}

Expand Down
23 changes: 18 additions & 5 deletions lib/protocols.js
Original file line number Diff line number Diff line change
Expand Up @@ -699,18 +699,31 @@

var steps = [
codec.encodeString(listenerId), // listener id
codec.encodeUByte(includeState), // include state
codec.encodeUByte(0) // TODO filter factory name
codec.encodeUByte(includeState) // include state
];

// Filter factory
if (_.has(opts, 'filterFactory') && _.has(opts.filterFactory, 'name')) {
steps.push(codec.encodeString(opts.filterFactory.name));
var filterParams = opts.filterFactory.params || [];
steps.push(codec.encodeUByte(filterParams.length));
filterParams.forEach(function(p) { steps.push(codec.encodeBytesWithLength(p)); });
} else {
steps.push(codec.encodeUByte(0));
}

// Converter factory
if (_.has(opts, 'converterFactory') && _.has(opts.converterFactory, 'name')) {
steps.push(codec.encodeString(opts.converterFactory.name));
steps.push(codec.encodeUByte(0)); // TODO add converter parameter support
var converterParams = opts.converterFactory.params || [];
steps.push(codec.encodeUByte(converterParams.length));
converterParams.forEach(function(p) { steps.push(codec.encodeBytesWithLength(p)); });
} else {
steps.push(codec.encodeUByte(0)); // no converter
steps.push(codec.encodeUByte(0));
}

steps.push(codec.encodeUByte(0)); // raw data disabled
// Raw data
steps.push(codec.encodeUByte(hasOpt(opts, 'useRawData') ? 1 : 0));

return function() {
return steps;
Expand Down
Loading