Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
- run: npm install

- name: Run tests
run: npm run test:docker
run: npm test

- name: Tear down containers
if: always()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Take a look at some examples for using the {hr_js} client with {brandname}.

include::{topics}/ref_client_usage.adoc[leveloffset=+1]
include::{topics}/proc_transactions.adoc[leveloffset=+1]

// Restore the parent context.
ifdef::parent-context[:context: {parent-context}]
Expand Down
109 changes: 109 additions & 0 deletions documentation/asciidoc/topics/code_examples/transactions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
var infinispan = require('infinispan');

var connected = infinispan.client(
{port: 11222, host: '127.0.0.1'},
{
authentication: {
enabled: true,
saslMechanism: 'SCRAM-SHA-256',
userName: 'admin',
password: 'changeme'
},
clientIntelligence: 'BASIC'
}
);

connected.then(function (adminClient) {

// Create a cache with transactional configuration.
var txConfig =
'<distributed-cache>' +
'<encoding><key media-type="text/plain"/><value media-type="text/plain"/></encoding>' +
'<transaction mode="NON_XA" locking="PESSIMISTIC"/>' +
'</distributed-cache>';

var create = adminClient.admin.getOrCreateCache('txCache', txConfig);

return create.then(function() {
return adminClient.disconnect();
}).then(function() {

// Connect to the transactional cache.
return infinispan.client(
{port: 11222, host: '127.0.0.1'},
{
authentication: {
enabled: true,
saslMechanism: 'SCRAM-SHA-256',
userName: 'admin',
password: 'changeme'
},
cacheName: 'txCache',
topologyUpdates: false,
dataFormat: {
keyType: 'text/plain',
valueType: 'text/plain'
}
}
);

}).then(function(client) {

// Get the transaction manager.
var tm = client.getTransactionManager();

// Begin a transaction, put entries, and commit.
var committed = tm.begin().then(function() {
return client.put('key1', 'value1');
}).then(function() {
return client.put('key2', 'value2');
}).then(function() {
return tm.commit();
});

// Begin a transaction, put an entry, and roll back.
var rolledBack = committed.then(function() {
return tm.begin();
}).then(function() {
return client.put('key3', 'should-not-exist');
}).then(function() {
return tm.rollback();
});

// Read-then-write with version-based conflict detection.
var readWrite = rolledBack.then(function() {
return client.put('counter', '0');
}).then(function() {
return tm.begin();
}).then(function() {
return client.get('counter');
}).then(function(current) {
return client.put('counter', String(parseInt(current) + 1));
}).then(function() {
return tm.commit();
});

// Remove an entry within a transaction.
var removed = readWrite.then(function() {
return client.put('temp', 'ephemeral');
}).then(function() {
return tm.begin();
}).then(function() {
return client.remove('temp');
}).then(function() {
return tm.commit();
});

// Disconnect from {brandname} Server.
return removed.then(function() {
return client.disconnect();
});

});

}).catch(function(error) {

// Log any errors.
console.log("Got error: " + error.message);

});
77 changes: 77 additions & 0 deletions documentation/asciidoc/topics/proc_transactions.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
[id='transactions_{context}']
= Transactions

Transactions let you group multiple cache operations into an atomic unit that is either committed or rolled back together.
The {hr_js} client implements client-side transactions using the Hot Rod `PREPARE_TX`, `COMMIT_TX`, and `ROLLBACK_TX` protocol operations.

== Transaction requirements

Transactional caches on {brandname} Server must use `NON_XA` transaction mode with `PESSIMISTIC` locking:

[source,xml,options="nowrap"]
----
<distributed-cache>
<encoding>
<key media-type="text/plain"/>
<value media-type="text/plain"/>
</encoding>
<transaction mode="NON_XA" locking="PESSIMISTIC"/>
</distributed-cache>
----

== How transactions work

When you begin a transaction, the client buffers all `put()` and `remove()` operations locally.
Calls to `get()` check the local buffer first, then fall back to the server.

