A simple, fast, robust job/task queue for Node.js, backed by Redis.

Overview

  • Simple: ~1000 LOC, and minimal dependencies.
  • Fast: maximizes throughput by minimizing Redis and network overhead. Benchmarks well.
  • Robust: designed with concurrency, atomicity, and failure in mind; full code coverage.

const Queue = require('bee-queue');
const queue = new Queue('example');

const job = queue.createJob({x: 2, y: 3});
job.save();
job.on('succeeded', (result) => {
  console.log(`Received result for job ${job.id}: ${result}`);
});

// Process jobs from as many servers or processes as you like
queue.process(function (job, done) {
  console.log(`Processing job ${job.id}`);
  return done(null, job.data.x + job.data.y);
});

Introduction

Bee-Queue is meant to power a distributed worker pool and was built with short, real-time jobs in mind. A web server can enqueue a job, wait for a worker process to complete it, and return its results within an HTTP request. Scaling is as simple as running more workers.

Thanks to the folks at Mixmax, Bee-Queue is once again being regularly maintained!

Celery, Resque, Kue, and Bull operate similarly, but are generally designed for longer background jobs, supporting things like job prioritization and repeatable jobs, which Bee-Queue currently does not. Bee-Queue can handle longer background jobs just fine, but they aren't the primary focus.

  • Create, save, and process jobs
  • Concurrent processing
  • Job timeouts, retries, and retry strategies
  • Scheduled jobs
  • Pass events via Pub/Sub
    • Progress reporting
    • Send job results back to producers
  • Robust design
    • Strives for all atomic operations
    • Retries stuck jobs for at-least-once delivery
    • High code coverage
  • Performance-focused
  • Fully callback- and Promise-compatible API

Installation

$ npm install bee-queue

You'll also need Redis 2.8+* running somewhere.

* We've been noticing that some jobs get delayed by virtue of an issue with Redis < 3.2, and therefore recommend the use of Redis 3.2+.

Motivation

Celery is for Python, and Resque is for Ruby, but Kue and Bull already exist for Node, and they're good at what they do, so why does Bee-Queue also need to exist?

In short: we needed to mix and match things that Kue does well with things that Bull does well, and we needed to squeeze out more performance. There's also a long version with more details.

Bee-Queue starts by combining Bull's simplicity and robustness with Kue's ability to send events back to job creators, then focuses heavily on minimizing overhead, and finishes by being strict about code quality and testing. It compromises on breadth of features, so there are certainly cases where Kue or Bull might be preferable (see Contributing).

Bull and Kue do things really well and deserve a lot of credit. Bee-Queue borrows ideas from both, and Bull was an especially invaluable reference during initial development.

Why Bees?

Bee-Queue is like a bee because it:

  • is small and simple
  • is fast (bees can fly 20mph!)
  • carries pollen (messages) between flowers (servers)
  • something something "worker bees"

Benchmarks

benchmark chart

These basic benchmarks ran 10,000 jobs through each library, at varying levels of concurrency, with Node.js (v6.9.1, v6.11.2, v7.6.0, v7.10.1, v8.2.1, v8.3.0) and Redis (v3.2.10, v4.0.1) running directly on an Amazon AWS EC2 m4.large. The numbers shown are averages of 36 runs, 3 for each combination of the aforementioned Redis and Node versions. The raw data collected and code used are available in the benchmark folder.

Web Interface

Check out the Arena web interface to manage jobs and inspect queue health.

Overview

Creating Queues

Queue objects are the starting point to everything this library does. To make one, we just need to give it a name, typically indicating the sort of job it will process:

const Queue = require('bee-queue');
const addQueue = new Queue('addition');

Queues are very lightweight — the only significant overhead is connecting to Redis — so if you need to handle different types of jobs, just instantiate a queue for each:

const subQueue = new Queue('subtraction', {
  redis: {
    host: 'somewhereElse',
  },
  isWorker: false,
});

Here, we pass a settings object to specify an alternate Redis host and to indicate that this queue will only add jobs (not process them). See Queue Settings for more options.

Creating Jobs

Jobs are created using Queue.createJob(data), which returns a Job object storing arbitrary data.

Jobs have a chaining API for configuring the Job, and a .save([cb]) method to save the job into Redis and enqueue it for processing:

const job = addQueue.createJob({x: 2, y: 3});
job
  .timeout(3000)
  .retries(2)
  .save()
  .then((job) => {
    // job enqueued, job.id populated
  });

The Job's save method returns a Promise in addition to calling the optional callback.

Each Job can be configured with the commands .setId(id), .retries(n), .backoff(strategy, delayFactor), .delayUntil(date|timestamp), and .timeout(ms) for setting options.

Jobs can later be retrieved from Redis using Queue#getJob, but most use cases won't need this, and can instead use Job and Queue Events.

Advanced: Bulk-Creating Jobs

Normally, creating and saving jobs blocks the underlying redis client for the full duration of an RTT to the Redis server. This can reduce throughput in cases where many operations should occur without delay - particularly when there are many jobs that need to be created quickly. Use Queue#saveAll to save an iterable (e.g. an Array) containing jobs in a pipelined network request, thus pushing all the work out on the wire before hearing back from the Redis server.

addQueue
  .saveAll([addQueue.createJob({x: 3, y: 4}), addQueue.createJob({x: 4, y: 5})])
  .then((errors) => {
    // The errors value is a Map associating Jobs with Errors. This will often be an empty Map.
  });

Each job in the array provided to saveAll will be mutated with the ID it gets assigned.
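
The errors Map can be used to separate the jobs that were enqueued from those that were not. The following is a minimal sketch rather than part of the library: saveBatch is a hypothetical helper name, and queue is assumed to be a Bee-Queue Queue instance.

```javascript
// Hypothetical helper: bulk-save one job per payload and report which failed.
// `queue` is assumed to be a Bee-Queue Queue instance.
async function saveBatch(queue, payloads) {
  const jobs = payloads.map((data) => queue.createJob(data));
  // saveAll resolves to a Map associating Jobs with Errors (often empty).
  const errors = await queue.saveAll(jobs);
  return {
    saved: jobs.filter((job) => !errors.has(job)),
    failed: jobs.filter((job) => errors.has(job)),
  };
}
```

Jobs in saved have their id populated; jobs in failed can be looked up in the Map to decide whether to retry them.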

Processing Jobs

To start processing jobs, call Queue.process and provide a handler function:

addQueue.process(function (job, done) {
  console.log(`Processing job ${job.id}`);
  return done(null, job.data.x + job.data.y);
});

Instead of calling the provided callback, the handler function can return a Promise. This enables the intuitive use of async/await:

addQueue.process(async (job) => {
  console.log(`Processing job ${job.id}`);
  return job.data.x + job.data.y;
});

The handler function is given the job it needs to process, including job.data from when the job was created. It should then pass results either by returning a Promise or by calling the done callback. For more on handlers, see Queue#process.

.process can only be called once per Queue instance, but we can process on as many instances as we like, spanning multiple processes or servers, as long as they all connect to the same Redis instance. From this, we can easily make a worker pool of machines that all run the same code and spend their lives processing our jobs, no matter where those jobs are created.

.process can also take a concurrency parameter. If your jobs spend most of their time just waiting on external resources, you might want each processor instance to handle at most 10 at a time:

const http = require('http');
const baseUrl = 'http://www.google.com/search?q=';
subQueue.process(10, function (job, done) {
  http.get(`${baseUrl}${job.data.x}-${job.data.y}`, function (res) {
    // parse the difference out of the response...
    return done(null, difference);
  });
});

Progress Reporting

Handlers can send progress reports, which will be received as events on the original job instance:

// save() returns a Promise, so keep a reference to the Job itself for events
const job = addQueue.createJob({x: 2, y: 3});
job.save();
job.on('progress', (progress) => {
  console.log(
    `Job ${job.id} reported progress: page ${progress.page} / ${progress.totalPages}`
  );
});

addQueue.process(async (job) => {
  // do some work
  job.reportProgress({page: 3, totalPages: 11});
  // do more work
  job.reportProgress({page: 9, totalPages: 11});
  // do the rest
});

Just like .process, these progress events work across multiple processes or servers; the job instance will receive the progress event no matter where processing happens. The data passed through can be any JSON-serializable value. Note that this mechanism depends on Pub/Sub, and thus will incur additional overhead for each additional worker node.

Job and Queue Events

There are three classes of events emitted by Bee-Queue objects: Queue Local events, Queue PubSub events, and Job events. The linked API Reference sections provide a more complete overview of each.

Progress reporting, demonstrated above, happens via Job events. Jobs also emit succeeded events, which we've seen in the opening example, and failed and retrying events.

Queue PubSub events correspond directly to Job events: job succeeded, job retrying, job failed, and job progress. These events fire from all queue instances and for all jobs on the queue.

Queue local events include ready and error on all queue instances, and succeeded, retrying, and failed on worker queues corresponding to the PubSub events being sent out.

Note that Job events become unreliable across process restarts, since the queue's reference to the associated job object will be lost. Queue-level events are thus potentially more reliable, but Job events are more convenient in places like HTTP requests where a process restart loses state anyway.
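
For the HTTP request case, Job events can be wrapped in a Promise so a request handler can simply await the outcome. This is a sketch under assumptions, not a library feature: runJob is a hypothetical helper, and queue is assumed to be a Bee-Queue Queue instance with getEvents enabled.

```javascript
// Hypothetical helper: enqueue a job and wait for its outcome via Job events.
// `queue` is assumed to be a Bee-Queue Queue with getEvents enabled.
function runJob(queue, data) {
  return new Promise((resolve, reject) => {
    const job = queue.createJob(data);
    // Reject if the job could not be enqueued at all.
    job.save().catch(reject);
    // 'succeeded' carries the handler's result.
    job.on('succeeded', resolve);
    // 'failed' fires when the job has failed and is not being retried.
    job.on('failed', reject);
  });
}
```

Inside a request handler this could be used as const sum = await runJob(addQueue, {x: 2, y: 3}). Since the Promise depends on Job events, it will never settle if the process restarts or the events are lost, so a real handler would likely also want its own timeout.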

Stalling Jobs

Bee-Queue attempts to provide "at least once delivery". Any job enqueued should be processed at least once - and if a worker crashes, gets disconnected, or otherwise fails to confirm completion of the job, the job will be dispatched to another worker for processing.

To make this happen, workers periodically phone home to Redis about each job they're working on, just to say "I'm still working on this and I haven't stalled, so you don't need to retry it." The checkStalledJobs method finds any active jobs whose workers have gone silent (not phoned home for at least stallInterval ms), assumes they have stalled, emits a stalled event with the job id, and re-enqueues them to be picked up by another worker.

Optimizing Redis Connections

By default, every time you create a queue instance with new Queue() a new redis connection will be created. If you have a small number of queues across a large number of servers this will probably be fine. If you have a large number of queues with a small number of servers, this will probably be fine too. If your deployment gets a bit larger you will likely need to optimize the Redis connections.

Let's say for example you have a web application with 30 producer queues and you run 10 webservers & 10 worker servers, each one with 4 processes/server. With the default settings this is going to add up to a lot of Redis connections. Each Redis connection consumes a fairly large chunk of memory, and it adds up quickly!

The producer queues are the ones that run on the webserver and they push jobs into the queue. These queues do not need to receive events so they can all share one redis connection by passing in an instance of node_redis RedisClient.

Example:

// producer queues running on the web server
const Queue = require('bee-queue');
const redis = require('redis');
const sharedConfig = {
  getEvents: false,
  isWorker: false,
  redis: redis.createClient(process.env.REDIS_URL),
};

const emailQueue = new Queue('EMAIL_DELIVERY', sharedConfig);
const facebookUpdateQueue = new Queue('FACEBOOK_UPDATE', sharedConfig);

emailQueue.createJob({});
facebookUpdateQueue.createJob({});

Note that these "producer queues" above are only relevant for the processes that have to put jobs into the queue, not for the workers that need to actually process the jobs.
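
To put rough numbers on the example deployment above, the arithmetic can be sketched as follows. It assumes each queue instance opens at least one Redis connection by default, so the unshared figure is a lower bound; producerConnections is a hypothetical helper, not part of Bee-Queue.

```javascript
// Rough connection count for the producer side of the example deployment.
// Assumption: each queue instance opens at least one Redis connection by default.
function producerConnections({queues, servers, processesPerServer, shared}) {
  const processes = servers * processesPerServer;
  // With a shared client, every queue in a process reuses the same connection.
  const perProcess = shared ? 1 : queues;
  return processes * perProcess;
}

const deployment = {queues: 30, servers: 10, processesPerServer: 4};
console.log(producerConnections({...deployment, shared: false})); // 1200
console.log(producerConnections({...deployment, shared: true})); // 40
```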

In your worker process where you define how to process the job with queue.process you will have to run "worker queues" instead of "producer queues". In the example below, even though you are passing in the shared config with the same redis instance, because this is a worker queue Bee-Queue will duplicate() the client, since it needs separate connections for blocking commands and PubSub subscriptions. This will result in a new connection for each queue.

// worker queues running on the worker server
const Queue = require('bee-queue');
const redis = require('redis');
const sharedConfig = {
  redis: redis.createClient(process.env.REDIS_URL),
};

const emailQueue = new Queue('EMAIL_DELIVERY', sharedConfig);
const facebookUpdateQueue = new Queue('FACEBOOK_UPDATE', sharedConfig);

emailQueue.process((job) => {});
facebookUpdateQueue.process((job) => {});

For a more detailed example and explanation see #96

API Reference

Queue

Settings

The default Queue settings are:

const queue = new Queue('test', {
  prefix: 'bq',
  stallInterval: 5000,
  nearTermWindow: 1200000,
  delayedDebounce: 1000,
  redis: {
    host: '127.0.0.1',
    port: 6379,
    db: 0,
    options: {},
  },
  isWorker: true,
  getEvents: true,
  sendEvents: true,
  storeJobs: true,
  ensureScripts: true,
  activateDelayedJobs: false,
  removeOnSuccess: false,
  removeOnFailure: false,
  redisScanCount: 100,
});

The settings fields are:

  • prefix: string, default bq. Useful if the bq: namespace is, for whatever reason, unavailable or problematic on your redis instance.

  • stallInterval: number, ms; the length of the window in which workers must report that they aren't stalling. Higher values will reduce Redis/network overhead, but if a worker stalls, it will take longer before its stalled job(s) will be retried. A higher value will also result in a lower probability of false-positives during stall detection.

  • nearTermWindow: number, ms; the window during which delayed jobs will be specifically scheduled using setTimeout - if all delayed jobs are further out than this window, the Queue will double-check that it hasn't missed any jobs after the window elapses.

  • delayedDebounce: number, ms; to avoid unnecessary churn for several jobs in short succession, the Queue may delay individual jobs by up to this amount.

  • redis: object or string, specifies how to connect to Redis. See redis.createClient() for the full set of options.

    • host: string, Redis host.
    • port: number, Redis port.
    • socket: string, Redis socket to be used instead of a host and port.

    Note that this can also be a node_redis RedisClient instance, in which case Bee-Queue will issue normal commands over it. It will duplicate() the client for blocking commands and PubSub subscriptions, if enabled. This is advanced usage.

  • isWorker: boolean. Disable if this queue will not process jobs.

  • getEvents: boolean. Disable if this queue does not need to receive job events.

  • sendEvents: boolean. Disable if this worker does not need to send job events back to other queues.

  • storeJobs: boolean. Disable if this worker does not need to associate events with specific Job instances. This normally improves memory usage, as the storage of jobs is unnecessary for many use-cases.

  • ensureScripts: boolean. Ensure that the Lua scripts exist in redis before running any commands against redis.

  • activateDelayedJobs: boolean. Activate delayed jobs once they've passed their delayUntil timestamp. Note that this must be enabled on at least one Queue instance for the delayed retry strategies (fixed and exponential) - this will reactivate them after their computed delay.

  • removeOnSuccess: boolean. Enable to have this worker automatically remove its successfully completed jobs from Redis, so as to keep memory usage down.

  • removeOnFailure: boolean. Enable to have this worker automatically remove its failed jobs from Redis, so as to keep memory usage down. This will not remove jobs that are set to retry unless they fail all their retries.

  • quitCommandClient: boolean. Whether to QUIT the redis command client (the client it sends normal operations over) when Queue#close is called. This defaults to true for normal usage, and false if an existing RedisClient object was provided to the redis option.

  • redisScanCount: number. Sets the COUNT value passed to the SSCAN Redis command used in Queue#getJobs for succeeded and failed job types.

Properties

  • name: string, the name passed to the constructor.
  • keyPrefix: string, the prefix used for all Redis keys associated with this queue.
  • jobs: a Map associating the currently tracked jobs (when storeJobs and getEvents are enabled).
  • paused: boolean, whether the queue instance is paused. Only true if the queue is in the process of closing.
  • settings: object, the settings determined between those passed and the defaults

Queue Local Events

ready

Instead of listening to this event, consider calling Queue#ready([cb]), which returns a Promise that resolves once the Queue is ready. If the Queue is already ready, then the Promise will be already resolved.

queue.on('ready', () => {
  console.log('queue now ready to start doing things');
});

The queue has connected to Redis and ensured that the Lua scripts are cached. You can often get away without checking for this event, but it's a good idea to wait for it in case the Redis host didn't have the scripts cached beforehand; if you try to enqueue jobs when the scripts are not yet cached, you may run into a Redis error.

error

queue.on('error', (err) => {
  console.log(`A queue error happened: ${err.message}`);
});

Any Redis errors are re-emitted from the Queue. Note that this event will not be emitted for failed jobs.

succeeded

queue.on('succeeded', (job, result) => {
  console.log(`Job ${job.id} succeeded with result: ${result}`);
});

This queue has successfully processed job. If result is defined, the handler called done(null, result).

retrying

queue.on('retrying', (job, err) => {
  console.log(
    `Job ${job.id} failed with error ${err.message} but is being retried!`
  );
});

This queue has processed job, but it reported a failure and has been re-enqueued for another attempt. job.options.retries has been decremented, and the stack trace (or error message) has been added to its job.options.stacktraces array.

failed

queue.on('failed', (job, err) => {
  console.log(`Job ${job.id} failed with error ${err.message}`);
});

This queue has processed job, but its handler reported a failure either by rejecting its returned Promise, or by calling done(err). Note that if you pass an async function to process, you must reject it by returning Promise.reject(...) or throwing an exception (done does not apply).
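
The two ways of reporting a failure can be sketched side by side. The handler names and the validation rule are made up for illustration; either function could be passed to queue.process.

```javascript
// Promise style: throwing (or returning a rejected Promise) marks the job as failed.
async function addHandler(job) {
  const {x, y} = job.data;
  if (typeof x !== 'number' || typeof y !== 'number') {
    throw new TypeError('x and y must be numbers');
  }
  return x + y;
}

// Callback style: pass the error to done instead.
function addHandlerCb(job, done) {
  const {x, y} = job.data;
  if (typeof x !== 'number' || typeof y !== 'number') {
    return done(new TypeError('x and y must be numbers'));
  }
  return done(null, x + y);
}
```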

stalled

queue.on('stalled', (jobId) => {
  console.log(`Job ${jobId} stalled and will be reprocessed`);
});

This queue detected that a job stalled. Note that this might not be the same queue instance that processed the job and ultimately stalled; instead, it's the queue instance that happened to detect the stalled job.

Queue PubSub Events

These events are all reported by some worker queue (with sendEvents enabled) and sent as Redis Pub/Sub messages back to any queues listening for them (with getEvents enabled). This means that listening for these events is effectively a monitor for all activity by all workers on the queue.

If the jobId of an event is for a job that was created by that queue instance, a corresponding job event will be emitted from that job object.

Note that Queue PubSub events pass the jobId, but do not have a reference to the job object, since that job might have originally been created by some other queue in some other process. Job events are emitted only in the process that created the job, and are emitted from the job object itself.

job succeeded

queue.on('job succeeded', (jobId, result) => {
  console.log(`Job ${jobId} succeeded with result: ${result}`);
});

Some worker has successfully processed job jobId. If result is defined, the handler called done(null, result).

job retrying

queue.on('job retrying', (jobId, err) => {
  console.log(
    `Job ${jobId} failed with error ${err.message} but is being retried!`
  );
});

Some worker has processed job jobId, but it reported a failure and has been re-enqueued for another attempt.

job failed

queue.on('job failed', (jobId, err) => {
  console.log(`Job ${jobId} failed with error ${err.message}`);
});

Some worker has processed job, but its handler reported a failure with done(err).

job progress

queue.on('job progress', (jobId, progress) => {
  console.log(`Job ${jobId} reported progress: ${progress}%`);
});

Some worker is processing job jobId, and it sent a progress report of progress percent.

Queue Delayed Job activation

The Queue will activate no delayed jobs unless activateDelayedJobs is set to true.

The promptness of the job activation is controlled with the delayedDebounce setting on the Queue. This setting defines a window across which to group delayed jobs. If three jobs are enqueued for 10s, 10.5s, and 12s in the future, then a delayedDebounce of 1000 will cause the first two jobs to activate when the timestamp of the second job passes.
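
The grouping in that example can be modeled with a few lines of code. This is an illustrative model only, not Bee-Queue's scheduling code: groupDelayed is a hypothetical function, and it assumes a group is bounded by delayedDebounce measured from the earliest job in the group.

```javascript
// Illustrative model only, not Bee-Queue's scheduling code.
// Groups delayed-job offsets (ms) so that no job waits more than `debounce` ms
// past its own timestamp; each group activates when its last timestamp passes.
function groupDelayed(timestamps, debounce) {
  const sorted = [...timestamps].sort((a, b) => a - b);
  const groups = [];
  for (const ts of sorted) {
    const current = groups[groups.length - 1];
    if (current && ts - current.jobs[0] <= debounce) {
      current.jobs.push(ts);
      current.activateAt = ts;
    } else {
      groups.push({activateAt: ts, jobs: [ts]});
    }
  }
  return groups;
}

console.log(groupDelayed([10000, 10500, 12000], 1000));
```

For the three jobs above, the model yields one group containing the 10s and 10.5s jobs, activating at 10.5s, and a second group containing only the 12s job.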

The nearTermWindow setting on the Queue determines the maximum duration the Queue should wait before attempting to activate any of the elapsed delayed jobs in Redis. This setting is to control for network failures in the delivery of the earlierDelayed event in conjunction with the death of the publishing Queue.

Methods

Queue(name, [settings])

Used to instantiate a new queue; opens connections to Redis.

Queue#createJob(data)

const job = queue.createJob({...});

Returns a new Job object with the associated user data.

Queue#getJob(jobId, [cb])

queue.getJob(3, function (err, job) {
  console.log(`Job 3 has status ${job.status}`);
});

queue.getJob(3).then((job) => console.log(`Job 3 has status ${job.status}`));

Looks up a job by its jobId. The returned job will emit events if getEvents and storeJobs are both true.

Be careful with this method; most potential uses would be better served by job events on already-existing job instances. Using this method indiscriminately can lead to increasing memory usage when the storeJobs setting is true, as each queue maintains a table of all associated jobs in order to dispatch events.

Queue#getJobs(type, page, [cb])

queue.getJobs('waiting', {start: 0, end: 25}).then((jobs) => {
  const jobIds = jobs.map((job) => job.id);
  console.log(`Job ids: ${jobIds.join(' ')}`);
});

queue.getJobs('failed', {size: 100}).then((jobs) => {
  const jobIds = jobs.map((job) => job.id);
  console.log(`Job ids: ${jobIds.join(' ')}`);
});

Looks up jobs by their queue type. When looking up jobs of type waiting, active, or delayed, page should be configured with start and end attributes to specify a range of job indices to return. Jobs of type failed and succeeded will return an arbitrary subset of the queue of size page['size']. Note: This is because failed and succeeded job types are represented by a Redis SET, which does not maintain a job ordering.

Note that large values of the attributes of page may cause excess load on the Redis server.

Queue#process([concurrency], handler(job, done))

Begins processing jobs with the provided handler function.

The process method should only be called once, and should never be called on a queue where isWorker is false.

The optional concurrency parameter sets the maximum number of simultaneously active jobs for this processor. It defaults to 1.

The handler function should either:

  • Return a Promise that eventually resolves or rejects, or
  • Call done exactly once
    • Use done(err) to indicate job failure
    • Use done() or done(null, result) to indicate job success
      • result must be JSON-serializable (for JSON.stringify)
  • Never ever block the event loop (for very long). If you do, the stall detection might think the job stalled, when it was really just blocking the event loop.

N.B. If the handler returns a Promise, calls to the done callback will be ignored.

Queue#checkStalledJobs([interval], [cb])

Checks for jobs that appear to be stalling and thus need to be retried, then re-enqueues them.

queue.checkStalledJobs(5000, (err, numStalled) => {
  // prints the number of stalled jobs detected every 5000 ms
  console.log('Checked stalled jobs', numStalled);
});

What happens after the check is determined by the parameters provided:

  • cb only: cb is called
  • interval only: a timeout is set to call the method again in interval ms
  • cb and interval: a timeout is set, then cb is called

Bee-Queue automatically calls this method once when a worker begins processing, so it will check once if a worker process restarts. You should also make your own call with an interval parameter to make the check happen repeatedly over time; see Under the hood for an explanation why.

The maximum delay from when a job stalls until it will be retried is roughly stallInterval + interval, so to minimize that delay without calling checkStalledJobs unnecessarily often, set interval to be the same or a bit shorter than stallInterval. A good system-wide average frequency for the check is every 0.5-10 seconds, depending on how time-sensitive your jobs are in case of failure. Larger deployments, or deployments where processing has higher CPU variance, may need even higher intervals.
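
As a quick worked example of that estimate, using the default stallInterval of 5000 ms and a check interval of the same length (maxStallRetryDelay is a hypothetical helper name):

```javascript
// Rough upper bound (ms) on how long a stalled job waits before being retried,
// per the estimate stallInterval + interval.
function maxStallRetryDelay(stallInterval, checkInterval) {
  return stallInterval + checkInterval;
}

console.log(maxStallRetryDelay(5000, 5000)); // 10000
```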

Note that for calls that specify an interval, you must provide a callback if you want results from each subsequent check - the returned Promise can and will only resolve for the first check. If and only if you specify an interval and no cb, then errors encountered after the first check will be emitted as error events.

Queue#checkHealth([cb])

Check the "health" of the queue. Returns a promise that resolves to the number of jobs in each state (waiting, active, succeeded, failed, delayed), and the newest job ID (if using the default ID behavior) in newestJob. You can periodically query the newestJob ID to estimate the job creation throughput, and can infer the job processing throughput by incorporating the waiting and active counts.

const counts = await queue.checkHealth();
// print all the job counts
console.log('job state counts:', counts);
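
The creation throughput estimate mentioned above can be sketched like this. It assumes the default auto-incrementing job IDs, and it assumes the caller records a takenAt timestamp (a made-up field, not returned by checkHealth) alongside each sample.

```javascript
// Estimate jobs created per second from two checkHealth() samples.
// Assumes default auto-incrementing numeric job IDs, so newestJob only grows.
// `takenAt` is a caller-recorded Date.now() value, not part of checkHealth().
function creationRate(before, after) {
  const seconds = (after.takenAt - before.takenAt) / 1000;
  if (seconds <= 0) return 0;
  return (Number(after.newestJob) - Number(before.newestJob)) / seconds;
}

// 250 new jobs over 5 seconds
console.log(creationRate({newestJob: 100, takenAt: 0}, {newestJob: 350, takenAt: 5000})); // 50
```

A sample could be built as {...(await queue.checkHealth()), takenAt: Date.now()}.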

Queue#close([timeout], [cb])

Closes the queue's connections to Redis. Idempotent.

The recommended pattern for gracefully shutting down your worker is:

// Some reasonable period of time for all your concurrent jobs to finish
// processing. If a job does not finish processing in this time, it will stall
// and be retried. As such, do attempt to make your jobs idempotent, as you
// generally should with any queue that provides at-least-once delivery.
const TIMEOUT = 30 * 1000;

process.on('uncaughtException', async () => {
  // Queue#close is idempotent - no need to guard against duplicate calls.
  try {
    await queue.close(TIMEOUT);
  } catch (err) {
    console.error('bee-queue failed to shut down gracefully', err);
  }
  process.exit(1);
});

Queue#isRunning()

Returns true unless the Queue is shutting down due to a call to Queue#close().

Queue#ready([cb])

Promise resolves to the queue (or callback is called with a null argument) when the queue (and Redis) are ready for jobs. Learn more about 'ready' in Queue Local Events.

const Queue = require('bee-queue');
const queue = new Queue('example');
queue
  .ready()
  .then(async (queue) => {
    console.log('isRunning:', queue.isRunning());
    const checkHealth = await queue.checkHealth();
    console.log('checkHealth:', checkHealth);
  })
  .catch((err) => console.log('unreadyable', err));

Queue#removeJob(jobId, [cb])

queue.removeJob(3, function (err) {
  if (!err) {
    console.log('Job 3 was removed');
  }
});

queue.removeJob(3).then(() => console.log('Job 3 was removed'));

Removes a job by its jobId. Idempotent.

This may have unintended side effects, e.g. if the job is currently being processed by another worker, so only use this method when you know it's safe.

Returns the Queue instance it was called on.

Queue#destroy([cb])

queue.destroy(function (err) {
  if (!err) {
    console.log('Queue was destroyed');
  }
});

queue.destroy().then(() => console.log('Queue was destroyed'));

Removes all Redis keys belonging to this queue (see Under the hood). Idempotent.

It goes without saying that this should be used with great care.

Returns the number of keys removed.

Job

Properties

  • id: string, Job ID unique to each job. Not populated until .save calls back. Can be overridden with Job#setId.
  • data: object; user data associated with the job. It should:
    • Be JSON-serializable (for JSON.stringify)
    • Never be used to pass large pieces of data (100kB+)
    • Ideally be as small as possible (1kB or less)
  • options: object used by Bee-Queue to store timeout, retries, stack traces, etc.
    • Do not modify directly; use job methods instead.
  • queue: the Queue responsible for this instance of the job. This is either:
    • the queue that called createJob to make the job,
    • the queue that ran getJob to fetch the job from redis, or
    • the queue that called process to process it
  • progress: number; progress between 0 and 100, as reported by reportProgress.

Job Events

These are all Pub/Sub events like Queue PubSub events and are disabled when getEvents is false.

succeeded

const job = await queue.createJob({...}).save();
job.on('succeeded', (result) => {
  console.log(`Job ${job.id} succeeded with result: ${result}`);
});

The job has succeeded. If result is defined, the handler called done(null, result).

retrying

job.on('retrying', (err) => {
  console.log(
    `Job ${job.id} failed with error ${err.message} but is being retried!`
  );
});

The job has failed, but it is being automatically re-enqueued for another attempt. job.options.retries has been decremented accordingly.

failed

job.on('failed', (err) => {
  console.log(`Job ${job.id} failed with error ${err.message}`);
});

The job has failed, and is not being retried.

progress

job.on('progress', (progress) => {
  console.log(`Job ${job.id} reported progress: ${progress}%`);
});

The job has sent a progress report of progress percent.

Methods

Each Job can be configured with the chainable commands .setId(id), .retries(n), .backoff(strategy, delayFactor), .delayUntil(date|timestamp), and .timeout(ms).

Job#setId(id)

const job = await queue.createJob({...})
  .setId('bulk')
  .save();

Explicitly sets the ID of the job. If a job with the given ID already exists, the Job will not be created, and job.id will be set to null. This method can be used to run a job once per external resource by passing that resource's ID. For instance, you might run the setup job for a user only once by setting the job ID to the ID of the user. Furthermore, when this feature is used with the queue settings removeOnSuccess: true and removeOnFailure: true, the job can be run again once the previous run has been removed, effectively ensuring that jobId will have a global concurrency of 1.

Avoid passing a numeric job ID, as it may conflict with an auto-generated ID.
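
The once-per-user pattern described above might look like the following sketch. enqueueUserSetup and the user-setup: prefix are made up for illustration, and queue is assumed to be a Bee-Queue Queue instance.

```javascript
// Hypothetical helper: enqueue a setup job at most once per user.
// `queue` is assumed to be a Bee-Queue Queue; the 'user-setup:' prefix is made up
// and keeps the ID non-numeric so it cannot clash with auto-generated IDs.
async function enqueueUserSetup(queue, userId) {
  const job = await queue
    .createJob({userId})
    .setId(`user-setup:${userId}`)
    .save();
  // As described above, job.id is null when a job with this ID already exists.
  return job.id !== null;
}
```

The helper resolves to true when a new job was enqueued and false when one with the same ID was already present.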

Job#retries(n)

const job = await queue.createJob({...})
  .retries(3)
  .save();

Sets how many times the job should be automatically retried in case of failure.

Stored in job.options.retries and decremented each time the job is retried.

Defaults to 0.

Job#backoff(strategy, delayFactor)

// When the job fails, retry it immediately.
const job = queue.createJob({...})
  .backoff('immediate');
// When the job fails, wait the given number of milliseconds before retrying.
job.backoff('fixed', 1000);
// When the job fails, retry using an exponential backoff policy.
// In this example, the first retry will be one second after completion
// of the first attempt, and the second retry will be two seconds after completion
// of the first retry.
job.backoff('exponential', 1000);

Sets the backoff policy when handling retries.

This setting is stored in job.options.backoff as {strategy, delay}.

Defaults to 'immediate'.
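As a rough illustration of the three strategies, the sketch below computes the wait before each retry. retryDelay is a hypothetical helper, not a Bee-Queue function, and the assumption that the exponential delay keeps doubling after the second retry goes beyond what the example above states.

```javascript
// Illustrative only: delay in milliseconds before the Nth retry (1-based).
// Assumes 'exponential' doubles the delay on every retry.
function retryDelay(strategy, delayFactor, retryNumber) {
  switch (strategy) {
    case 'immediate':
      return 0;
    case 'fixed':
      return delayFactor;
    case 'exponential':
      return delayFactor * 2 ** (retryNumber - 1);
    default:
      throw new Error(`Unknown backoff strategy: ${strategy}`);
  }
}

// Delays for the first three retries of job.backoff('exponential', 1000).
const delays = [1, 2, 3].map((n) => retryDelay('exponential', 1000, n));
console.log(delays.join(', ')); // 1000, 2000, 4000
```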

Job#delayUntil(date|timestamp)

const job = await queue.createJob({...})
  .delayUntil(Date.parse('2038-01-19T03:14:08.000Z'))
  .save();

Delay the job until the given Date/timestamp passes. See the Queue settings section for information on controlling the activation of delayed jobs.

Defaults to enqueueing the job for immediate processing.

Job#timeout(ms)

const job = await queue.createJob({...})
  .timeout(10000)
  .save();

Sets a job runtime timeout in milliseconds; if the job's handler function takes longer than the timeout to call done, the worker assumes the job has failed and reports it as such (causing the job to retry if applicable).

Defaults to no timeout.

Job#save([cb])

const job = queue.createJob({...});
job.save((err, job) => {
  console.log(`Saved job ${job.id}`);
});

job.save().then((job) => console.log(`Saved job ${job.id}`));

Saves a job, queueing it up for processing. After the callback fires (and associated Promise resolves), job.id will be populated.

Job#reportProgress(n)

queue.process(async (job, done) => {
  await doSomethingQuick();

  job.reportProgress(10);

  await doSomethingBigger();

  job.reportProgress(50);

  await doFinalizeStep();
});

Reports job progress when called within a handler function. Causes a progress event to be emitted. Does not persist the progress to Redis, but will store it on job.progress, and if other Queues have storeJobs and getEvents enabled, then the progress will end up on all corresponding job instances.

Job#remove([cb])

const job = queue.createJob({...});

// ...

job.remove(function (err) {
  if (!err) {
    console.log('Job was removed');
  }
});

job.remove()
  .then(() => console.log('Job was removed'));

Removes a job from the queue. Idempotent.

This may have unintended side effects, e.g. if the job is currently being processed by another worker, so only use this method when you know it's safe.

Note that this method will call Queue#removeJob with the job ID, so if you don't have the job in memory but know its ID, it's much more efficient to use Queue#removeJob instead of getting the job first.

Returns the Job instance it was called on.

Defaults

Defaults for Queue settings live in lib/defaults.js. Changing that file will change Bee-Queue's default behavior.

Under the hood

Each Queue uses the following Redis keys:

  • bq:name:id: Integer, incremented to determine the next Job ID.
  • bq:name:jobs: Hash from Job ID to a JSON string containing its data and options.
  • bq:name:waiting: List of IDs of jobs waiting to be processed.
  • bq:name:active: List of IDs of jobs currently being processed.
  • bq:name:succeeded: Set of IDs of jobs which succeeded.
  • bq:name:failed: Set of IDs of jobs which failed.
  • bq:name:delayed: Sorted set of IDs of delayed jobs, scored by the timestamp at which each job should be activated.
  • bq:name:stalling: Set of IDs of jobs which haven't 'checked in' during this interval.
  • bq:name:stallBlock: Short-lived key that prevents the stalled-job check from running more than once per stall interval.
  • bq:name:events: Pub/Sub channel for workers to send out job results.
  • bq:name:earlierDelayed: When a new delayed job is added prior to all other jobs, the script creating the job will publish the job's timestamp over this Pub/Sub channel.
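When inspecting a queue by hand (for example with redis-cli), it can help to generate these key names. The helper below is a hypothetical convenience, not part of Bee-Queue, and it assumes the bq prefix shown in the list above.

```javascript
// Hypothetical helper: builds the Redis key names listed above for one queue.
// The default 'bq' prefix is taken from the list; treat it as an assumption
// if your queue is configured differently.
const KEY_SUFFIXES = [
  'id', 'jobs', 'waiting', 'active', 'succeeded', 'failed',
  'delayed', 'stalling', 'stallBlock', 'events', 'earlierDelayed',
];

function queueKeys(name, prefix = 'bq') {
  const keys = {};
  for (const suffix of KEY_SUFFIXES) {
    keys[suffix] = `${prefix}:${name}:${suffix}`;
  }
  return keys;
}

console.log(queueKeys('example').waiting); // bq:example:waiting
```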

Bee-Queue is non-polling, so idle workers are listening to receive jobs as soon as they're enqueued to Redis. This is powered by brpoplpush, which is used to move jobs from the waiting list to the active list. Bee-Queue generally follows the "Reliable Queue" pattern described here.
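The hand-off between the two lists can be pictured with plain arrays. This is only an in-memory analogy of the pattern, with hypothetical function names; the real queue performs the move atomically inside Redis, and a worker blocks on brpoplpush instead of receiving null.

```javascript
// In-memory analogy of the "Reliable Queue" pattern; not Bee-Queue code.
// Index 0 is treated as the head of each list and the last index as its tail.
function enqueue(waiting, id) {
  waiting.unshift(id); // analogous to LPUSH onto the waiting list
}

function takeNextJob(waiting, active) {
  if (waiting.length === 0) return null; // BRPOPLPUSH would block here instead
  const id = waiting.pop(); // pop from the tail of waiting...
  active.unshift(id); // ...and push onto the head of active
  return id;
}

function finishJob(active, id) {
  const index = active.indexOf(id);
  if (index === -1) return false;
  active.splice(index, 1); // the job leaves the active list once it is done
  return true;
}
```

Because a job ID is always in either the waiting list or the active list, a worker that crashes mid-job leaves its ID in the active list, where it can later be detected and re-enqueued.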

The isWorker setting creates an extra Redis connection dedicated to brpoplpush. If either getEvents or activateDelayedJobs is enabled, another connection is dedicated to receiving Pub/Sub events. As such, these settings should be disabled if you don't need them.

The stalling set is a snapshot of the active list from the beginning of the latest stall interval. During each stalling interval, workers remove their job IDs from the stalling set, so at the end of an interval, any jobs whose IDs are left in the stalling set have missed their window (stalled) and need to be rerun. When checkStalledJobs runs, it re-enqueues any jobs left in the stalling set (to the waiting list), then takes a snapshot of the active list and stores it in the stalling set.
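The snapshot-and-check-in cycle described above can be simulated in memory. The sketch is an assumption-laden model rather than the actual Lua script: the state object and function names are hypothetical, and removing a stalled ID from the active list, as well as where it lands in the waiting list, are assumptions.

```javascript
// In-memory model of the stall check; not the script Bee-Queue runs in Redis.
// state = {waiting: string[], active: string[], stalling: Set<string>}

// A healthy worker calls this during each interval for the job it holds.
function checkIn(state, id) {
  state.stalling.delete(id);
}

// Re-enqueue everything still in the stalling set, then snapshot `active`.
function checkStalledJobs(state) {
  const stalled = [...state.stalling];
  for (const id of stalled) {
    const index = state.active.indexOf(id);
    if (index !== -1) state.active.splice(index, 1); // assumption: leaves active
    state.waiting.push(id); // assumption: exact position in waiting
  }
  state.stalling = new Set(state.active); // snapshot for the next interval
  return stalled;
}
```

In this model a job is only declared stalled if it stays in the stalling set for a whole interval, so a job that started just before a check gets a full interval to check in.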

Bee-Queue requires the user to start the repeated checks on their own because if we did it automatically, every queue instance in the system would be doing the check. Checking from all instances is less efficient and provides weaker guarantees than just checking from one or two. For example, a checkStalledJobs interval of 5000ms running on 10 processes would average one check every 500ms, but would only guarantee a check every 5000ms. Two instances checking every 1000ms would also average one check every 500ms, but would be better distributed across time and would guarantee a check every 1000ms. Though the check is not expensive, and it doesn't hurt to do it extremely often, avoiding needless inefficiency is a main point of this library, so we leave it to the user to control exactly which processes are doing the check and how often.
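The arithmetic in this comparison can be written out directly. checkCadence is a hypothetical helper used only to reproduce the numbers above; it assumes the instances are not synchronized with one another.

```javascript
// Reproduces the arithmetic above: with N unsynchronized instances each
// checking every `intervalMs`, checks happen on average every intervalMs / N,
// but the only guaranteed upper bound between checks is intervalMs itself.
function checkCadence(intervalMs, instances) {
  return {
    averageGapMs: intervalMs / instances,
    guaranteedGapMs: intervalMs,
  };
}

const manySlow = checkCadence(5000, 10); // 10 processes, 5000ms interval
const fewFast = checkCadence(1000, 2); // 2 processes, 1000ms interval
console.log(manySlow.averageGapMs, manySlow.guaranteedGapMs); // 500 5000
console.log(fewFast.averageGapMs, fewFast.guaranteedGapMs); // 500 1000
```

Both setups average one check every 500ms, but the second gives a five times tighter worst-case bound with a fifth of the checking processes.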

Contributing

Pull requests are welcome; just make sure npm test passes. For significant changes, open an issue for discussion first.

Some significant non-features include:

  • Worker tracking: Kue does this.
  • All-workers pause-resume: Bull does this.
  • Job priority: multiple queues get the job done in simple cases, but Kue has first-class support. Bull provides a wrapper around multiple queues.

Some of these could be worthwhile additions; please comment if you're interested in using or helping implement them!

You'll need a local Redis server to run the tests. Note that running the tests may delete some keys in the form of bq:test-*-*:*.
