BullMQ - Premium Message Queue for NodeJS based on Redis

Overview



The fastest, most reliable, Redis-based distributed queue for Node.
Carefully written for rock solid stability and atomicity.

Read the documentation

Follow @manast for Bull news and updates!

NEW! Tutorials

You can find tutorials and news in this blog: https://blog.taskforce.sh/

Official FrontEnd

Taskforce.sh, Inc

Supercharge your queues with a professional front end and optional Redis hosting:

  • Get a complete overview of all your queues.
  • Inspect jobs, search, retry, or promote delayed jobs.
  • Metrics and statistics.
  • and many more features.

Sign up at Taskforce.sh

The gist

Install:

$ yarn add bullmq

Add jobs to the queue:

import { Queue } from 'bullmq';

const queue = new Queue('Paint');

queue.add('cars', { color: 'blue' });

Process the jobs in your workers:

import { Worker } from 'bullmq';

const worker = new Worker('Paint', async job => {
  if (job.name === 'cars') {
    await paintCar(job.data.color);
  }
});

Listen to jobs for completion:

import { QueueEvents } from 'bullmq';

const queueEvents = new QueueEvents('Paint');

queueEvents.on('completed', ({ jobId }) => {
  console.log('done painting');
});

queueEvents.on('failed', ({ jobId, failedReason }) => {
  console.error('error painting', failedReason);
});

This is just scratching the surface; check out all the features and more in the official documentation.

Thanks

Thanks to all the contributors who made this library possible, and a special mention to Leon van Kammen, who kindly donated his npm bullmq repo.

Comments
  • Queue stuck with jobs in "active" state for hours, and rest of jobs not being processed

    Hello, we are finding that one of our queues periodically gets stuck, with 2 jobs sitting in "active" and not progressing for 18 hours, while the "waiting" queue fills up with thousands of jobs as a result.

    The jobs failed with an error but are not being removed from the "active" state. What can be done to address this? The queue should not get stuck and require manual intervention; this is making bullmq completely unusable for us.

    Please let me know what further information you need to help analyse this. Please don't ask for a "script to reproduce" because we don't have one. It's a complicated app with dozens of different jobs, and the jobs that are failing are not always failing, just occasionally when Chrome fails to load on the instance.

    Thanks.

    cannot reproduce 
    opened by adamreisnz 82
  • How to use moveToFailed and moveToCompleted


    These two functions are different from v3 and I just don't know what to set for the token.

    job.moveToFailed(new Error('failed message'), ???);
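
For reference, in BullMQ v4 the token is an arbitrary unique string identifying the lock owner, used when fetching and settling jobs manually. A minimal sketch (not an official answer; it assumes a local Redis instance, a hypothetical `Paint` queue, and a hypothetical `doWork` function):

```javascript
import { Worker } from 'bullmq';

// A worker created with a `null` processor lets you fetch jobs manually.
const worker = new Worker('Paint', null, { connection: { host: 'localhost' } });

// The token is just a unique string identifying the lock owner.
const token = 'worker-1';

const job = await worker.getNextJob(token);
if (job) {
  try {
    const result = await doWork(job); // hypothetical processing function
    await job.moveToCompleted(result, token);
  } catch (err) {
    await job.moveToFailed(err, token);
  }
}
```

The same token that acquired the job's lock must be passed to moveToCompleted/moveToFailed, which is why these signatures differ from v3.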


    released 
    opened by rileyai-dev 60
  • Vercel add Job : TypeError: client.addJob is not a function

    Thanks for developing this open-source project. I really enjoy it. I encountered this problem when running on Vercel serverless; the source code works perfectly fine locally.

    {
      "errorType": "Runtime.UnhandledPromiseRejection",
      "errorMessage": "TypeError: client.addJob is not a function",
      "reason": {
        "errorType": "TypeError",
        "errorMessage": "client.addJob is not a function",
        "stack": [
          "TypeError: client.addJob is not a function",
          " at Function.addJob (/var/task/node_modules/bullmq/dist/classes/scripts.js:52:23)",
          " at Job.addJob (/var/task/node_modules/bullmq/dist/classes/job.js:352:34)",
          " at Function.create (/var/task/node_modules/bullmq/dist/classes/job.js:34:28)",
          " at processTicksAndRejections (internal/process/task_queues.js:93:5)",
          " at Queue.add (/var/task/node_modules/bullmq/dist/classes/queue.js:38:25)"
        ]
      },
      "promise": {},
      "stack": [
        "Runtime.UnhandledPromiseRejection: TypeError: client.addJob is not a function",
        " at process.<anonymous> (/var/runtime/index.js:35:15)",
        " at process.emit (events.js:327:22)",
        " at process.emit (/var/task/___vc_sourcemap_support.js:587:21)",
        " at processPromiseRejections (internal/process/promises.js:245:33)",
        " at processTicksAndRejections (internal/process/task_queues.js:94:32)"
      ]
    }
    

    Example Code:

    const connection = new IORedis(process.env.REDIS_URI);
    const queue = new Queue('SENDEMAIL', { connection });
    const job = await queue.add('SEND',  { email });
    

    I found a workaround for this by customizing the vercel.json file. I hope this helps anyone having the same problem.

        "functions": {
            "api/serverless.ts": {
                "includeFiles": "node_modules/bullmq/dist/commands/**"
            }
        },
    
    released 
    opened by naicoi2407 32
  • Recommended Express integration

    We have a relatively straightforward integration of this library on Heroku (that generally follows this guide):

    1. worker process – small Node.js process that instantiates a Worker instance, leveraging throng / clustering
    2. web process – Express server instantiates a singleton ioredis connection and Queue instance; in practice, some of our Express routes add a job to the queue

    We're instantiating the ioredis connection like so:

    new IORedis(redisUrl, {
      maxRetriesPerRequest: null,
      enableReadyCheck: false,
    });
    

    per the recommendation added in https://github.com/taskforcesh/bullmq/pull/836.

    Is this the recommended Express setup? We're specifically concerned that disabling maxRetriesPerRequest will pose issues in our web context where ideally the number of retries would be finite (per ioredis' default) and commands would "fail fast" to deliver a timely HTTP response.
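
    One possible split, sketched under the assumption that only blocking clients (Worker, QueueEvents) need the settings from that PR, with a hypothetical queue name `jobs`:

```javascript
import IORedis from 'ioredis';
import { Queue, Worker } from 'bullmq';

// Worker process: blocking commands must not be retried, hence the
// settings recommended in taskforcesh/bullmq#836.
const workerConnection = new IORedis(process.env.REDIS_URL, {
  maxRetriesPerRequest: null,
  enableReadyCheck: false,
});
const worker = new Worker('jobs', async job => { /* ... */ }, {
  connection: workerConnection,
});

// Web process: keep ioredis defaults so that enqueueing from an Express
// route fails fast instead of retrying indefinitely.
const webConnection = new IORedis(process.env.REDIS_URL);
const queue = new Queue('jobs', { connection: webConnection });
```

    This keeps the web process's "fail fast" behavior while satisfying the worker-side requirement, though it is only one way to arrange the connections.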

    bug 
    opened by namoscato 27
  • bullmq@1.56.0: SyntaxError: Named export 'Queue' not found

    Upgrading BullMQ from 1.55.1 to 1.56.0 results in this error:

    import { Queue } from 'bullmq'
             ^^^^^
    SyntaxError: Named export 'Queue' not found. The requested module 'bullmq' is a CommonJS module, which may not support all module.exports as named exports.
    CommonJS modules can always be imported via the default export, for example using:
    
    import pkg from 'bullmq';
    const { Queue } = pkg;
    

    Changing my code to this did indeed work, but it's not ideal:

    import bull from 'bullmq'
    const { Queue } = bull;
    

    I can't find which commit caused it, but it would be nice to have this fixed.

    opened by Niek 27
  • Stalled job not requeued

    I had a job that reported as "stalled". In Redis there was a lock and an entry in the stalled key. Nothing seemed to be wrong, but the stalled job did not reprocess, and subsequent jobs did not process either.

    The worker is a sandboxed worker; the original worker process did not break or throw an error, from what I can see.

    I have a queue class that automatically instantiates the queue; the worker shares a connection with the queue. Then I have a queue scheduler and queue events, each with their own connection.

    Any ideas?

    Not sure if this makes a difference, but I do make use of job.waitUntilFinished(this.queueEvents).

    opened by spearmootz 27
  • Too many workers? Waiting jobs building up.

    BullMQ: 1.8.7, Redis: ElastiCache, Node.js: 12.18.3

    If I have more workers than processes on an instance, I'm wondering if this will cause any issues. What happens in this situation?

    I just added about 5 additional queues (starting from 14) and workers, and I believe it has caused an issue where we are building up waiting events and all the queues seem blocked. I manually retriggered one job from the Arena GUI and it appeared to go active, then bounce back to waiting immediately.

    I removed the additional queues and related workers/schedulers, and it immediately completed the jobs very quickly, so it appears we've hit some limit, but I'm unsure what is most likely to be blocking.

    Could this be a bug in our new workers? We don't have a lot of throughput, so I wouldn't have thought it would have completely blocked every queue, but there is probably something I'm not understanding.

    opened by Adam-Burke 24
  • Connection is closed

    I am getting the following error after upgrading to 1.40.1:

    Error: Connection is closed.
        at Redis.endHandler (/var/www/api/rest-server/node_modules/bullmq/dist/classes/redis-connection.js:53:24)
        at Object.onceWrapper (events.js:422:26)
        at Redis.emit (events.js:315:20)
        at processTicksAndRejections (internal/process/task_queues.js:75:11)

    This happens when shutting down the server, where my code tries to close the BullMQ connections with the code below:

    queueScheduler && await queueScheduler.close();
    queue.getTaskQueue() && await queue.getTaskQueue().close();
    worker && await worker.close();

    released 
    opened by zwjohn 23
  • timeout still working?

    In Bull 3, there's a job option (timeout).

    In BullMQ 4, it is commented out in worker.ts:

    if (timeoutMs) {
      jobPromise = jobPromise.timeout(timeoutMs);
    }
    

    Is this feature still working?

    enhancement PRO 
    opened by wenq1 22
  • Connection to Redis

    bullmq: v1.0.1

    I'm not sure how to pass the Redis connection. I tried different opts as in the docs (https://docs.bullmq.io/guide/connections), but it either crashes with an error or connects to 127.0.0.1. Am I doing something wrong?

    const IORedis = require('ioredis');
    
    const queue = new Queue('myqueue', new IORedis(process.env.REDIS_URL));
    const queueEvents = new QueueEvents('myqueueevents', new IORedis(process.env.REDIS_URL));
    
    const worker = new Worker('myworker', async job => { ... }, new IORedis(process.env.REDIS_URL));
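
    For what it's worth, the connection is expected inside an options object under the connection key, and the Queue, QueueEvents, and Worker must share the same queue name to work together. A sketch of the corrected snippet:

```javascript
const IORedis = require('ioredis');
const { Queue, QueueEvents, Worker } = require('bullmq');

const connection = new IORedis(process.env.REDIS_URL);

// All three instances refer to the same queue name, 'myqueue',
// and receive the connection via the options object.
const queue = new Queue('myqueue', { connection });
const queueEvents = new QueueEvents('myqueue', { connection });
const worker = new Worker('myqueue', async job => { /* ... */ }, { connection });
```

    Passing the IORedis instance as the second argument (instead of inside options) means it is ignored, which is why the default 127.0.0.1 connection is used.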
    
    question 
    opened by root-io 21
  • Add methods to track related job dependencies

    This PR adds methods to link job dependencies. Currently I have some jobs that call other jobs, so it would be helpful to have a way to track those dependencies. I also add a new option when creating a Job to pass dependency information. Here are the new methods:

    • addDependency: adds a job as a dependency of another job, and also generates a dependent record for the parent.
    • removeDependency: useful for anyone who wants to delete a dependency relationship, perhaps because the dependency is completed or because you simply want to remove it.
    • dependenciesReady: helps you track whether your dependencies are in the completed state (it returns true if all dependencies are completed, false otherwise).

    Extension:

    • create: now you can pass the dependencies and depends options; it will generate the relationships in the creation flow of those jobs.
    opened by roggervalf 20
  • BullMQ EventEmitter Memory Leak

    Description

    Minimal, Working Test code to reproduce the issue.

    (An easy to reproduce test case will dramatically decrease the resolution time.)

    Upon adding and removing about 6 jobs (with a 3-minute gap between addition and removal), I get 2 warnings with the same message: (node:18665) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 error listeners added to [Commander]. Use emitter.setMaxListeners() to increase limit.

    Bull version

    bullmq - ^1.86.4

    Additional information

    opened by Dragonizedpizza 0
  • chore(deps): bump json5 from 1.0.1 to 1.0.2

    Bumps json5 from 1.0.1 to 1.0.2.

    Release notes

    Sourced from json5's releases.

    v1.0.2

    • Fix: Properties with the name __proto__ are added to objects and arrays. (#199) This also fixes a prototype pollution vulnerability reported by Jonathan Gregson! (#295). This has been backported to v1. (#298)
    Changelog

    Sourced from json5's changelog.

    Unreleased [code, diff]

    v2.2.3 [code, diff]

    v2.2.2 [code, diff]

    • Fix: Properties with the name __proto__ are added to objects and arrays. (#199) This also fixes a prototype pollution vulnerability reported by Jonathan Gregson! (#295).

    v2.2.1 [code, diff]

    • Fix: Removed dependence on minimist to patch CVE-2021-44906. (#266)

    v2.2.0 [code, diff]

    • New: Accurate and documented TypeScript declarations are now included. There is no need to install @types/json5. (#236, #244)

    v2.1.3 [code, diff]

    • Fix: An out of memory bug when parsing numbers has been fixed. (#228, #229)

    v2.1.2 [code, diff]

    ... (truncated)

    dependencies 
    opened by dependabot[bot] 0
  • Question: use of QueueEvents callbacks across multiple workers

    We'd like to capture and process the stalled event from QueueEvents, but it is unclear whether it would make more sense to have 1 or 2 dedicated instances to register/execute the callbacks, or whether there are no performance/overhead concerns for the workers.

    For example, the QueueScheduler documentation gives some indication:

    It is ok to have as many QueueScheduler instances as you want, just keep in mind that every instance will perform some bookkeeping so it may create some noticeable CPU and IO usage in your Redis instances.

    But again, I'd like to hear from the team if there are recommendations around this use case.

    Thanks in advance to whoever wants to chime in here!
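
    For reference, a minimal sketch of registering a stalled handler on a single dedicated QueueEvents instance (the queue name and connection details are assumptions):

```javascript
import { QueueEvents } from 'bullmq';

// One dedicated instance is enough to observe events for a queue:
// QueueEvents consumes the queue's Redis event stream and is independent
// of how many Workers are processing jobs.
const queueEvents = new QueueEvents('Paint', {
  connection: { host: 'localhost', port: 6379 },
});

queueEvents.on('stalled', ({ jobId }) => {
  console.warn(`job ${jobId} stalled`);
});
```

    Since each QueueEvents instance holds its own blocking connection, fewer dedicated instances generally means less Redis overhead.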

    opened by liquid1982 0
  • repeat with immediately: true not working the second time

    With the code below, the first run happens immediately. But after restarting the server and adding the same job to the queue, the immediate run no longer happens:

    const jobs = await queue.getRepeatableJobs();
    // remove repeat jobs
    await Promise.all(
      jobs.map(i => {
        return queue.removeRepeatableByKey(i.key);
      }),
    );
    await queue.add('task-name', {}, {
      repeat: {
        every: 1000 * 60 * 60 * 1.6,
        immediately: true,
      },
    });
    
    opened by suhaotian 0
Releases(v3.5.2)
Owner
Taskforce.sh Inc.