When you read a key from the server during a transaction, the client records the entry version.
On commit, the server verifies that versions have not changed, providing conflict detection for read-then-write patterns.

On commit, the client sends all buffered modifications to the server in a single `PREPARE_TX` request.
The {hr_js} client uses one-phase commit for single-cache transactions, so no separate `COMMIT_TX` request is needed.

On rollback, the client discards all buffered modifications and sends a `ROLLBACK_TX` request to the server.

== Using transactions

.Procedure

. Get the transaction manager from the client.
. Call `begin()` to start a transaction.
. Perform cache operations (`put()`, `get()`, `remove()`).
. Call `commit()` to apply changes atomically, or `rollback()` to discard them.

[source,javascript,options="nowrap",subs=attributes+]
----
include::code_examples/transactions.js[]
----

.Available transaction operations
[cols="1,3",options="header"]
|===
|Method |Description

|`getTransactionManager()`
|Returns the transaction manager for the client.

|`begin()`
|Starts a new transaction.
All subsequent `put()`, `get()`, and `remove()` calls are associated with this transaction.

|`commit()`
|Commits the transaction.
Sends all buffered modifications to the server atomically.
Throws an error if the server rejects the transaction due to a version conflict.

|`rollback()`
|Rolls back the transaction.
Discards all buffered modifications.

|`isActive()`
|Returns `true` if a transaction is currently in progress.
|===

[NOTE]
====
Connect to the transactional cache with `topologyUpdates: false` to prevent key-based routing, which is not compatible with transaction operations.
====
7 changes: 7 additions & 0 deletions documentation/asciidoc/topics/ref_client_usage.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,10 @@ include::code_examples/queries.js[]
----

See link:{query_docs}[Querying {brandname} caches] for more information.

== Using transactions

Transactions let you group multiple cache operations into an atomic unit.
The {hr_js} client buffers operations locally and sends them to the server on commit.

For detailed information, see link:#transactions_{context}[Transactions].
12 changes: 12 additions & 0 deletions lib/codec.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
exports.decodeSignedInt = f.lift(doDecodeSignedInt, _.identity);
exports.decodeVariableBytes = f.lift(doDecodeVariableBytes, _.identity);
exports.decodeShort = f.lift(doDecodeShort, _.identity);
exports.decodeInt = f.lift(doDecodeInt, _.identity);
exports.decodeProtobuf = f.lift(doDecodeProtobuf, _.identity);
exports.decodeQuery = f.lift(doDecodeQuery,_.identity);

Expand Down Expand Up @@ -560,6 +561,17 @@
return uncheckedReadShort(bytebuf)();
}

function doDecodeInt(bytebuf) {

Check warning on line 564 in lib/codec.js

View workflow job for this annotation

GitHub Actions / lint

Missing JSDoc comment
if (4 > bytebuf.buf.length - bytebuf.offset) {
logger.tracef('Can not fully read 4 bytes (buffer size is %d, buffer offset %d)',
bytebuf.buf.length, bytebuf.offset);
return undefined;
}
var val = bytebuf.buf.readInt32BE(bytebuf.offset);
bytebuf.offset += 4;
return val;
}

