Premium Queue package for handling distributed jobs and messages in NodeJS.

Overview



The fastest, most reliable, Redis-based queue for Node.
Carefully written for rock solid stability and atomicity.


Sponsors · Features · UIs · Install · Quick Guide · Documentation

Check the new Guide!


Follow @manast for Bull news and updates!


BullMQ

If you want to start using the next major version of Bull written entirely in Typescript you are welcome to the new repo here. Otherwise you are very welcome to still use Bull, which is a safe, battle tested codebase.


Official FrontEnd

Taskforce.sh, Inc

Super charge your queues with a professional front end and optional Redis hosting:

  • Get a complete overview of all your queues.
  • Inspect jobs, search, retry, or promote delayed jobs.
  • Metrics and statistics.
  • and many more features.

Sign up at Taskforce.sh


Bull Features

  • Minimal CPU usage due to a polling-free design.
  • Robust design based on Redis.
  • Delayed jobs.
  • Schedule and repeat jobs according to a cron specification.
  • Rate limiter for jobs.
  • Retries.
  • Priority.
  • Concurrency.
  • Pause/resume—globally or locally.
  • Multiple job types per queue.
  • Threaded (sandboxed) processing functions.
  • Automatic recovery from process crashes.

And coming up on the roadmap...

  • Job completion acknowledgement.
  • Parent-child jobs relationships.

UIs

There are a few third-party UIs that you can use for monitoring:

BullMQ

Bull v3

Bull <= v2


Monitoring & Alerting


Feature Comparison

Since there are a few job queue solutions, here is a table comparing them:

Feature Bull Kue Bee Agenda
Backend redis redis redis mongo
Priorities
Concurrency
Delayed jobs
Global events
Rate Limiter
Pause/Resume
Sandboxed worker
Repeatable jobs
Atomic ops
Persistence
UI
Optimized for Jobs / Messages Jobs Messages Jobs

Install

npm install bull --save

or

yarn add bull

Requirements: Bull requires a Redis version greater than or equal to 2.8.18.

Typescript Definitions

npm install @types/bull --save-dev
yarn add --dev @types/bull

Definitions are currently maintained in the DefinitelyTyped repo.

Contributing

We welcome all types of contributions, either code fixes, new features or doc improvements. Code formatting is enforced by prettier. For commits please follow conventional commits convention. All code must pass lint rules and test suites before it can be merged into develop.


Quick Guide

Basic Usage

var Queue = require('bull');

var videoQueue = new Queue('video transcoding', 'redis://127.0.0.1:6379');
var audioQueue = new Queue('audio transcoding', {redis: {port: 6379, host: '127.0.0.1', password: 'foobared'}}); // Specify Redis connection using object
var imageQueue = new Queue('image transcoding');
var pdfQueue = new Queue('pdf transcoding');

videoQueue.process(function(job, done){

  // job.data contains the custom data passed when the job was created
  // job.id contains id of this job.

  // transcode video asynchronously and report progress
  job.progress(42);

  // call done when finished
  done();

  // or give a error if error
  done(new Error('error transcoding'));

  // or pass it a result
  done(null, { framerate: 29.5 /* etc... */ });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error('some unexpected error');
});

audioQueue.process(function(job, done){
  // transcode audio asynchronously and report progress
  job.progress(42);

  // call done when finished
  done();

  // or give a error if error
  done(new Error('error transcoding'));

  // or pass it a result
  done(null, { samplerate: 48000 /* etc... */ });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error('some unexpected error');
});

imageQueue.process(function(job, done){
  // transcode image asynchronously and report progress
  job.progress(42);

  // call done when finished
  done();

  // or give a error if error
  done(new Error('error transcoding'));

  // or pass it a result
  done(null, { width: 1280, height: 720 /* etc... */ });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error('some unexpected error');
});

pdfQueue.process(function(job){
  // Processors can also return promises instead of using the done callback
  return pdfAsyncProcessor();
});

videoQueue.add({video: 'http://example.com/video1.mov'});
audioQueue.add({audio: 'http://example.com/audio1.mp3'});
imageQueue.add({image: 'http://example.com/image1.tiff'});

Using promises

Alternatively, you can use return promises instead of using the done callback:

videoQueue.process(function(job){ // don't forget to remove the done callback!
  // Simply return a promise
  return fetchVideo(job.data.url).then(transcodeVideo);

  // Handles promise rejection
  return Promise.reject(new Error('error transcoding'));

  // Passes the value the promise is resolved with to the "completed" event
  return Promise.resolve({ framerate: 29.5 /* etc... */ });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error('some unexpected error');
  // same as
  return Promise.reject(new Error('some unexpected error'));
});

Separate processes

The process function can also be run in a separate process. This has several advantages:

  • The process is sandboxed so if it crashes it does not affect the worker.
  • You can run blocking code without affecting the queue (jobs will not stall).
  • Much better utilization of multi-core CPUs.
  • Less connections to redis.

In order to use this feature just create a separate file with the processor:

// processor.js
module.exports = function(job){
  // Do some heavy work

  return Promise.resolve(result);
}

And define the processor like this:

