diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 130b547..39692b9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,7 +36,7 @@ jobs: - run: npm install - name: Run tests - run: npm run test:docker + run: npm test - name: Tear down containers if: always() diff --git a/documentation/asciidoc/stories/assembly_client_usage_examples.adoc b/documentation/asciidoc/stories/assembly_client_usage_examples.adoc index ad3cca9..3475d7f 100644 --- a/documentation/asciidoc/stories/assembly_client_usage_examples.adoc +++ b/documentation/asciidoc/stories/assembly_client_usage_examples.adoc @@ -4,6 +4,7 @@ Take a look at some examples for using the {hr_js} client with {brandname}. include::{topics}/ref_client_usage.adoc[leveloffset=+1] +include::{topics}/proc_transactions.adoc[leveloffset=+1] // Restore the parent context. ifdef::parent-context[:context: {parent-context}] diff --git a/documentation/asciidoc/topics/code_examples/transactions.js b/documentation/asciidoc/topics/code_examples/transactions.js new file mode 100644 index 0000000..45ea7d2 --- /dev/null +++ b/documentation/asciidoc/topics/code_examples/transactions.js @@ -0,0 +1,109 @@ +var infinispan = require('infinispan'); + +var connected = infinispan.client( + {port: 11222, host: '127.0.0.1'}, + { + authentication: { + enabled: true, + saslMechanism: 'SCRAM-SHA-256', + userName: 'admin', + password: 'changeme' + }, + clientIntelligence: 'BASIC' + } +); + +connected.then(function (adminClient) { + + // Create a cache with transactional configuration. + var txConfig = + '' + + '' + + '' + + ''; + + var create = adminClient.admin.getOrCreateCache('txCache', txConfig); + + return create.then(function() { + return adminClient.disconnect(); + }).then(function() { + + // Connect to the transactional cache. + return infinispan.client( + {port: 11222, host: '127.0.0.1'}, + { + authentication: { + enabled: true, + saslMechanism: 'SCRAM-SHA-256', + userName: 'admin', + password: 'changeme' + }, + cacheName: 'txCache', + topologyUpdates: false, + dataFormat: { + keyType: 'text/plain', + valueType: 'text/plain' + } + } + ); + + }).then(function(client) { + + // Get the transaction manager. + var tm = client.getTransactionManager(); + + // Begin a transaction, put entries, and commit. + var committed = tm.begin().then(function() { + return client.put('key1', 'value1'); + }).then(function() { + return client.put('key2', 'value2'); + }).then(function() { + return tm.commit(); + }); + + // Begin a transaction, put an entry, and roll back. + var rolledBack = committed.then(function() { + return tm.begin(); + }).then(function() { + return client.put('key3', 'should-not-exist'); + }).then(function() { + return tm.rollback(); + }); + + // Read-then-write with version-based conflict detection. + var readWrite = rolledBack.then(function() { + return client.put('counter', '0'); + }).then(function() { + return tm.begin(); + }).then(function() { + return client.get('counter'); + }).then(function(current) { + return client.put('counter', String(parseInt(current) + 1)); + }).then(function() { + return tm.commit(); + }); + + // Remove an entry within a transaction. + var removed = readWrite.then(function() { + return client.put('temp', 'ephemeral'); + }).then(function() { + return tm.begin(); + }).then(function() { + return client.remove('temp'); + }).then(function() { + return tm.commit(); + }); + + // Disconnect from {brandname} Server. + return removed.then(function() { + return client.disconnect(); + }); + + }); + +}).catch(function(error) { + + // Log any errors. + console.log("Got error: " + error.message); + +}); diff --git a/documentation/asciidoc/topics/proc_transactions.adoc b/documentation/asciidoc/topics/proc_transactions.adoc new file mode 100644 index 0000000..944f7fb --- /dev/null +++ b/documentation/asciidoc/topics/proc_transactions.adoc @@ -0,0 +1,77 @@ +[id='transactions_{context}'] += Transactions + +Transactions let you group multiple cache operations into an atomic unit that is either committed or rolled back together. +The {hr_js} client implements client-side transactions using the Hot Rod `PREPARE_TX`, `COMMIT_TX`, and `ROLLBACK_TX` protocol operations. + +== Transaction requirements + +Transactional caches on {brandname} Server must use `NON_XA` transaction mode with `PESSIMISTIC` locking: + +[source,xml,options="nowrap"] +---- + + + + + + + +---- + +== How transactions work + +When you begin a transaction, the client buffers all `put()` and `remove()` operations locally. +Calls to `get()` check the local buffer first, then fall back to the server. + +When you read a key from the server during a transaction, the client records the entry version. +On commit, the server verifies that versions have not changed, providing conflict detection for read-then-write patterns. + +On commit, the client sends all buffered modifications to the server in a single `PREPARE_TX` request. +The {hr_js} client uses one-phase commit for single-cache transactions, so no separate `COMMIT_TX` request is needed. + +On rollback, the client discards all buffered modifications and sends a `ROLLBACK_TX` request to the server. + +== Using transactions + +.Procedure + +. Get the transaction manager from the client. +. Call `begin()` to start a transaction. +. Perform cache operations (`put()`, `get()`, `remove()`). +. Call `commit()` to apply changes atomically, or `rollback()` to discard them. + +[source,javascript,options="nowrap",subs=attributes+] +---- +include::code_examples/transactions.js[] +---- + +.Available transaction operations +[cols="1,3",options="header"] +|=== +|Method |Description + +|`getTransactionManager()` +|Returns the transaction manager for the client. + +|`begin()` +|Starts a new transaction. +All subsequent `put()`, `get()`, and `remove()` calls are associated with this transaction. + +|`commit()` +|Commits the transaction. +Sends all buffered modifications to the server atomically. +Throws an error if the server rejects the transaction due to a version conflict. + +|`rollback()` +|Rolls back the transaction. +Discards all buffered modifications. + +|`isActive()` +|Returns `true` if a transaction is currently in progress. +|=== + +[NOTE] +==== +Connect to the transactional cache with `topologyUpdates: false` to prevent key-based routing, which is not compatible with transaction operations. +==== diff --git a/documentation/asciidoc/topics/ref_client_usage.adoc b/documentation/asciidoc/topics/ref_client_usage.adoc index a4ca91b..1fb46a7 100644 --- a/documentation/asciidoc/topics/ref_client_usage.adoc +++ b/documentation/asciidoc/topics/ref_client_usage.adoc @@ -221,3 +221,10 @@ include::code_examples/queries.js[] ---- See link:{query_docs}[Querying {brandname} caches] for more information. + +== Using transactions + +Transactions let you group multiple cache operations into an atomic unit. +The {hr_js} client buffers operations locally and sends them to the server on commit. + +For detailed information, see link:#transactions_{context}[Transactions]. diff --git a/lib/codec.js b/lib/codec.js index 25c7297..998d5ea 100644 --- a/lib/codec.js +++ b/lib/codec.js @@ -36,6 +36,7 @@ exports.decodeSignedInt = f.lift(doDecodeSignedInt, _.identity); exports.decodeVariableBytes = f.lift(doDecodeVariableBytes, _.identity); exports.decodeShort = f.lift(doDecodeShort, _.identity); + exports.decodeInt = f.lift(doDecodeInt, _.identity); exports.decodeProtobuf = f.lift(doDecodeProtobuf, _.identity); exports.decodeQuery = f.lift(doDecodeQuery,_.identity); @@ -560,6 +561,17 @@ return uncheckedReadShort(bytebuf)(); } + function doDecodeInt(bytebuf) { + if (4 > bytebuf.buf.length - bytebuf.offset) { + logger.tracef('Can not fully read 4 bytes (buffer size is %d, buffer offset %d)', + bytebuf.buf.length, bytebuf.offset); + return undefined; + } + var val = bytebuf.buf.readInt32BE(bytebuf.offset); + bytebuf.offset += 4; + return val; + } + /** * Decode a Protobuf WrappedMessage from the buffer and unwrap it. * @param {Object} bytebuf - The byte buffer to read from. diff --git a/lib/infinispan.js b/lib/infinispan.js index 2706faf..5a0f03a 100644 --- a/lib/infinispan.js +++ b/lib/infinispan.js @@ -18,6 +18,7 @@ var io = require('./io'); var listeners = require('./listeners'); var nearCacheFactory = require('./near-cache'); + var tx = require('./transaction'); var Client = function(addrs, clientOpts) { var logger = u.logger('client'); @@ -39,6 +40,7 @@ var p = protocolResolver(clientOpts['version']); var listen = listeners(p); var nc = clientOpts.nearCache ? nearCacheFactory(clientOpts.nearCache.maxEntries) : null; + var txCtx = null; var TINY = 16, SMALL = 32, MEDIUM = 64, BIG = 128; @@ -327,6 +329,17 @@ * @since 0.3 */ get: function(k) { + if (txCtx) { + var local = txCtx.getLocalValue(k); + if (local.found) return Promise.resolve(local.value); + var ctx = transport.context(SMALL); + logger.debugf('Invoke transactional get(msgId=%d,key=%s)', ctx.id, u.str(k)); + var decoder = p.decodeWithMeta(); + return futureKey(ctx, 0x1B, k, p.encodeKey(k), decoder).then(function(meta) { + txCtx.trackRead(k, meta); + return meta ? meta.value : undefined; + }); + } if (nc) { var cached = nc.get(k); if (cached !== undefined) { @@ -455,6 +468,11 @@ * @since 0.3 */ put: function(k, v, opts) { + if (txCtx) { + logger.debugf('Transactional put(key=%s)', u.str(k)); + txCtx.trackPut(k, v); + return Promise.resolve(undefined); + } if (nc) nc.remove(k); var ctx = transport.context(MEDIUM); logger.debugl(function() { return ['Invoke put(msgId=%d,key=%s,value=%s,opts=%s)', @@ -486,6 +504,11 @@ * @since 0.3 */ remove: function(k, opts) { + if (txCtx) { + logger.debugf('Transactional remove(key=%s)', u.str(k)); + txCtx.trackRemove(k); + return Promise.resolve(true); + } if (nc) nc.remove(k); var ctx = transport.context(SMALL); logger.debugl(function() {return ['Invoke remove(msgId=%d,key=%s,opts=%s)', @@ -1261,6 +1284,73 @@ logger.debugf('Invoke admin.removeSchema(msgId=%d,name=%s)', ctx.id, name); return future(ctx, 0x2B, p.encodeNameParams('@@schemas@delete', {name: name}), p.decodeValue()); } + }, + /** + * Get the transaction manager for this client. + * Transactions buffer put/remove operations locally and send them + * to the server atomically on commit via PREPARE_TX_2. + * During an active transaction, get() reads local writes first, + * falling back to getWithMetadata to capture entry versions for + * conflict detection. + * + * @returns {Object} Transaction manager with begin(), commit(), and rollback() methods. + * @memberof Client# + * @since 0.16 + */ + getTransactionManager: function() { + return { + begin: function() { + if (txCtx) throw new Error('Transaction already active'); + txCtx = new tx.TransactionContext(); + logger.debugf('Transaction started (xid=%s)', + txCtx.xid.globalTxId.toString('hex')); + return Promise.resolve(); + }, + commit: function() { + if (!txCtx) throw new Error('No active transaction'); + var xid = txCtx.xid; + var mods = txCtx.getModifications(); + if (mods.length === 0) { + txCtx = null; + return Promise.resolve(); + } + var ctx = transport.context(BIG); + logger.debugf('Invoke prepareTx(msgId=%d, mods=%d)', ctx.id, mods.length); + var decoder = p.decodeXaResponse(); + return future(ctx, 0x7D, p.encodePrepare(xid, mods, true, 60000), decoder) + .then(function(xaCode) { + txCtx = null; + if (xaCode !== tx.XA_OK && xaCode !== tx.XA_RDONLY) { + throw new Error(`Transaction prepare failed: XA code ${xaCode}`); + } + }) + .catch(function(err) { + txCtx = null; + throw err; + }); + }, + rollback: function() { + if (!txCtx) throw new Error('No active transaction'); + var xid = txCtx.xid; + var mods = txCtx.getModifications(); + if (mods.length === 0) { + txCtx = null; + return Promise.resolve(); + } + var ctx = transport.context(SMALL); + logger.debugf('Invoke rollbackTx(msgId=%d)', ctx.id); + var decoder = p.decodeXaResponse(); + return future(ctx, 0x3F, p.encodeXidOnly(xid), decoder) + .then(function() { txCtx = null; }) + .catch(function(err) { + txCtx = null; + throw err; + }); + }, + isActive: function() { + return txCtx !== null; + } + }; } }; }; diff --git a/lib/protocols.js b/lib/protocols.js index c9e9af9..758cf90 100644 --- a/lib/protocols.js +++ b/lib/protocols.js @@ -577,8 +577,8 @@ return hasPrevious(header.status); }, isEvent: function(header) { - //return ((op(header) >> 4) & 0x06) == 0x06; - return ((header.opCode >> 4) & 0x06) == 0x06; + var op = header.opCode; + return op >= 0x60 && op <= 0x64; }, isError: function(header) { return header.opCode == 0x50; @@ -1428,6 +1428,54 @@ }()); + var DECODE_INT = f.actions([codec.decodeInt()], codec.lastDecoded); + + var TransactionMixin = { + encodeXid: function(xid) { + return [ + codec.encodeSignedInt(xid.formatId), + codec.encodeBytesWithLength(xid.globalTxId), + codec.encodeBytesWithLength(xid.branchQualifier) + ]; + }, + encodePrepare: function(xid, modifications, onePhaseCommit, timeout) { + var outer = this; + return function() { + var steps = outer.encodeXid(xid); + steps.push(codec.encodeUByte(onePhaseCommit ? 1 : 0)); + steps.push(codec.encodeUByte(0)); // not recoverable + steps.push(codec.encodeLong(timeout)); + steps.push(codec.encodeVInt(modifications.length)); + for (var i = 0; i < modifications.length; i++) { + var mod = modifications[i]; + steps.push(outer.encodeMediaKey(mod.key)); + steps.push(codec.encodeUByte(mod.controlByte)); + if (!(mod.controlByte & 0x1) && !(mod.controlByte & 0x2)) { + steps.push(codec.encodeBytes(mod.versionRead)); + } + if (!(mod.controlByte & 0x4)) { + steps.push(codec.encodeUByte(0x77)); + steps.push(outer.encodeMediaValue(mod.value)); + } + } + return steps; + }; + }, + encodeXidOnly: function(xid) { + var outer = this; + return function() { + return outer.encodeXid(xid); + }; + }, + decodeXaResponse: function() { + return function(header, bytebuf) { + var code = DECODE_INT(bytebuf); + if (!f.existy(code)) return {continue: false}; + return {result: code, continue: true}; + }; + } + }; + /** * Constructs a Hot Rod protocol instance for the given version. * @param {number} v Protocol version number. @@ -1516,13 +1564,7 @@ , Ping29Mixin , ProtostreamType , ProtobufRoot - // TODO 2.6 new ops: getStream and putStream - // TODO 2.6 add listener change: listener event interests - // TODO 2.7 new ops: prepare, commit and rollback - // TODO 2.7 new ops: counter operations - // TODO 2.7 new events: counter events - // TODO 2.8 listener events: can come from any connection - // TODO 2.8 header change: media types + , TransactionMixin ); _.extend(Protocol30.prototype @@ -1538,6 +1580,7 @@ , Ping30Mixin , ProtostreamType , ProtobufRoot + , TransactionMixin ); _.extend(Protocol31.prototype @@ -1554,7 +1597,7 @@ , CounterMixin , ProtostreamType , ProtobufRoot - // TODO 3.1 new ops: bloom filter near-cache + , TransactionMixin ); _.extend(Protocol40.prototype @@ -1573,6 +1616,7 @@ , CounterMixin , ProtostreamType , ProtobufRoot + , TransactionMixin ); _.extend(Protocol41.prototype @@ -1591,7 +1635,7 @@ , CounterMixin , ProtostreamType , ProtobufRoot - // TODO 4.1 new ops: chunked streaming (GetStreamStart/Next/End, PutStreamStart/Next/End) + , TransactionMixin ); exports.version22 = function(clientOpts) { diff --git a/lib/transaction.js b/lib/transaction.js new file mode 100644 index 0000000..7e412cc --- /dev/null +++ b/lib/transaction.js @@ -0,0 +1,123 @@ +'use strict'; + +(function() { + + var crypto = require('crypto'); + + var FORMAT_ID = 0x48525458; + var NOT_READ = 0x1, NON_EXISTING = 0x2, REMOVE_OP = 0x4; + var XA_OK = 0, XA_RDONLY = 3; + + function generateXid() { + return { + formatId: FORMAT_ID, + globalTxId: crypto.randomBytes(16), + branchQualifier: crypto.randomBytes(16) + }; + } + + function TransactionContext() { + this.xid = generateXid(); + // key (string) → { value, removed (bool), versionRead (Buffer|null), wasRead (bool), existed (bool) } + this.entries = new Map(); + this.active = true; + } + + TransactionContext.prototype.trackPut = function(key, value) { + var entry = this.entries.get(key); + if (entry) { + entry.value = value; + entry.removed = false; + } else { + this.entries.set(key, { + value: value, + removed: false, + versionRead: null, + wasRead: false, + existed: false + }); + } + }; + + TransactionContext.prototype.trackRemove = function(key) { + var entry = this.entries.get(key); + if (entry) { + entry.removed = true; + entry.value = undefined; + } else { + this.entries.set(key, { + value: undefined, + removed: true, + versionRead: null, + wasRead: false, + existed: false + }); + } + }; + + TransactionContext.prototype.trackRead = function(key, meta) { + var entry = this.entries.get(key); + var version = meta ? meta.version : null; + var existed = meta !== undefined; + if (entry) { + if (!entry.wasRead) { + entry.wasRead = true; + entry.versionRead = version; + entry.existed = existed; + } + } else { + this.entries.set(key, { + value: meta ? meta.value : undefined, + removed: false, + versionRead: version, + wasRead: true, + existed: existed + }); + } + }; + + TransactionContext.prototype.getLocalValue = function(key) { + var entry = this.entries.get(key); + if (!entry) return { found: false }; + if (entry.removed) return { found: true, value: undefined }; + if (entry.value !== undefined) return { found: true, value: entry.value }; + return { found: false }; + }; + + TransactionContext.prototype.computeControlByte = function(entry) { + var control = 0; + if (!entry.wasRead) { + control |= NOT_READ; + } else if (!entry.existed) { + control |= NON_EXISTING; + } + if (entry.removed) { + control |= REMOVE_OP; + } + return control; + }; + + TransactionContext.prototype.getModifications = function() { + var mods = []; + this.entries.forEach(function(entry, key) { + var control = this.computeControlByte(entry); + mods.push({ + key: key, + controlByte: control, + versionRead: entry.versionRead, + value: entry.value + }); + }.bind(this)); + return mods; + }; + + exports.TransactionContext = TransactionContext; + exports.generateXid = generateXid; + exports.FORMAT_ID = FORMAT_ID; + exports.NOT_READ = NOT_READ; + exports.NON_EXISTING = NON_EXISTING; + exports.REMOVE_OP = REMOVE_OP; + exports.XA_OK = XA_OK; + exports.XA_RDONLY = XA_RDONLY; + +}.call(this)); diff --git a/package.json b/package.json index 2a30127..b075a0b 100644 --- a/package.json +++ b/package.json @@ -9,8 +9,7 @@ }, "scripts": { "lint": "eslint --ignore-path .gitignore lib spec index.js", - "test": "jasmine", - "test:docker": "node scripts/docker-test.js", + "test": "node scripts/docker-test.js", "docker:up": "docker compose -p ispn-test up -d --wait && docker compose -p ispn-test --profile failover create server-failover-one server-failover-two server-failover-three", "docker:down": "docker compose -p ispn-test --profile failover down --remove-orphans", "ssl:generate": "node scripts/make-ssl.js", diff --git a/scripts/docker-test.js b/scripts/docker-test.js index 60ff30b..23ef7dc 100755 --- a/scripts/docker-test.js +++ b/scripts/docker-test.js @@ -65,8 +65,6 @@ const containers = { for (const [envVar, container] of Object.entries(containers)) { process.env[envVar] = getContainerIp(container); } -process.env.ISPN_DOCKER = 'true'; - console.log('Container IPs:'); console.log(` local: ${process.env.ISPN_LOCAL_HOST}`); console.log(` cluster: ${process.env.ISPN_CLUSTER1_HOST}, ${process.env.ISPN_CLUSTER2_HOST}, ${process.env.ISPN_CLUSTER3_HOST}`); diff --git a/spec/configs/docker/infinispan-clustered.xml b/spec/configs/docker/infinispan-clustered.xml index 44da4f0..f436828 100644 --- a/spec/configs/docker/infinispan-clustered.xml +++ b/spec/configs/docker/infinispan-clustered.xml @@ -48,7 +48,7 @@ - diff --git a/spec/configs/docker/infinispan-xsite-EARTH.xml b/spec/configs/docker/infinispan-xsite-EARTH.xml index 18709a2..1f751d3 100644 --- a/spec/configs/docker/infinispan-xsite-EARTH.xml +++ b/spec/configs/docker/infinispan-xsite-EARTH.xml @@ -76,7 +76,7 @@ - + diff --git a/spec/configs/docker/infinispan-xsite-MOON.xml b/spec/configs/docker/infinispan-xsite-MOON.xml index 7c58446..20001c4 100644 --- a/spec/configs/docker/infinispan-xsite-MOON.xml +++ b/spec/configs/docker/infinispan-xsite-MOON.xml @@ -78,7 +78,7 @@ - + diff --git a/spec/infinispan_ssl_spec.js b/spec/infinispan_ssl_spec.js index 40520e5..fb575e0 100644 --- a/spec/infinispan_ssl_spec.js +++ b/spec/infinispan_ssl_spec.js @@ -112,7 +112,7 @@ describe('Infinispan TLS/SSL client', function() { enabled: true, secureProtocol: 'TLS_client_method', trustCerts: ['out/ssl/ca/ca.pem'], - sniHostName: t.isDocker ? 'localhost' : undefined + sniHostName: 'localhost' }, authentication: { enabled: true, @@ -136,7 +136,7 @@ describe('Infinispan TLS/SSL client', function() { path: 'out/ssl/client/client.p12', passphrase: 'secret' }, - sniHostName: t.isDocker ? 'localhost' : undefined + sniHostName: 'localhost' }, authentication: { enabled: true, @@ -161,7 +161,7 @@ describe('Infinispan TLS/SSL client', function() { passphrase: 'secret', cert: 'out/ssl/client/client.pem' }, - sniHostName: t.isDocker ? 'localhost' : undefined + sniHostName: 'localhost' }, authentication: { enabled: true, diff --git a/spec/utils/testing.js b/spec/utils/testing.js index 3cfe670..5af5b0c 100644 --- a/spec/utils/testing.js +++ b/spec/utils/testing.js @@ -13,45 +13,25 @@ var ispn = require('../../lib/infinispan'); var u = require('../../lib/utils'); var protocols = require('../../lib/protocols'); -exports.serverDirName = 'infinispan-server'; - -exports.isDocker = process.env.ISPN_DOCKER === 'true'; - exports.authOpts = { authentication: { enabled: true, - saslMechanism: 'PLAIN', + saslMechanism: 'SCRAM-SHA-256', userName: 'admin', password: 'pass' } }; -if (exports.isDocker) { - // Docker mode: all containers listen on port 11222, IPs detected via env vars - exports.local = {port: 11222, host: process.env.ISPN_LOCAL_HOST}; - exports.cluster1 = {port: 11222, host: process.env.ISPN_CLUSTER1_HOST}; - exports.cluster2 = {port: 11222, host: process.env.ISPN_CLUSTER2_HOST}; - exports.cluster3 = {port: 11222, host: process.env.ISPN_CLUSTER3_HOST}; - exports.failover1 = {port: 11222, host: process.env.ISPN_FAILOVER1_HOST}; - exports.failover2 = {port: 11222, host: process.env.ISPN_FAILOVER2_HOST}; - exports.failover3 = {port: 11222, host: process.env.ISPN_FAILOVER3_HOST}; - exports.ssl = {port: 11232, host: process.env.ISPN_SSL_HOST}; - exports.earth1 = {port: 11222, host: process.env.ISPN_EARTH_HOST}; - exports.moon1 = {port: 11222, host: process.env.ISPN_MOON_HOST}; -} else { - // Local mode: servers on localhost with different ports - exports.local = {port: 11222, host: '127.0.0.1'}; - exports.cluster1 = {port: 11322, host: '127.0.0.1'}; - exports.cluster2 = {port: 11332, host: '127.0.0.1'}; - exports.cluster3 = {port: 11342, host: '127.0.0.1'}; - exports.failover1 = {port: 11422, host: '127.0.0.1'}; - exports.failover2 = {port: 11432, host: '127.0.0.1'}; - exports.failover3 = {port: 11442, host: '127.0.0.1'}; - // SSL invocations directed to 'localhost' for TLS server name matching - exports.ssl = {port: 11232, host: 'localhost'}; - exports.earth1 = {port: 11522, host: '127.0.0.1'}; - exports.moon1 = {port: 11532, host: '127.0.0.1'}; -} +exports.local = {port: 11222, host: process.env.ISPN_LOCAL_HOST}; +exports.cluster1 = {port: 11222, host: process.env.ISPN_CLUSTER1_HOST}; +exports.cluster2 = {port: 11222, host: process.env.ISPN_CLUSTER2_HOST}; +exports.cluster3 = {port: 11222, host: process.env.ISPN_CLUSTER3_HOST}; +exports.failover1 = {port: 11222, host: process.env.ISPN_FAILOVER1_HOST}; +exports.failover2 = {port: 11222, host: process.env.ISPN_FAILOVER2_HOST}; +exports.failover3 = {port: 11222, host: process.env.ISPN_FAILOVER3_HOST}; +exports.ssl = {port: 11232, host: process.env.ISPN_SSL_HOST}; +exports.earth1 = {port: 11222, host: process.env.ISPN_EARTH_HOST}; +exports.moon1 = {port: 11222, host: process.env.ISPN_MOON_HOST}; exports.cluster = [exports.cluster1, exports.cluster2, exports.cluster3]; exports.failoverConfig = 'infinispan-clustered.xml'; exports.failoverMCastAddr = '234.99.64.14'; @@ -701,22 +681,7 @@ function containerForAddr(addrOrPort) { exports.stopClusterNode = function(addrOrPort, waitStop) { return function() { - if (exports.isDocker) { - return stopDockerContainer(addrOrPort, waitStop); - } - var addr = resolveAddr(addrOrPort); - var opUrl = '/server?action=stop'; - - if (waitStop) { - return invokeDmrHttpGet('POST', opUrl, addr).then(function() { - return waitUntilStopped(addr); - }).catch(function (err) { - console.log(`Error stopping ${ err}`); - return Promise.resolve(`unable to stop server at ${addr.host}:${addr.port}`); - }); - } - - return invokeDmrHttpGet('POST', opUrl, addr); + return stopDockerContainer(addrOrPort, waitStop); }; }; @@ -742,46 +707,6 @@ function stopDockerContainer(addrOrPort, waitStop) { }); } -function waitUntilStopped(addrOrPort) { - return waitUntil( - function(resp) { expect(resp).toEqual(0); }, - function(resp) { return _.isEqual(resp, 0); }, - getServerStatus(addrOrPort) - ); -} - -function getServerStatus(addrOrPort) { - if (exports.isDocker) { - var container = containerForAddr(addrOrPort); - return function() { - return new Promise(function(fulfil) { - var exec = require('child_process').exec; - exec(`docker inspect -f '{{.State.Running}}' ${container} 2>/dev/null`, function(err, stdout) { - fulfil(stdout && stdout.trim() === 'true' ? 1 : 0); - }); - }); - }; - } - var addr = resolveAddr(addrOrPort); - return function() { - return new Promise(function(fulfil) { - var spawn = require('child_process').spawn; - var child = spawn('fuser', [`${addr.port}/tcp`]); - var count = 0; - - child.stdout.on('data', function(chunk) { - chunk.toString().split('\n').forEach(function() { - count++; - }); - }); - - child.stdout.on('end', function() { - fulfil(count); - }); - }); - }; -} - exports.stopAndWaitView = function(nodeStop, expectNumMembers, nodeView) { return function() { return exports.stopClusterNode(nodeStop, true)() @@ -812,48 +737,12 @@ exports.waitUntilView = function(expectNumMembers, addrOrPort) { ); }; exports.launchClusterNodeAndWaitView = function(nodeName, config, addrOrPort, mcastAddr, expectNumMembers, client) { - if (exports.isDocker) { - return launchDockerNode(addrOrPort).then(function() { - logger.debugf(`wait until view ${ expectNumMembers}`); - return exports.waitUntilView(expectNumMembers, addrOrPort); - }).then(function() { - logger.debugf('return client'); - return client; - }); - } - var addr = resolveAddr(addrOrPort); - return killProcessOnPort(addr.port).then(function() { - return new Promise(function (fulfill) { - var spawn = require('child_process').spawn; - var path = require('path'); - - var serverPath = path.resolve(`server/${ exports.serverDirName }/`); - var standaloneShPath = `${serverPath }/bin/server.sh`; - var cmd = spawn( - standaloneShPath, - ['-c', config, '-p', addr.port, '-s',`${serverPath }/${ nodeName}`, - `-Djgroups.mcast_addr=${ mcastAddr}`, - `-Dinfinispan.node.name=${ nodeName}`, - '-Djgroups.join_timeout=1000', - '-Djgroups.bind.address=127.0.0.1' - ]); - - cmd.stderr.on('data', function (data) { - logger.debugf('Stderr [%s]: %s', nodeName, data); - }); - - cmd.on('exit', function (code) { - logger.debugf('Child process exited with code %d', code); - }); - - fulfill(); - }); + return launchDockerNode(addrOrPort).then(function() { + logger.debugf(`wait until view ${ expectNumMembers}`); + return exports.waitUntilView(expectNumMembers, addrOrPort); }).then(function() { - logger.debugf(`wait until view ${ expectNumMembers}`); - return exports.waitUntilView(expectNumMembers, addrOrPort); - }).then(function() { - logger.debugf('return client'); - return client; + logger.debugf('return client'); + return client; }); }; @@ -895,15 +784,6 @@ function launchDockerNode(addrOrPort) { }); } -function killProcessOnPort(port) { - return new Promise(function(resolve) { - var exec = require('child_process').exec; - exec(`fuser -k ${port}/tcp 2>/dev/null`, function() { - setTimeout(resolve, 500); - }); - }); -} - function waitUntil(expectF, cond, op) { var now = new Date().getTime(); var MAX_TIMEOUT = 60000; @@ -1036,8 +916,6 @@ function digestRequest(method, url, username, password) { function restHostPort(addrOrPort) { var addr = resolveAddr(addrOrPort); - if (!exports.isDocker) return `localhost:${addr.port}`; - // In Docker, REST is on port 11222 at the container's IP return `${addr.host}:11222`; }