From 9d64e22656dad797aa508e87dd716d3984c747f0 Mon Sep 17 00:00:00 2001 From: Ash Wilson Date: Mon, 21 Mar 2016 13:51:30 -0400 Subject: [PATCH 01/12] Configurable parallel jobs --- lib/index.js | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/index.js b/lib/index.js index 95951f9..d2da90b 100644 --- a/lib/index.js +++ b/lib/index.js @@ -56,11 +56,12 @@ function Runner(emitter, config) { logger: console, processJob: core.process, pluginDir: path.join(__dirname, '../node_modules'), - dataDir: process.env.STRIDER_CLONE_DEST || dotStrider + dataDir: process.env.STRIDER_CLONE_DEST || dotStrider, + concurrentJobs: parseInt(process.env.CONCURRENT_JOBS || '1', 10) || 1, }, config) this.emitter = emitter this.log = this.config.logger.log - this.queue = async.queue(this.processJob.bind(this), 1) + this.queue = async.queue(this.processJob.bind(this), this.config.concurrentJobs) this.io = this.config.io this.callbackMap = {} this.hooks = [] @@ -295,7 +296,9 @@ Runner.prototype = { // Keep around N most recent build directories. // Default is 0, ie wipe at start of each run. // Later, this can be configurable in the UI. - keeper({baseDir: path.join(this.config.dataDir, "data"), count: 0}, function(err) { + keeper({baseDir: path.join(this.config.dataDir, "data"), count: this.config.concurrentJobs - 1}, function(err) { + if (err) throw err; + initJobDirs(self.config.dataDir, job, cache.base, jobDirsReady) }) @@ -373,4 +376,3 @@ Runner.prototype = { }; } }; - From 61ca92a486e1056281857335a6a5b3ba031a0a80 Mon Sep 17 00:00:00 2001 From: Ash Wilson Date: Mon, 21 Mar 2016 13:55:04 -0400 Subject: [PATCH 02/12] Documentation in the README. --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index be8bfd5..99620ff 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ io(new EventEmitter): the channel of internal communication with the job work processJob(core.process):function to run a job. (task, config, ondone) pluginDirs: the directories in which to look for plugins dataDir($HOME/.strider): the directory in which to clone/test/etc +concurrentJobs(1): maximum number of jobs to execute at once ``` ### Events From a530905dd8faa9c3c8c9afe8a4525ac0b2c52e5f Mon Sep 17 00:00:00 2001 From: Ash Wilson Date: Tue, 22 Mar 2016 14:29:53 -0400 Subject: [PATCH 03/12] Initial cut of a specialized job queue --- lib/jobqueue.js | 92 +++++++++++++++++++++++++++++++++++++++++ test/test_jobqueue.js | 96 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 188 insertions(+) create mode 100644 lib/jobqueue.js create mode 100644 test/test_jobqueue.js diff --git a/lib/jobqueue.js b/lib/jobqueue.js new file mode 100644 index 0000000..5b436f8 --- /dev/null +++ b/lib/jobqueue.js @@ -0,0 +1,92 @@ +module.exports = JobQueue + +function JobQueue(handler, concurrency) { + this.concurrency = concurrency + this.handler = handler + this.tasks = [] + this.active = {} + this.drainCallback = null +} + +JobQueue.prototype = { + // public api + + // Add a job to the end of the queue. If the queue is not currently saturated, immediately + // schedule a task to handle the new job. If a callback is provided, call it when this job's task + // completes. + push: function (job, config, callback) { + var task = { + job: job, + config: config, + callback: callback || function () {} + } + + task.id = task.job._id + + this.tasks.push(task) + + // Defer task execution to the next event loop tick to ensure that the push() function's + // callback is *always* invoked asynchronously. + // http://blog.izs.me/post/59142742143/designing-apis-for-asynchrony + process.nextTick(this.drain.bind(this)) + }, + + // Launch the asynchronous handler function for each eligible waiting task until the queue is + // saturated. + drain: function () { + var self = this + + // See how much capacity we have left to fill. + var launchCount = this.concurrency - Object.keys(this.active).length + + // Identify up to launchCount eligible tasks, giving priority to those earlier in the queue. + var offset = 0 + var launchTasks = [] + while (launchTasks.length < launchCount && this.tasks.length > offset) { + var task = this.tasks[offset] + + // TODO determine task eligibility + this.tasks.splice(offset, 1) + launchTasks.push(task) + } + + // Create a task completion callback. Remove the task from the active set, invoke the tasks' + // push() callback, then drain() again to see if another task is ready to run. + var makeTaskHandler = function (task) { + return function (err) { + delete self.active[task.id] + + task.callback(err) + + // Defer the next drain() call again in case the task's callback was synchronous. + process.nextTick(self.drain.bind(self)) + } + } + + // Launch the queue handler for each chosen task. + for (var i = 0; i < launchTasks.length; i++) { + var each = launchTasks[i] + + this.active[each.id] = each + + this.handler(each.job, each.config, makeTaskHandler(each)) + } + + // Fire and unset the drain callback if one has been registered. + if (this.drainCallback) { + var lastCallback = this.drainCallback + this.drainCallback = null + lastCallback() + } + }, + + // Return true if "id" corresponds to the job ID of an active job. + isActive: function (id) { + return id in this.active + }, + + // Fire a callback the next time that a drain() is executed. + onNextDrain: function (callback) { + this.drainCallback = callback + } +} diff --git a/test/test_jobqueue.js b/test/test_jobqueue.js new file mode 100644 index 0000000..ac3078f --- /dev/null +++ b/test/test_jobqueue.js @@ -0,0 +1,96 @@ + +var expect = require('expect.js') + , async = require('async') + , JobQueue = require('../lib/jobqueue.js'); + +describe('JobQueue', function () { + var q, handlers + + beforeEach(function () { + handlers = {} + }) + + // Configure a handler to be updated when handlerDispatch executes a job with an expected ID. + function expectJobs() { + function defaultFinish() { + throw new Error('Job ' + jid + ' was never handled') + } + + for (var i = 0; i < arguments.length; i++) { + var jid = arguments[i] + + handlers[jid] = { + wasCalled: false, + finish: defaultFinish + } + } + } + + // JobQueue handler function that manipulates handler objects set up in advance with expectJob. + function handlerDispatch(job, config, cb) { + var jid = job._id + var handler = handlers[jid] + if (!handler) { + return cb(new Error('Unexpected job id ' + jid)) + } + + handler.wasCalled = true + handler.finish = function () { + cb(null) + } + } + + describe('with concurrency 1', function () { + beforeEach(function () { + q = new JobQueue(handlerDispatch, 1) + }) + + it('executes on push on next tick when unsaturated', function (done) { + expectJobs(1) + + q.push({ _id: 1 }) + + q.onNextDrain(function () { + expect(q.isActive(1)).to.be(true) + expect(handlers[1].wasCalled).to.be(true) + + handlers[1].finish() + done() + }) + }) + + it('waits for an available task slot when saturated', function (done) { + expectJobs(1, 2) + + async.series([ + function (cb) { + q.onNextDrain(cb) + + q.push({ _id: 1 }) + }, + function (cb) { + q.onNextDrain(cb) + + expect(handlers[1].wasCalled).to.be(true) + + // The queue is now saturated. + q.push({ _id: 2 }) + }, + function (cb) { + q.onNextDrain(cb) + + expect(handlers[2].wasCalled).to.be(false) + + // Finishing job 1's handler causes a slot to open, so job 2 can now run. + handlers[1].finish() + }, + function (cb) { + expect(handlers[2].wasCalled).to.be(true) + + handlers[2].finish() + cb() + } + ], done) + }) + }) +}) From 3c044350dab7a8356fcbf2ffe3d3b0471efb3e93 Mon Sep 17 00:00:00 2001 From: Ash Wilson Date: Tue, 22 Mar 2016 15:11:06 -0400 Subject: [PATCH 04/12] Test the concurrency 2 case. --- test/test_jobqueue.js | 85 +++++++++++++++++++++++++++++++++---------- 1 file changed, 66 insertions(+), 19 deletions(-) diff --git a/test/test_jobqueue.js b/test/test_jobqueue.js index ac3078f..b867f6c 100644 --- a/test/test_jobqueue.js +++ b/test/test_jobqueue.js @@ -26,6 +26,26 @@ describe('JobQueue', function () { } } + // Execute a function after each queue drain. Each function should finish with an action that + // causes a drain to occur (push a new task or complete an active task). + function onEachDrain(steps, done) { + var wrappedSteps = [] + + var makeStep = function (arg) { + return function (cb) { + q.onNextDrain(cb) + + arg() + } + } + + for (var i = 0; i < steps.length; i++) { + wrappedSteps.push(makeStep(steps[i])) + } + + async.series(wrappedSteps, done) + } + // JobQueue handler function that manipulates handler objects set up in advance with expectJob. function handlerDispatch(job, config, cb) { var jid = job._id @@ -62,35 +82,62 @@ describe('JobQueue', function () { it('waits for an available task slot when saturated', function (done) { expectJobs(1, 2) - async.series([ - function (cb) { - q.onNextDrain(cb) - + onEachDrain([ + function () { q.push({ _id: 1 }) }, - function (cb) { - q.onNextDrain(cb) - + function () { expect(handlers[1].wasCalled).to.be(true) - - // The queue is now saturated. q.push({ _id: 2 }) }, - function (cb) { - q.onNextDrain(cb) - + function () { expect(handlers[2].wasCalled).to.be(false) - - // Finishing job 1's handler causes a slot to open, so job 2 can now run. handlers[1].finish() + } + ], function () { + expect(handlers[2].wasCalled).to.be(true) + + handlers[2].finish() + done() + }); + }) + }) + + describe('with concurrency 2', function () { + beforeEach(function () { + q = new JobQueue(handlerDispatch, 2) + }) + + it('executes the first two tasks immediately, then waits for task completion', function (done) { + expectJobs(1, 2, 3) + + onEachDrain([ + function () { + q.push({ _id: 1 }) + }, + function () { + expect(handlers[1].wasCalled).to.be(true) + q.push({ _id: 2 }) }, - function (cb) { + function () { expect(handlers[2].wasCalled).to.be(true) - + q.push({ _id: 3 }) + }, + function () { + // The queue was saturated when job 3 was added. It should not have run yet + expect(handlers[3].wasCalled).to.be(false) handlers[2].finish() - cb() + }, + function () { + expect(handlers[3].wasCalled).to.be(true) + + // Call the handlers for jobs 1 and 3 to be tidy. + handlers[1].finish() } - ], done) - }) + ], function () { + handlers[3].finish() + done() + }) + }); }) }) From 22f1d3c5a395a5b3bf464332fa013ed90406bfa3 Mon Sep 17 00:00:00 2001 From: Ash Wilson Date: Tue, 22 Mar 2016 15:52:16 -0400 Subject: [PATCH 05/12] Test for jobs on the same project/branch. --- test/test_jobqueue.js | 49 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 42 insertions(+), 7 deletions(-) diff --git a/test/test_jobqueue.js b/test/test_jobqueue.js index b867f6c..74565b0 100644 --- a/test/test_jobqueue.js +++ b/test/test_jobqueue.js @@ -10,6 +10,14 @@ describe('JobQueue', function () { handlers = {} }) + function makeJob(jid, project, branch) { + return { + _id: jid, + project: project, + ref: { branch: branch } + } + } + // Configure a handler to be updated when handlerDispatch executes a job with an expected ID. function expectJobs() { function defaultFinish() { @@ -68,7 +76,7 @@ describe('JobQueue', function () { it('executes on push on next tick when unsaturated', function (done) { expectJobs(1) - q.push({ _id: 1 }) + q.push(makeJob(1, 'foo/bar', 'master')) q.onNextDrain(function () { expect(q.isActive(1)).to.be(true) @@ -84,11 +92,11 @@ describe('JobQueue', function () { onEachDrain([ function () { - q.push({ _id: 1 }) + q.push(makeJob(1, 'foo/bar1', 'master')) }, function () { expect(handlers[1].wasCalled).to.be(true) - q.push({ _id: 2 }) + q.push(makeJob(2, 'foo/bar2', 'master')) }, function () { expect(handlers[2].wasCalled).to.be(false) @@ -113,15 +121,15 @@ describe('JobQueue', function () { onEachDrain([ function () { - q.push({ _id: 1 }) + q.push(makeJob(1, 'foo/bar1', 'master')) }, function () { expect(handlers[1].wasCalled).to.be(true) - q.push({ _id: 2 }) + q.push(makeJob(2, 'foo/bar2', 'master')) }, function () { expect(handlers[2].wasCalled).to.be(true) - q.push({ _id: 3 }) + q.push(makeJob(3, 'foo/bar3', 'master')) }, function () { // The queue was saturated when job 3 was added. It should not have run yet @@ -138,6 +146,33 @@ describe('JobQueue', function () { handlers[3].finish() done() }) - }); + }) + + it('prevents scheduling concurrent jobs on the same branch', function (done) { + expectJobs(1, 2) + + onEachDrain([ + function () { + q.push(makeJob(1, 'foo/bar', 'master')) + }, + function () { + expect(handlers[1].wasCalled).to.be(true) + + q.push(makeJob(2, 'foo/bar', 'master')) + }, + function () { + // Even though the queue is unsaturated, job 2 should not be handled yet, because it's + // for the same project and branch as a running job. + expect(handlers[2].wasCalled).to.be(false) + + handlers[1].finish() + }, + function () { + expect(handlers[2].wasCalled).to.be(true) + + handlers[2].finish() + } + ], done) + }) }) }) From b8140222a8b087837aa578519547173d260a433b Mon Sep 17 00:00:00 2001 From: Ash Wilson Date: Tue, 22 Mar 2016 16:04:25 -0400 Subject: [PATCH 06/12] Track active jobs by key. --- lib/jobqueue.js | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/lib/jobqueue.js b/lib/jobqueue.js index 5b436f8..04c1d16 100644 --- a/lib/jobqueue.js +++ b/lib/jobqueue.js @@ -4,7 +4,10 @@ function JobQueue(handler, concurrency) { this.concurrency = concurrency this.handler = handler this.tasks = [] + this.active = {} + this.activeKeys = {} + this.drainCallback = null } @@ -23,6 +26,10 @@ JobQueue.prototype = { task.id = task.job._id + // Derive a job key from the tuple of (project, ref). + // ref has an arbitrary structure, so we'll compare its JSON-serialized form. + task.key = task.job.project + JSON.stringify(task.ref) + this.tasks.push(task) // Defer task execution to the next event loop tick to ensure that the push() function's @@ -45,9 +52,14 @@ JobQueue.prototype = { while (launchTasks.length < launchCount && this.tasks.length > offset) { var task = this.tasks[offset] - // TODO determine task eligibility - this.tasks.splice(offset, 1) - launchTasks.push(task) + if (task.key in this.activeKeys) { + // This task cannot run right now, so skip it. + offset += 1 + } else { + // This task is eligible to run. Remove it from the queue and prepare it to launch. + this.tasks.splice(offset, 1) + launchTasks.push(task) + } } // Create a task completion callback. Remove the task from the active set, invoke the tasks' @@ -55,6 +67,7 @@ JobQueue.prototype = { var makeTaskHandler = function (task) { return function (err) { delete self.active[task.id] + delete self.activeKeys[task.key] task.callback(err) @@ -68,6 +81,7 @@ JobQueue.prototype = { var each = launchTasks[i] this.active[each.id] = each + this.activeKeys[each.key] = true this.handler(each.job, each.config, makeTaskHandler(each)) } From 8dd8bbe24af454d8b43f2e2b93b41ca8636c7e68 Mon Sep 17 00:00:00 2001 From: Ash Wilson Date: Tue, 22 Mar 2016 16:09:11 -0400 Subject: [PATCH 07/12] active and activeKeys were redundant --- lib/jobqueue.js | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/lib/jobqueue.js b/lib/jobqueue.js index 04c1d16..c99b1ca 100644 --- a/lib/jobqueue.js +++ b/lib/jobqueue.js @@ -6,7 +6,6 @@ function JobQueue(handler, concurrency) { this.tasks = [] this.active = {} - this.activeKeys = {} this.drainCallback = null } @@ -52,7 +51,7 @@ JobQueue.prototype = { while (launchTasks.length < launchCount && this.tasks.length > offset) { var task = this.tasks[offset] - if (task.key in this.activeKeys) { + if (task.key in this.active) { // This task cannot run right now, so skip it. offset += 1 } else { @@ -66,8 +65,7 @@ JobQueue.prototype = { // push() callback, then drain() again to see if another task is ready to run. var makeTaskHandler = function (task) { return function (err) { - delete self.active[task.id] - delete self.activeKeys[task.key] + delete self.active[task.key] task.callback(err) @@ -80,8 +78,7 @@ JobQueue.prototype = { for (var i = 0; i < launchTasks.length; i++) { var each = launchTasks[i] - this.active[each.id] = each - this.activeKeys[each.key] = true + this.active[each.key] = each this.handler(each.job, each.config, makeTaskHandler(each)) } @@ -96,7 +93,12 @@ JobQueue.prototype = { // Return true if "id" corresponds to the job ID of an active job. isActive: function (id) { - return id in this.active + for (var key in this.active) { + if (this.active.hasOwnProperty(key) && this.active[key].id === id) { + return true + } + } + return false }, // Fire a callback the next time that a drain() is executed. From 367ec03ae13d619f31d8415d8b91270b88992c00 Mon Sep 17 00:00:00 2001 From: Ash Wilson Date: Tue, 22 Mar 2016 16:13:52 -0400 Subject: [PATCH 08/12] Use a stable JSON stringifier. --- lib/jobqueue.js | 4 +++- package.json | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/jobqueue.js b/lib/jobqueue.js index c99b1ca..e731fb1 100644 --- a/lib/jobqueue.js +++ b/lib/jobqueue.js @@ -1,3 +1,5 @@ +var stringify = require('json-stable-stringify') + module.exports = JobQueue function JobQueue(handler, concurrency) { @@ -27,7 +29,7 @@ JobQueue.prototype = { // Derive a job key from the tuple of (project, ref). // ref has an arbitrary structure, so we'll compare its JSON-serialized form. - task.key = task.job.project + JSON.stringify(task.ref) + task.key = task.job.project + stringify(task.ref) this.tasks.push(task) diff --git a/package.json b/package.json index e84b631..73e835b 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,8 @@ "strider-runner-core": "~2.0.0", "strider-extension-loader": "~0.4.3", "fs-extra": "~0.8.1", - "dirkeeper": "~0.2.0" + "dirkeeper": "~0.2.0", + "json-stable-stringify": "~1.0.1" }, "devDependencies": { "mocha": "^1.21.1", From e74dac6408d483ad0f82983853d1451300048955 Mon Sep 17 00:00:00 2001 From: Ash Wilson Date: Tue, 22 Mar 2016 16:27:50 -0400 Subject: [PATCH 09/12] Use the parallel job queue. --- lib/index.js | 13 ++++++------- lib/jobqueue.js | 5 +++++ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/lib/index.js b/lib/index.js index d2da90b..fb1385d 100644 --- a/lib/index.js +++ b/lib/index.js @@ -12,6 +12,7 @@ var fs = require('fs-extra') , cachier = require('./cachier') , keeper = require('dirkeeper') , JobData = require('./jobdata') + , JobQueue = require('./jobqueue') // timeout for callbacks. Helps to kill misbehaving plugins, etc function t(time, done) { @@ -61,7 +62,7 @@ function Runner(emitter, config) { }, config) this.emitter = emitter this.log = this.config.logger.log - this.queue = async.queue(this.processJob.bind(this), this.config.concurrentJobs) + this.queue = new JobQueue(this.processJob.bind(this), this.config.concurrentJobs) this.io = this.config.io this.callbackMap = {} this.hooks = [] @@ -180,13 +181,13 @@ Runner.prototype = { this.jobdata.add(job) this.log('[runner:' + this.id + '] Queued new job. Project: ' + job.project.name + ' Job ID: ' + job._id) this.emitter.emit('browser.update', job.project.name, 'job.status.queued', [job._id, now]) - this.queue.push({job: job, config: config}) + this.queue.push(job, config) }, cancelJob: function (id) { var jobdata for (var i=0; i Date: Wed, 23 Mar 2016 08:46:29 -0400 Subject: [PATCH 10/12] Just use ref.branch if it's available. --- lib/jobqueue.js | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/lib/jobqueue.js b/lib/jobqueue.js index c1dead4..19539a8 100644 --- a/lib/jobqueue.js +++ b/lib/jobqueue.js @@ -28,8 +28,16 @@ JobQueue.prototype = { task.id = task.job._id // Derive a job key from the tuple of (project, ref). - // ref has an arbitrary structure, so we'll compare its JSON-serialized form. - task.key = task.job.project + stringify(task.ref) + // ref technically has an arbitrary internal structure. If it isn't the usual { branch: "foo" }, + // we'll use a stable JSON-serialized form instead. + var suffix = '' + if ('branch' in task.job.ref) { + suffix = task.job.ref.branch + } else { + suffix = stringify(task.ref) + } + + task.key = task.job.project + suffix this.tasks.push(task) From da22b9aeec172c0b2d33bb265b1c1e8a739a9352 Mon Sep 17 00:00:00 2001 From: Ash Wilson Date: Wed, 23 Mar 2016 09:36:09 -0400 Subject: [PATCH 11/12] Store build data in per-branch subdirectories. --- lib/index.js | 29 +++++++++++++++++------------ lib/jobqueue.js | 15 +++------------ lib/utils.js | 20 ++++++++++++++++++-- 3 files changed, 38 insertions(+), 26 deletions(-) diff --git a/lib/index.js b/lib/index.js index fb1385d..b3d9181 100644 --- a/lib/index.js +++ b/lib/index.js @@ -13,6 +13,7 @@ var fs = require('fs-extra') , keeper = require('dirkeeper') , JobData = require('./jobdata') , JobQueue = require('./jobqueue') + , branchFromJob = require('./utils').branchFromJob // timeout for callbacks. Helps to kill misbehaving plugins, etc function t(time, done) { @@ -70,16 +71,15 @@ function Runner(emitter, config) { this.attach() } -// base: the base directory where all jobs data is stored +// base: the base directory where all jobs data for this project and branch is stored // the job object. // done(err, {base:, data:, cache:}) -function initJobDirs(base, job, cache, done) { - var name = job.project.name - , dirs = { - base: base, - data: path.join(base, "data", name.replace('/','-') + "-" + job._id.toString()), - cache: cache - } +function initJobDirs(branchBase, job, cache, done) { + var dirs = { + base: branchBase, + data: path.join(branchBase, 'job-' + job._id.toString()), + cache: cache + } async.series([ function checkData(next) { @@ -292,13 +292,18 @@ Runner.prototype = { oldnext() } this.callbackMap[job._id] = next - // Keep around N most recent build directories. - // Default is 0, ie wipe at start of each run. + + var projectName = job.project.name.replace('/', '-') + , branchName = branchFromJob(job).replace('/', '-') + , branchBase = path.join(self.config.dataDir, 'data', projectName + '-' + branchName) + + // Keep around N most recent build directories for this branch. + // The default is 0, ie wipe at start of each run. // Later, this can be configurable in the UI. - keeper({baseDir: path.join(this.config.dataDir, "data"), count: this.config.concurrentJobs - 1}, function(err) { + keeper({baseDir: branchBase, count: 0}, function(err) { if (err) throw err; - initJobDirs(self.config.dataDir, job, cache.base, jobDirsReady) + initJobDirs(branchBase, job, cache.base, jobDirsReady) }) var jobDirsReady = function(err, dirs) { diff --git a/lib/jobqueue.js b/lib/jobqueue.js index 19539a8..f420508 100644 --- a/lib/jobqueue.js +++ b/lib/jobqueue.js @@ -1,4 +1,4 @@ -var stringify = require('json-stable-stringify') +var branchFromJob = require('./utils').branchFromJob module.exports = JobQueue @@ -27,17 +27,8 @@ JobQueue.prototype = { task.id = task.job._id - // Derive a job key from the tuple of (project, ref). - // ref technically has an arbitrary internal structure. If it isn't the usual { branch: "foo" }, - // we'll use a stable JSON-serialized form instead. - var suffix = '' - if ('branch' in task.job.ref) { - suffix = task.job.ref.branch - } else { - suffix = stringify(task.ref) - } - - task.key = task.job.project + suffix + // Tasks with identical keys will be prevented from being scheduled concurrently. + task.key = task.job.project + branchFromJob(task.job) this.tasks.push(task) diff --git a/lib/utils.js b/lib/utils.js index 3b38285..f9d2408 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -1,10 +1,11 @@ var _ = require('lodash') - , consts = require('./consts') + , stringify = require('json-stable-stringify') module.exports = { - ensureCommand: ensureCommand + ensureCommand: ensureCommand, + branchFromJob: branchFromJob } function ensureCommand(phase) { @@ -16,3 +17,18 @@ function ensureCommand(phase) { return command } +// Extract a branch name, suitable for use as a filesystem path, from the contents of the job's +// ref field. Prefer common ref structures when available (branch, fetch) but fall back to something +// that's ugly but unique and stable for arbitrary ref payloads. +function branchFromJob(job) { + var ref = job.ref + if ('branch' in ref) { + return ref.branch + } else if ('fetch' in ref) { + return ref.fetch + } else { + // This is going to be incredibly ugly, but it will be (a) consistent for consistent refs and + // (b) include only filesystem-safe characters. + return encodeURIComponent(stringify(ref)) + } +} From cc26e79fa3037066caa145ffc1f1a0af2f86b82f Mon Sep 17 00:00:00 2001 From: Ash Wilson Date: Wed, 23 Mar 2016 09:37:52 -0400 Subject: [PATCH 12/12] Guard against missing refs. This should only happen in unit tests. --- lib/utils.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/utils.js b/lib/utils.js index f9d2408..756dd00 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -22,6 +22,11 @@ function ensureCommand(phase) { // that's ugly but unique and stable for arbitrary ref payloads. function branchFromJob(job) { var ref = job.ref + + if (typeof ref === 'undefined') { + return '' + } + if ('branch' in ref) { return ref.branch } else if ('fetch' in ref) {