diff --git a/Readme.md b/Readme.md index 8c966384..f97f2ac4 100644 --- a/Readme.md +++ b/Readme.md @@ -164,6 +164,16 @@ Job producers can set an expiry value for the time their job can live in active queue.create('email', {title: 'email job with TTL'}).ttl(milliseconds).save(); ``` + +### Job inactive TTL + +Job producers can set an expiry value for the time their job can live in inactive state, to prevent overload or unavailable workers + +```js +queue.create('email', {title: 'email job with TTL'}).ttlInactive(milliseconds).save(); +``` + + ### Job Logs Job-specific logs enable you to expose information to the UI at any point in the job's life-time. To do so simply invoke `job.log()`, which accepts a message string as well as variable-arguments for sprintf-like support: diff --git a/lib/kue.js b/lib/kue.js index 533bf20c..daea63dd 100755 --- a/lib/kue.js +++ b/lib/kue.js @@ -154,6 +154,7 @@ Queue.prototype.setupTimers = function() { } this.checkJobPromotion(this._options.promotion); this.checkActiveJobTtl(this._options.promotion); + this.checkInactiveJobTtl(this._options.promotion); }; /** @@ -259,6 +260,74 @@ Queue.prototype.checkActiveJobTtl = function( ttlOptions ) { }, timeout); }; + +Queue.prototype.checkInactiveJobTtl = function( ttlOptions ) { + ttlOptions = ttlOptions || {}; + var client = this.client + , self = this + , timeout = ttlOptions.interval || 1000 + , lockTtl = 2000 + , limit = ttlOptions.limit || 1000; + clearInterval(this.inactiveJobsTtlTimer); + this.inactiveJobsTtlTimer = setInterval(function() { + if(self.shuttingDown || !self.lockClient) return; + self.warlock.lock('inactiveJobsTTL', lockTtl, function( err, unlock ) { + if( err ) { + // Something went wrong and we weren't able to set a lock + self.emit('error', err); + return; + } + if( typeof unlock === 'function' ) { + // If the lock is set successfully by this process, an unlock function is passed to our callback. + // filter only jobs set with a ttl (timestamped) between a large number and current time + client.zrangebyscore(client.getKey('jobs:inactive'), 100000, Date.now(), 'LIMIT', 0, limit, function( err, ids ) { + if( err || !ids.length ) return unlock(); + + var idsRemaining = ids.slice(); + var doUnlock = _.after(ids.length, function(){ + self.removeAllListeners( 'job ttl exceeded ack' ); + waitForAcks && clearTimeout( waitForAcks ); + unlock && unlock(); + }); + + self.on( 'job ttl exceeded ack', function( id ) { + idsRemaining.splice( idsRemaining.indexOf( id ), 1 ); + doUnlock(); + }); + + var waitForAcks = setTimeout( function(){ + idsRemaining.forEach( function( id ){ + id = client.stripFIFO(id); + Job.get(id, function( err, job ) { + if( err ) return doUnlock(); + job.failedAttempt( { error: true, message: 'TTL exceeded' }, function( err, hasAttempts, attempt){ + if( err ) { + self.emit('error', err, job); + }else if( hasAttempts ) { + events.emit(job.id, 'failed attempt', 'TTL exceeded', attempt); + } else { + events.emit(job.id, 'failed', 'TTL exceeded', attempt); + } + doUnlock(); + }); + }); + }); + }, 1000 ); + + ids.forEach(function( id ) { + id = client.stripFIFO(id); + events.emit(id, 'ttl exceeded'); + }); + }); + } else { + // The lock was not established by us, be silent + } + }); + }, timeout); +}; + + + /** * Runs a LUA script to diff inactive jobs ZSET cardinality * and helper pop LIST length each `ms` milliseconds and syncs helper LIST. diff --git a/lib/queue/job.js b/lib/queue/job.js index 204aff27..32c23887 100644 --- a/lib/queue/job.js +++ b/lib/queue/job.js @@ -185,6 +185,7 @@ exports.get = function( id, jobType, fn ) { // we can just merge these job.type = hash.type; job._ttl = hash.ttl; + job._ttlInactive = hash.ttlInactive; job._delay = hash.delay; job.priority(Number(hash.priority)); job._progress = hash.progress; @@ -331,6 +332,7 @@ Job.prototype.toJSON = function() { , delay: this._delay , workerId: this.workerId , ttl: this._ttl + , ttlInactive : this._ttlInactive , attempts: { made: Number(this._attempts) || 0 , remaining: this._attempts > 0 ? this._max_attempts - this._attempts : Number(this._max_attempts) || 1 @@ -341,11 +343,11 @@ Job.prototype.toJSON = function() { Job.prototype.refreshTtl = function() { - ('active' === this.state() && this._ttl > 0) - ? - this.client.zadd(this.client.getKey('jobs:' + this.state()), Date.now() + parseInt(this._ttl), this.zid, noop) - : - noop(); + if ('active' === this.state() && this._ttl > 0) { + this.client.zadd(this.client.getKey('jobs:' + this.state()), Date.now() + parseInt(this._ttl), this.zid, noop) + }else if ('inactive' === this.state() && this._ttlInactive > 0) { + this.client.zadd(this.client.getKey('jobs:' + this.state()), Date.now() + parseInt(this._ttlInactive), this.zid, noop) + } }; @@ -493,6 +495,14 @@ Job.prototype.ttl = function( param ) { return this; }; +Job.prototype.ttlInactive = function( param ) { + if(0 === arguments.length ) return this._ttlInactive; + if( param > 0 ) { + this._ttlInactive = param; + } + return this; +}; + Job.prototype._getBackoffImpl = function() { var self = this var supported_backoffs = { @@ -696,6 +706,7 @@ Job.prototype.state = function( state, fn ) { ('active' === state && this._ttl > 0) ? multi.zadd(client.getKey('jobs:' + state), Date.now() + parseInt(this._ttl), this.zid) : noop(); ('active' === state && !this._ttl) ? multi.zadd(client.getKey('jobs:' + state), this._priority<0?this._priority:-this._priority, this.zid) : noop(); ('inactive' === state) ? multi.lpush(client.getKey(this.type + ':jobs'), 1) : noop(); + ('inactive' === state && this._ttlInactive > 0) ? multi.zadd(client.getKey('jobs:' + state), Date.now() + parseInt(this._ttlInactive), this.zid) : noop(); this.set('updated_at', Date.now()); this._state = state; @@ -848,6 +859,9 @@ Job.prototype.update = function( fn ) { if( this._ttl ) { this.set('ttl', this._ttl); } + if (this._ttlInactive ){ + this.set('ttlInactive', this._ttlInactive); + } if( this._removeOnComplete ) this.set('removeOnComplete', this._removeOnComplete); if( this._backoff ) { if( _.isPlainObject(this._backoff) ) this.set('backoff', JSON.stringify(this._backoff));