// Single process:
queue.process('/path/to/my/processor.js');

// You can use concurrency as well:
queue.process(5, '/path/to/my/processor.js');

// and named processors:
queue.process('my processor', 5, '/path/to/my/processor.js');

Repeated jobs

A job can be added to a queue and processed repeatedly according to a cron specification:

  paymentsQueue.process(function(job){
    // Check payments
  });

  // Repeat payment job once every day at 3:15 (am)
  paymentsQueue.add(paymentsData, {repeat: {cron: '15 3 * * *'}});

As a tip, check your expressions here to verify they are correct: cron expression generator

Pause / Resume

A queue can be paused and resumed globally (pass true to pause processing for just this worker):

queue.pause().then(function(){
  // queue is paused now
});

queue.resume().then(function(){
  // queue is resumed now
})

Events

A queue emits some useful events, for example...

.on('completed', function(job, result){
  // Job completed with output result!
})

For more information on events, including the full list of events that are fired, check out the Events reference

Queues performance

Queues are cheap, so if you need many of them just create new ones with different names:

var userJohn = new Queue('john');
var userLisa = new Queue('lisa');
.
.
.

However every queue instance will require new redis connections, check how to reuse connections or you can also use named processors to achieve a similar result.

Cluster support

NOTE: From version 3.2.0 and above it is recommended to use threaded processors instead.

Queues are robust and can be run in parallel in several threads or processes without any risk of hazards or queue corruption. Check this simple example using cluster to parallelize jobs across processes:

var
  Queue = require('bull'),
  cluster = require('cluster');

var numWorkers = 8;
var queue = new Queue("test concurrent queue");

if(cluster.isMaster){
  for (var i = 0; i < numWorkers; i++) {
    cluster.fork();
  }

  cluster.on('online', function(worker) {
    // Lets create a few jobs for the queue workers
    for(var i=0; i<500; i++){
      queue.add({foo: 'bar'});
    };
  });

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  });
}else{
  queue.process(function(job, jobDone){
    console.log("Job done by worker", cluster.worker.id, job.id);
    jobDone();
  });
}

Documentation

For the full documentation, check out the reference and common patterns:

  • Guide — Your starting point for developing with Bull.
  • Reference — Reference document with all objects and methods available.
  • Patterns — a set of examples for common patterns.
  • License — the Bull license—it's MIT.

If you see anything that could use more docs, please submit a pull request!


Important Notes

The queue aims for an "at least once" working strategy. This means that in some situations, a job could be processed more than once. This mostly happens when a worker fails to keep a lock for a given job during the total duration of the processing.

When a worker is processing a job it will keep the job "locked" so other workers can't process it.

It's important to understand how locking works to prevent your jobs from losing their lock - becoming stalled - and being restarted as a result. Locking is implemented internally by creating a lock for lockDuration on interval lockRenewTime (which is usually half lockDuration). If lockDuration elapses before the lock can be renewed, the job will be considered stalled and is automatically restarted; it will be double processed. This can happen when:

  1. The Node process running your job processor unexpectedly terminates.
  2. Your job processor was too CPU-intensive and stalled the Node event loop, and as a result, Bull couldn't renew the job lock (see #488 for how we might better detect this). You can fix this by breaking your job processor into smaller parts so that no single part can block the Node event loop. Alternatively, you can pass a larger value for the lockDuration setting (with the tradeoff being that it will take longer to recognize a real stalled job).

As such, you should always listen for the stalled event and log this to your error monitoring system, as this means your jobs are likely getting double-processed.

As a safeguard so problematic jobs won't get restarted indefinitely (e.g. if the job processor always crashes its Node process), jobs will be recovered from a stalled state a maximum of maxStalledCount times (default: 1).