/**
* Decode a Protobuf WrappedMessage from the buffer and unwrap it.
* @param {Object} bytebuf - The byte buffer to read from.
Expand Down
90 changes: 90 additions & 0 deletions lib/infinispan.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
var io = require('./io');
var listeners = require('./listeners');
var nearCacheFactory = require('./near-cache');
var tx = require('./transaction');

var Client = function(addrs, clientOpts) {
var logger = u.logger('client');
Expand All @@ -39,6 +40,7 @@
var p = protocolResolver(clientOpts['version']);
var listen = listeners(p);
var nc = clientOpts.nearCache ? nearCacheFactory(clientOpts.nearCache.maxEntries) : null;
var txCtx = null;

var TINY = 16, SMALL = 32, MEDIUM = 64, BIG = 128;

Expand Down Expand Up @@ -327,6 +329,17 @@
* @since 0.3
*/
get: function(k) {
if (txCtx) {
var local = txCtx.getLocalValue(k);
if (local.found) return Promise.resolve(local.value);
var ctx = transport.context(SMALL);
logger.debugf('Invoke transactional get(msgId=%d,key=%s)', ctx.id, u.str(k));
var decoder = p.decodeWithMeta();
return futureKey(ctx, 0x1B, k, p.encodeKey(k), decoder).then(function(meta) {
txCtx.trackRead(k, meta);
return meta ? meta.value : undefined;
});
}
if (nc) {
var cached = nc.get(k);
if (cached !== undefined) {
Expand Down Expand Up @@ -455,6 +468,11 @@
* @since 0.3
*/
put: function(k, v, opts) {
if (txCtx) {
logger.debugf('Transactional put(key=%s)', u.str(k));
txCtx.trackPut(k, v);
return Promise.resolve(undefined);
}
if (nc) nc.remove(k);
var ctx = transport.context(MEDIUM);
logger.debugl(function() { return ['Invoke put(msgId=%d,key=%s,value=%s,opts=%s)',
Expand Down Expand Up @@ -486,6 +504,11 @@
* @since 0.3
*/
remove: function(k, opts) {
if (txCtx) {
logger.debugf('Transactional remove(key=%s)', u.str(k));
txCtx.trackRemove(k);
return Promise.resolve(true);
}
if (nc) nc.remove(k);
var ctx = transport.context(SMALL);
logger.debugl(function() {return ['Invoke remove(msgId=%d,key=%s,opts=%s)',
Expand Down Expand Up @@ -1261,6 +1284,73 @@
logger.debugf('Invoke admin.removeSchema(msgId=%d,name=%s)', ctx.id, name);
return future(ctx, 0x2B, p.encodeNameParams('@@schemas@delete', {name: name}), p.decodeValue());
}
},
/**
* Get the transaction manager for this client.
* Transactions buffer put/remove operations locally and send them
* to the server atomically on commit via PREPARE_TX_2.
* During an active transaction, get() reads local writes first,
* falling back to getWithMetadata to capture entry versions for
* conflict detection.
*
* @returns {Object} Transaction manager with begin(), commit(), and rollback() methods.
* @memberof Client#
* @since 0.16
*/
getTransactionManager: function() {
return {
begin: function() {
if (txCtx) throw new Error('Transaction already active');
txCtx = new tx.TransactionContext();
logger.debugf('Transaction started (xid=%s)',
txCtx.xid.globalTxId.toString('hex'));
return Promise.resolve();
},
commit: function() {
if (!txCtx) throw new Error('No active transaction');
var xid = txCtx.xid;
var mods = txCtx.getModifications();
if (mods.length === 0) {
txCtx = null;
return Promise.resolve();
}
var ctx = transport.context(BIG);
logger.debugf('Invoke prepareTx(msgId=%d, mods=%d)', ctx.id, mods.length);
var decoder = p.decodeXaResponse();
return future(ctx, 0x7D, p.encodePrepare(xid, mods, true, 60000), decoder)
.then(function(xaCode) {
txCtx = null;
if (xaCode !== tx.XA_OK && xaCode !== tx.XA_RDONLY) {
throw new Error(`Transaction prepare failed: XA code ${xaCode}`);
}
})
.catch(function(err) {
txCtx = null;
throw err;
});
},
rollback: function() {
if (!txCtx) throw new Error('No active transaction');
var xid = txCtx.xid;
var mods = txCtx.getModifications();
if (mods.length === 0) {
txCtx = null;
return Promise.resolve();
}
var ctx = transport.context(SMALL);
logger.debugf('Invoke rollbackTx(msgId=%d)', ctx.id);
var decoder = p.decodeXaResponse();
return future(ctx, 0x3F, p.encodeXidOnly(xid), decoder)
.then(function() { txCtx = null; })
.catch(function(err) {
txCtx = null;
throw err;
});
},
isActive: function() {
return txCtx !== null;
}
};
}
};
};
Expand Down
Loading