Comments
  • Jobs get stuck after Redis reconnect

    Jobs get stuck after Redis reconnect

    Description

    When the Redis gets disconnected and connected again, the queue doesn't pick up any jobs. (Easy To Reproduce)

    Minimal, Working Test code to reproduce the issue.

    const Queue = require('bull')
    const Redis = require('ioredis')
    
    const client = new Redis({
      host: 'localhost',
      port: 6379
    })
    
    const queue = new Queue('test', client)
    
    client.on('ready', () => {
      console.log('client ready')
    })
    
    client.on('connect', () => {
      console.log('client connect')
    })
    
    client.on('close', () => {
      console.log('client close')
    })
    
    client.on('end', error => {
      console.log('client end', error)
    })
    
    client.on('reconnecting', error => {
      console.log('client reconnecting', error)
    })
    
    client.on('error', error => {
      console.log('client error', error.message)
    })
    
    queue
      .on('completed', (job, results) => {
        console.log('queue completed a job')
      })
      .on('error', async (error) => {
        console.error('queue error', error.message)
      })
      .on('failed', async (job, error) => {
        console.error('queue error', error.message)
      })
    
    queue.process(async (job) => {
      console.log('running task')
    })
    
    setInterval(function () {
      queue.add({}).then(() => console.log('pushed task')).catch((e) => console.log('failed to push task'))
    }, 10000)
    

    Bull version

    Bull 3.18.0 Node 12 and 14 Redis 3.2.8

    Steps to reproduce.

    • Run the above script
    • Bull JS start accepting and processing Jobs
    • Stop Redis
    • Bull JS will keep retrying
    • Start Redis
    • Bull JS will connect in a few milliseconds
    • At this stage, the queue can push the jobs and but it won't process any jobs. :-(

    Things we tried

    • Tried explicitly the reconnect option of ioredis. https://github.com/luin/ioredis#auto-reconnect and https://github.com/luin/ioredis#reconnect-on-error But issue don't seem to be coming from ioredis.
    enhancement cannot reproduce released 
    opened by royalpinto 63
  • Redis network bandwidth consumption

    Redis network bandwidth consumption

    Description

    I have a cluster with 30 workers consuming jobs from redis using bull (around 0.5 tasks per second, and each task lasts for 1 minute). Each worker restarts every hour and is on one of 3 different nodes (which might be different from the one redis is hosted).

    I want to debug or understand why the network consumption is around 25mb/s. Is this normal behavior from bull and how do I debug this problem?

    Each job payload is a simple string.

    Bull version

    "bull": "^3.5.2"

    Additional information

    Each queue has around 10k of tasks waiting for completion and/or delayed. We can see the traffic is between redis and the workers although only a string is fetched

    opened by abriosi 59
  • How to remove repeatable jobs?

    How to remove repeatable jobs?

    Description

    Calling queue.empty() will not remove the repeatable job. I made a repeatable job at cron 1 minute interval, then tried upping it to 5 minutes. The result was that I had two cron jobs: one at the 1 minute interval, the other at the 5 minute interval. I would like to know if there is an API call to remove a repeat/cron so that I can essentially update/change the cron for a job vs having multiple crons for a job.

    Minimal, Working Test code to reproduce the issue.

    paymentsQueue.process(function(job){
      // Check payments
    });
    
    // Repeat payment job once every day at 3:15 (am)
    paymentsQueue.add(paymentsData, {repeat: {cron: '*/1 * * * *'}});
    // does not clear the one minute
    paymentsQueue.empty()
    paymentsQueue.add(paymentsData, {repeat: {cron: '*/5 * * * *'}});
    

    Now I have two repeatable jobs, but I only want one.

    Bull version

    3.4.3

    Additional information

    None.

    question wontfix 
    opened by chrisabrams 41
  • Heap out of memory error on large number of queued DELAYED jobs

    Heap out of memory error on large number of queued DELAYED jobs

    Hi,

    I currently have around 2 million tasks queued in Bull which all need to be processed. I'm trying to test the performance of a single worker against this queue of tasks, but shortly after I start the worker I get the following heap memory error:

    FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory
    
    <--- Last few GCs --->
    
    [13:0x2d0a5c0]    82667 ms: Mark-sweep 947.4 (1508.8) -> 947.3 (1510.8) MB, 877.4 / 0.0 ms  allocation failure GC in old space requested
    [13:0x2d0a5c0]    83565 ms: Mark-sweep 947.3 (1510.8) -> 947.2 (1456.8) MB, 897.2 / 0.0 ms  last resort GC in old space requested
    [13:0x2d0a5c0]    84474 ms: Mark-sweep 947.2 (1456.8) -> 947.2 (1436.3) MB, 909.3 / 0.0 ms  last resort GC in old space requested
    
    
    <--- JS stacktrace --->
    
    ==== JS stack trace =========================================
    
    Security context: 0x142c70c25879 <JSObject>
        1: _settlePromise [/src/node_modules/bluebird/js/release/promise.js:~542] [pc=0xac62245a421](this=0x38989dfe5681 <Promise map = 0x3f8459124389>,promise=0x38989dfed891 <Promise map = 0x3f8459124389>,handler=0x1756049822d1 <undefined>,receiver=0x1756049822d1 <undefined>,value=0x173509168221 <Number 1.5411e+12>)
        2: _drainQueue(aka _drainQueue) [/src/node_modules/bluebird/js/release/async.js...
    
     1: node::Abort() [node /src/dist/app.js]
     2: 0x8cbf4c [node /src/dist/app.js]
     3: v8::Utils::ReportOOMFailure(char const*, bool) [node /src/dist/app.js]
     4: v8::internal::V8::FatalProcessOutOfMemory(char const*, bool) [node /src/dist/app.js]
     5: v8::internal::Factory::NewUninitializedFixedArray(int) [node /src/dist/app.js]
     6: 0xd801bc [node /src/dist/app.js]
     7: 0xd97a95 [node /src/dist/app.js]
     8: v8::internal::JSObject::AddDataElement(v8::internal::Handle<v8::internal::JSObject>, unsigned int, v8::internal::Handle<v8::internal::Object>, v8::internal::PropertyAttributes, v8::internal::Object::ShouldThrow) [node /src/dist/app.js]
     9: v8::internal::Object::AddDataProperty(v8::internal::LookupIterator*, v8::internal::Handle<v8::internal::Object>, v8::internal::PropertyAttributes, v8::internal::Object::ShouldThrow, v8::internal::Object::StoreFromKeyed) [node /src/dist/app.js]
    10: v8::internal::Object::SetProperty(v8::internal::LookupIterator*, v8::internal::Handle<v8::internal::Object>, v8::internal::LanguageMode, v8::internal::Object::StoreFromKeyed) [node /src/dist/app.js]
    11: v8::internal::Runtime_SetProperty(int, v8::internal::Object**, v8::internal::Isolate*) [node /src/dist/app.js]
    12: 0xac6223042fd
    

    To try and narrow down the problem I've simplified my code down to the point where the process function isn't actually doing anything, it's just returning immediately. This leads me to believe there's an issue in the internals of the library when this many tasks are queued at once. I have also tried artificially limiting the speed of the processor using a timeout promise that resolves after 100ms, but I still hit this issue after 2-3 jobs are processed.

    PRIO 1 cannot reproduce 
    opened by ajwootto 40
  • Missing lock for job

    Missing lock for job

    Ciao!

    It works awesome! How are you? But the problem is, that we have 4 deployments and every deployments has 8 processes which is 32 processes and if it always creates a new connect to Redis, it runs out of connections. I tried doing this like this (re-use Redis connections):

    var opts = {
      prefix: `${global.ngivr.config.redis.scope}queue`,
    //  redis: global.ngivr.config.redisUrl,
    //  /*
      createClient: function (type) {
        switch (type) {
          case 'client':
            return global.ngivr.redis;
          case 'subscriber':
            return global.ngivr.redisSubscriber;
          default:
            return new Redis(global.ngivr.config.redis);
        }
      }
    //  */
    }
    
     kues[named] = Queue(named, opts);
    

    But I always get this error:

    failedReason: 'Missing lock for job 1 finished' } Error: job stalled more than allowable limit
    

    I am using ioredis.

    Do you know what it could be the problem?

    Thanks, Patrik

    bug 
    opened by p3x-robot 38
  • Empty and clean jobs

    Empty and clean jobs

    Hi,

    I'm using bull in my Node.js application. It works good, but sometimes appears issues with repeated jobs especially on application restarts. In my case one or more repeated job don't runs. I've found that I can fix this issue by clearing Redis (flushdb) and restarting application. Then everything works good.

    So I've decided to clean all queues on application start. I've found emply and clean methods in documentation. However, it is not clear for me how do they work with Redis. Should they clear Redis database after execution?

    I've tried emply / clean methods and they don't clean my Redis db. So I have duplicates:

    127.0.0.1:6379[1]> keys *
     1) "bull:vip:repeat"
     2) "bull:notify:repeat:vk-scheduler:notify-scheduler:1506930900000"
     3) "bull:notify:repeat:vk-scheduler:notify-scheduler:1506930600000"
     4) "bull:vip:repeat:scheduler:vip-scheduler:1506942000000"
     5) "bull:vip:repeat:scheduler:vip-scheduler:1506930828000"
     6) "bull:vip:id"
     7) "bull:notify:id"
     8) "bull:notify:repeat"
     9) "bull:notify:repeat:cleaner:notify-cleaner:1506996000000"
    10) "bull:notify:repeat:vk-scheduler:notify-scheduler:1506930300000"
    11) "bull:vip:repeat:scheduler:vip-scheduler:1506930880000"
    

    In this case vip-scheduler should run two times per day (0 0 2,12 * * *), but there are duplicates in Redis. So I'm not sure how many times it will be executed. Does it matter what stores in Redis database?

    queue.add('scheduler', {}, { jobId: 'vip-scheduler', repeat: { cron: '0 0 2,12 * * *' }, removeOnComplete: true, removeOnFail: true });
    queue.process('scheduler', vipWorker.scheduler);
    

    Thanks

    bug PRIO 1 
    opened by nanom1t 34
  • redis script cache gets bloated after update to bull 2.0

    redis script cache gets bloated after update to bull 2.0

    Hello,

    I'm experiencing a growing Lua script cache after updating to bull-2.0 After 20 days of uptime my redis Lua cache grown up to 15GB

    Anyone experiencing same issue?

    Thanks

    bug 
    opened by prdn 32
  • Stuck & Ophaned jobs

    Stuck & Ophaned jobs

    Hi there.

    It seems 3.0 is very stable. Thank you.

    I process tens of thousands of jobs every day. I always have removeOnComplete set to true on them, so ideally, if the queue is empty, redis should be empty.

    Today I looked and there are thousands (~30K) jobs that are just there. Stuck.

    Some of them has had 0 attempts made.

    This is an example:

    spectacle bw8263

    First I thought maybe I'm not doing a graceful shutdown or something like that. But this job ~12 hours old and I haven't had a restart in the queue worker for the past 48 hours or something.

    Any ideas how I can get more clues to find the problem?

    opened by emilsedgh 31
  • Reconnection problem

    Reconnection problem

    Description

    I create a queue and add tasks to queue every 10 seconds. Use bluebird library to set a delay in a task processor.

    The point is - I'm trying to check behavior when redis connection is lost. When I switch off the redis server in prcessors work phase, everything is ok, we finish the task, reconnect and continue task processing.

    If I try to switch off the redis server, when there is no tasks in queue, task processing is stop and it doesn't reconnect to the server. The connection is in "end" status.

    I need to get any way to establish a connection after redis server was restarted. It's vital for our project.

    Logs when I restart redis server on waiting period of the time (non-task processing time):

    ➜  queue git:(master) ✗ node test.js
    start task
    resolve task!
    start task
    resolve task!
    Error in bull queue happend: Error: connect ECONNREFUSED 127.0.0.1:6379
    Error in bull queue happend: Error: connect ECONNREFUSED 127.0.0.1:6379
    Error in bull queue happend: Error: connect ECONNREFUSED 127.0.0.1:6379
    Error in bull queue happend: Error: connect ECONNREFUSED 127.0.0.1:6379
    Error in bull queue happend: Error: connect ECONNREFUSED 127.0.0.1:6379
    Error in bull queue happend: Error: connect ECONNREFUSED 127.0.0.1:6379
    Error in bull queue happend: ReplyError: ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context
    

    Logs when I restart redis server while a task is executing (processing time):

    ➜  queue git:(master) ✗ node test.js
    start task
    Error in bull queue happend: Error: read ECONNRESET
    Error in bull queue happend: Error: read ECONNRESET
    Error in bull queue happend: Error: read ECONNRESET
    Error in bull queue happend: Error: read ECONNRESET
    Error in bull queue happend: Error: read ECONNRESET
    Error in bull queue happend: Error: read ECONNRESET
    Error in bull queue happend: Error: connect ECONNREFUSED 127.0.0.1:6379
    Error in bull queue happend: Error: connect ECONNREFUSED 127.0.0.1:6379
    Error in bull queue happend: Error: connect ECONNREFUSED 127.0.0.1:6379
    Error in bull queue happend: Error: connect ECONNREFUSED 127.0.0.1:6379
    Error in bull queue happend: Error: connect ECONNREFUSED 127.0.0.1:6379
    Error in bull queue happend: Error: connect ECONNREFUSED 127.0.0.1:6379
    Error in bull queue happend: ReplyError: ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context
    resolve task!
    Error in bull queue happend: Error: Missing lock for job 1 failed
    start task
    resolve task!
    start task
    resolve task!
    

    Look at the end of the copy-paste - new tasks are processing. Everything looks fine unlike the previous example.

    Test code to reproduce

    const Promise = require('bluebird');
    const Queue = require('bull');
    
    const queue = new Queue('test', {
      redis: {
        host: 'localhost',
        port: 6379
      }
    });
    
    queue
      .on('error', function (error) {
        console.error(`Error in bull queue happend: ${error}`);
      })
      .on('failed', function (job, error) {
        console.error(`Task was failed with reason: ${error}`);
      });
    
    queue.process('test_task', () => {
      console.log('start task');
      return Promise.delay(5000).then(() => {
        console.log('resolve task!');
      });
    });
    
    setInterval(function () {
      queue.add('test_task', {});
    }, 10000);
    

    Bull version

    bull - 3.3.10, node - v6.10.2

    bug 
    opened by ks-s-a 30
  • Question: may a jobId related key exist and jobId:lock key be absent?

    Question: may a jobId related key exist and jobId:lock key be absent?

    Hi.

    The question: if some key bull:xxx exists in Redis, does this mean that bull:xxx:lock key must also always exist there? (Assuming removeOnComplete: true, removeOnFail: true usage.) Or not necessarily?

    Context: we're using queue.add({..., jobId: xxx, removeOnComplete: true, removeOnFail: true }) feature to prevent jobs double-scheduling. And sometimes a job gets stuck completely (although finished long time ago), because its jobId related redis key is in the database, but there is no corresponding jobId:lock key there. Such jobs are also invisible in Arena; the only way to see them is to do redis-cli KEYS "bull:*" | grep ... and search for keys which have no :lock coupled.

    Also, if I run hgetall for such "orphaned" keys, the output looks like:

    1) "failedReason"
    2) "Missing key for job abb18569499f62ab finished"
    3) "stacktrace"
    4) "[\"Error: Missing key for job abb18569499f62ab finished\\n    at Object.finishedErrors..."
    5) "attemptsMade"
    6) "1"
    

    I've seen similar symptoms mentioned here in Issues many times, but I couldn't find a resolution. So I just think about brute-force scanning through all bull keys every minute, finding the ones which look "suspicious" (i.e. have no :lock counterparts and also have failedReason), then removing them entirely.

    question wontfix 
    opened by dko-slapdash 29
  • Delayed jobs

    Delayed jobs

    It's occasionally a requirement for a job to not be done for at least N seconds.

    There are two obvious API methods for doing this:

    1. Addition of "delay" to the job options accepted by Queue.add / Job.create
    2. Addition of a "delay" method to the Job class

    (For Kue, the second option makes sense because Job objects can be used from within their own processor, so failed jobs can have a delay before retrying. With Bull it doesn't make much sense, i think?)

    How to implement this? Essentially, delayed jobs are put into a ZSET with the score being the time that they should be processed after. Then, the system polls ZRANGEBYSCORE (once Queue.process is called, or once Queue.processDelayed() or something like that is called) from 0 to now, and puts those jobs onto the active queue as before.

    If you want me to hammer out a PR once we have discussed the API choices a bit then I'd be happy to - something with a bit more care than Kue would be great!

    Of course, it might also be a nice distinct library / extension for bull that can be added distinctly.

    opened by richthegeek 29
  • chore(deps): bump json5 from 2.2.0 to 2.2.3

    chore(deps): bump json5 from 2.2.0 to 2.2.3

    Bumps json5 from 2.2.0 to 2.2.3.

    Release notes

    Sourced from json5's releases.

    v2.2.3

    v2.2.2

    • Fix: Properties with the name __proto__ are added to objects and arrays. (#199) This also fixes a prototype pollution vulnerability reported by Jonathan Gregson! (#295).

    v2.2.1

    • Fix: Removed dependence on minimist to patch CVE-2021-44906. (#266)
    Changelog

    Sourced from json5's changelog.

    v2.2.3 [code, diff]

    v2.2.2 [code, diff]

    • Fix: Properties with the name __proto__ are added to objects and arrays. (#199) This also fixes a prototype pollution vulnerability reported by Jonathan Gregson! (#295).

    v2.2.1 [code, diff]

    • Fix: Removed dependence on minimist to patch CVE-2021-44906. (#266)
    Commits
    • c3a7524 2.2.3
    • 94fd06d docs: update CHANGELOG for v2.2.3
    • 3b8cebf docs(security): use GitHub security advisories
    • f0fd9e1 docs: publish a security policy
    • 6a91a05 docs(template): bug -> bug report
    • 14f8cb1 2.2.2
    • 10cc7ca docs: update CHANGELOG for v2.2.2
    • 7774c10 fix: add proto to objects and arrays
    • edde30a Readme: slight tweak to intro
    • 97286f8 Improve example in readme
    • Additional commits viewable in compare view

    Dependabot compatibility score

    Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase.


    Dependabot commands and options

    You can trigger Dependabot actions by commenting on this PR:

    • @dependabot rebase will rebase this PR
    • @dependabot recreate will recreate this PR, overwriting any edits that have been made to it
    • @dependabot merge will merge this PR after your CI passes on it
    • @dependabot squash and merge will squash and merge this PR after your CI passes on it
    • @dependabot cancel merge will cancel a previously requested merge and block automerging
    • @dependabot reopen will reopen this PR if it is closed
    • @dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
    • @dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
    • @dependabot use these labels will set the current labels as the default for future PRs for this repo and language
    • @dependabot use these reviewers will set the current reviewers as the default for future PRs for this repo and language
    • @dependabot use these assignees will set the current assignees as the default for future PRs for this repo and language
    • @dependabot use this milestone will set the current milestone as the default for future PRs for this repo and language

    You can disable automated security fix PRs for this repo from the Security Alerts page.

    dependencies 
    opened by dependabot[bot] 0
  • Retrying jobs with a single queue and multiple consumers

    Retrying jobs with a single queue and multiple consumers

    I've been grooming issues (both open and closed) and StackOverflow to find an answer to my question, but without any luck. Therefore, I apologize upfront if this has in fact been answered previously. In that case, the link to the solution is highly appreciated.

    Description

    I want to configure retries on jobs, but I can't figure out the best approach to tackle this when there are multiple consumers of a single job. The job options { attempts: 5 } does indeed work, but my issue is that when it retries, all consumers are processing the job once more instead of only the single consumer that threw an error.

    It might be the intended behavior; in that case, I would appreciate alternative ways of configuring my queue, processor, and consumer setup.

    Minimal, Working Test code to reproduce the issue.

    As this is part of a larger application, I can't provide you with reproducible code. Instead, I'll outline the high-level flow of events processing (in code that can't be compiled):

    // Initializing the queue
    const queue = new Bull(`bull-events-queue`, { ... })
    
    // Emitting a job
    queue.add({ jobName, data }, { attempts: 5, removeOnComplete: true })
    
    // Consuming a job
    // It is important to stress that we typically have multiple consumer per job
    consume(jobName, consumer) {
        const existingConsumers = consumers_.get(event) ?? [] // these are saved on the class itself
        this.consumers_.set(jobName, [...existingConsumers, consumer])
    }
    
    // Processing jobs
    worker_ = (job) => {
        const { jobName, data } = job.data
        const consumers = this.consumers_.get(jobName) || []
    
        return await Promise.all(
          consumers.map(async (consumer) => {
            return consumer(data, eventName).catch((err) => {
              throw err // this triggers the retry mechanism of bull
            })
          })
        )
      }
    
    // Example scenario:
    consume("test.event", () => console.log("hello"));
    consume("test.event", () => throw Error("Some error")); // this consumer happens to fail for some reason
    consume("test.event", () => console.log("hello 3"));
    
    queue.add({ "test.event", { "key": "value" } }, { attempts: 5, removeOnComplete: true })
    

    In this case, all three consumers would be run 5 times due to the attempts option, although it's only a single consumer actually failing. It should be obvious that this can lead to data inconsistencies as jobs are processed more than once even though it completed.

    Does Bull natively support some kind of idempotency mechanism for this use case, or would I have to build that myself?

    Looking forward to hearing from you.

    Bull version

    3.29.3

    Additional information

    opened by olivermrbl 2
  • Priority not respected when using limiter - How does priority work?

    Priority not respected when using limiter - How does priority work?

    Description

    I have the feeling that priorities either do not work as expected or my expectation on how it should be working is wrong. My expectation: Assuming a queue with 10 waiting jobs, all with priority 5. Limiter set to 1 job in 3 seconds. If I add a new job, with a priority < 5, it should be the very next job that's being processed.

    That does not seem to be happening when adding jobs to the queue from within a job.

    Minimal, Working Test code to reproduce the issue.

    const Queue = require('bull');
    const prioQueue = new Queue('prioQueue', {
        limiter: {
            max: 1,
            duration: 3000
        },
        defaultJobOptions: {
            priority: 4,
            removeOnComplete: true
        }
    });
    
    prioQueue.process((job) => {
        let data = job.data;
        console.log('processing:', data, 'with prio', job.opts.priority);
        if (job.opts.priority > 1) {
            data.v = data.v + 1;
            console.log('creating new job with prio', job.opts.priority - 1);
            prioQueue.add(data, {
                priority: job.opts.priority - 1
            });
        }
        return Promise.resolve();
    });
    
    prioQueue.add({
        foo: 'bar',
        v: 1
    });
    
    prioQueue.add({
        x: 'y',
        v: 1
    });
    
    prioQueue.add({
        name: 'alice',
        v: 1
    });
    

    Result:

    processing: { foo: 'bar', v: 1 } with prio 4
    creating new job with prio 3
    processing: { foo: 'bar', v: 2 } with prio 3
    creating new job with prio 2
    processing: { x: 'y', v: 1 } with prio 4
    creating new job with prio 3
    processing: { name: 'alice', v: 1 } with prio 4
    creating new job with prio 3
    processing: { foo: 'bar', v: 3 } with prio 2
    creating new job with prio 1
    processing: { x: 'y', v: 2 } with prio 3
    creating new job with prio 2
    processing: { name: 'alice', v: 2 } with prio 3
    creating new job with prio 2
    processing: { foo: 'bar', v: 4 } with prio 1
    processing: { x: 'y', v: 3 } with prio 2
    creating new job with prio 1
    processing: { name: 'alice', v: 3 } with prio 2
    creating new job with prio 1
    processing: { x: 'y', v: 4 } with prio 1
    processing: { name: 'alice', v: 4 } with prio 1
    

    We can see, that the prio 2 job (foo:bar) created in line 4 get's executed AFTER 2 prio 4 jobs (x:y & name:alice).

    I'd expect to see something like this, especially since we're only doing one job every 3 seconds:

    processing: { foo: 'bar', v: 1 } with prio 4
    creating new job with prio 3
    processing: { foo: 'bar', v: 2 } with prio 3
    creating new job with prio 2
    processing: { foo: 'bar', v: 3 } with prio 2
    creating new job with prio 1
    processing: { foo: 'bar', v: 4 } with prio 1
    processing: { x: 'y', v: 1 } with prio 4
    ...
    

    Bull version

    4.10.2

    Additional information

    Is my understanding of priorities not correct?

    opened by T0BiD 1
  • ReplyError: ERR Invalid command specified

    ReplyError: ERR Invalid command specified

    /home/nicholas/projects/shopspy-server/node_modules/redis-parser/lib/parser.js:179 return new ReplyError(string) ^ ReplyError: ERR Invalid command specified at parseError (/home/nicholas/projects/shopspy-server/node_modules/redis-parser/lib/parser.js:179:12) at parseType (/home/nicholas/projects/shopspy-server/node_modules/redis-parser/lib/parser.js:302:14)

    Description

    I am running a Nestjs server with the Bull module to handle the job queue with a Redis database. However, sometimes when I start the server, I get this error. It happens radomly so there is no consistency. The only way I found to solve it is to restart redis-server, but this is not the ideal solution. My redis server uses factory settings except for a password and replication is disabled.

    Bull version

    "@nestjs/bull": "^0.6.2"

    opened by NicholasMartens 0
  • Limit the maximum number of job logs

    Limit the maximum number of job logs

    Description

    I want to add logs to a repeatable job. Then I found that there is no way to limit the number of logs. I'm concerned that this will take up too much memory. I don't know if my worry is superfluous 😂

    Minimal, Working Test code to reproduce the issue.

    (An easy to reproduce test case will dramatically decrease the resolution time.)

    Bull version

    v4.10.1

    Additional information

    opened by CodyTseng 0
  • Cannot get repeat count of job on typescript

    Cannot get repeat count of job on typescript

    Description

    I'm trying to handle jobs depends on repeat count and the job types doesn't seems to have count property.

    It's possible to access by job.opts.repeat['count'] but I'm not sure if this is recommended.

    Why is this missing from the type?

    Minimal, Working Test code to reproduce the issue.

    (An easy to reproduce test case will dramatically decrease the resolution time.)

    Bull version

    4.10.0

    Additional information

    opened by sangwoo-seo 2
Releases(v4.10.2)
Owner
null
A fast, robust and extensible distributed task/job queue for Node.js, powered by Redis.

Conveyor MQ A fast, robust and extensible distributed task/job queue for Node.js, powered by Redis. Introduction Conveyor MQ is a general purpose, dis

Conveyor MQ 45 Dec 15, 2022
Nodejs Background jobs using redis.

node-resque: The best background jobs in node. Distributed delayed jobs in nodejs. Resque is a background job system backed by Redis (version 2.6.0 an

Actionhero 1.2k Jan 3, 2023
Better Queue for NodeJS

Better Queue - Powerful flow control Super simple to use Better Queue is designed to be simple to set up but still let you do complex things. Persiste

Diamond 415 Dec 17, 2022
Job queues and scheduled jobs for Node.js, Beanstalkd and/or Iron.io.

Ironium Job queues and scheduled jobs for Node.js backed by Beanstalk/IronMQ/SQS. The Why You've got a workload that runs outside the Web app's reques

Assaf Arkin 71 Dec 14, 2022
Challenge [Frontend Mentor] - In this challenge, JavaScript was used to filter jobs based on the selected categories. Technologies used: HTML5, CSS3 and React.

Getting Started with Create React App This project was bootstrapped with Create React App. Available Scripts In the project directory, you can run: np

Rui Neto 11 Apr 13, 2022
Cloud Run Jobs Demos - A collection of samples to show you how and when to run a container to completion without a server

Cloud Run Jobs Demo Applications Cloud Run Jobs allows you to run a container to completion without a server. This repository contains a collection of

Google Cloud Platform 34 Dec 23, 2022
Out of the box modern User Interface, so you can see and manage your Workhorse jobs in realtime

WORKHORSE UI Out of the box modern User Interface, so you can see and manage your Workhorse jobs in realtime. Start local Run npm i Copy and name prox

Workhorse 2 Apr 15, 2022
Hello Jobs is a one-stop solution for all job seekers. In future, this could also serve as a platform for recruiters to hire potential candidates.

Hello Jobs Hello Jobs is a one-stop solution for all job seekers. In future, this could also serve as a platform for recruiters to hire potential cand

S Harshita 6 Dec 26, 2022
Redis-backed task queue engine with advanced task control and eventual consistency

idoit Redis-backed task queue engine with advanced task control and eventual consistency. Task grouping, chaining, iterators for huge ranges. Postpone

Nodeca 65 Dec 15, 2022
Opinionated, type-safe, zero-dependency max/min priority queue for JavaScript and TypeScript projects.

qewe qewe is an opinionated, type-safe, zero-dependency max/min priority queue for JavaScript and TypeScript projects. Installation Add qewe to your p

Jamie McElwain 2 Jan 10, 2022
A document based messaging queue for Mongo, DocumentDB, and others

DocMQ Messaging Queue for any document-friendly architectures (DocumentDB, Mongo, Postgres + JSONB, etc). Why Choose This DocMQ is a good choice if yo

Jakob Heuser 10 Dec 7, 2022
Kue is a priority job queue backed by redis, built for node.js.

Kue Kue is no longer maintained Please see e.g. Bull as an alternative. Thank you! Kue is a priority job queue backed by redis, built for node.js. PRO

Automattic 9.4k Dec 20, 2022
A simple, fast, robust job/task queue for Node.js, backed by Redis.

A simple, fast, robust job/task queue for Node.js, backed by Redis. Simple: ~1000 LOC, and minimal dependencies. Fast: maximizes throughput by minimiz

Bee Queue 3.1k Jan 5, 2023
Redis Simple Message Queue

Redis Simple Message Queue A lightweight message queue for Node.js that requires no dedicated queue server. Just a Redis server. tl;dr: If you run a R

Patrick Liess 1.6k Dec 27, 2022
A simple high-performance Redis message queue for Node.js.

RedisSMQ - Yet another simple Redis message queue A simple high-performance Redis message queue for Node.js. For more details about RedisSMQ design se

null 501 Dec 30, 2022
Yet another concurrent priority task queue, yay!

YQueue Yet another concurrent priority task queue, yay! Install npm install yqueue Features Concurrency control Prioritized tasks Error handling for b

null 6 Apr 4, 2022
A client-friendly run queue

client-run-queue This package provides a RunQueue implementation for scheduling and managing async or time-consuming functions such that client-side i

Passfolio 6 Nov 22, 2022
A client-friendly run queue

client-run-queue This package provides a RunQueue implementation for scheduling and managing async or time-consuming functions such that client-side i

Passfolio 4 Jul 5, 2022
A tool library for handling window && iframe && worker communication based on the JSON RPC specification

rpc-shooter A tool library for handling window && iframe && worker communication based on the JSON RPC specification 一个基于 JSON-RPC 规范用于处理 window && if

臼犀 89 Dec 20, 2022