Node.js background jobs using Redis.

Overview

node-resque: The best background jobs in node.

Distributed delayed jobs in Node.js. Resque is a background job system backed by Redis (version 2.6.0 and up required). It includes priority queues, plugins, locking, delayed jobs, and more! This project is very opinionated, but API-compatible with Resque and Sidekiq (with caveats). We also implement some of the popular Resque plugins, including resque-scheduler and resque-retry.

The full API documentation for this package is automatically generated from the master branch via typedoc and published to https://node-resque.actionherojs.com/

API Docs

You can read the API docs for Node Resque @ node-resque.actionherojs.com. These are generated automatically from the master branch via TypeDoc.

Version Notes

  • The version of redis required is >= 2.6.0, as we use Lua scripting to create custom atomic operations
  • ‼️ Version 6+ of Node Resque uses TypeScript. We will still include JavaScript transpiled code in NPM releases, but they will be generated from the TypeScript source. Functionality between node-resque v5 and v6 should be the same.
  • ‼️ Version 5+ of Node Resque uses async/await. There is no upgrade path from previous versions. Node v8.0.0+ is required.

Usage

I learn best by examples:

import { Worker, Plugins, Scheduler, Queue } from "node-resque";

async function boot() {
  // ////////////////////////
  // SET UP THE CONNECTION //
  // ////////////////////////

  const connectionDetails = {
    pkg: "ioredis",
    host: "127.0.0.1",
    password: null,
    port: 6379,
    database: 0,
    // namespace: 'resque',
    // looping: true,
    // options: {password: 'abc'},
  };

  // ///////////////////////////
  // DEFINE YOUR WORKER TASKS //
  // ///////////////////////////

  let jobsToComplete = 0;

  const jobs = {
    add: {
      plugins: [Plugins.JobLock],
      pluginOptions: {
        JobLock: { reEnqueue: true },
      },
      perform: async (a, b) => {
        await new Promise((resolve) => {
          setTimeout(resolve, 1000);
        });
        jobsToComplete--;
        tryShutdown();

        const answer = a + b;
        return answer;
      },
    },
    subtract: {
      perform: (a, b) => {
        jobsToComplete--;
        tryShutdown();

        const answer = a - b;
        return answer;
      },
    },
  };

  // just a helper for this demo
  async function tryShutdown() {
    if (jobsToComplete === 0) {
      await new Promise((resolve) => {
        setTimeout(resolve, 500);
      });
      await scheduler.end();
      await worker.end();
      process.exit();
    }
  }

  // /////////////////
  // START A WORKER //
  // /////////////////

  const worker = new Worker(
    { connection: connectionDetails, queues: ["math", "otherQueue"] },
    jobs
  );
  await worker.connect();
  worker.start();

  // ////////////////////
  // START A SCHEDULER //
  // ////////////////////

  const scheduler = new Scheduler({ connection: connectionDetails });
  await scheduler.connect();
  scheduler.start();

  // //////////////////////
  // REGISTER FOR EVENTS //
  // //////////////////////

  worker.on("start", () => {
    console.log("worker started");
  });
  worker.on("end", () => {
    console.log("worker ended");
  });
  worker.on("cleaning_worker", (worker, pid) => {
    console.log(`cleaning old worker ${worker}`);
  });
  worker.on("poll", (queue) => {
    console.log(`worker polling ${queue}`);
  });
  worker.on("ping", (time) => {
    console.log(`worker check in @ ${time}`);
  });
  worker.on("job", (queue, job) => {
    console.log(`working job ${queue} ${JSON.stringify(job)}`);
  });
  worker.on("reEnqueue", (queue, job, plugin) => {
    console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`);
  });
  worker.on("success", (queue, job, result, duration) => {
    console.log(
      `job success ${queue} ${JSON.stringify(job)} >> ${result} (${duration}ms)`
    );
  });
  worker.on("failure", (queue, job, failure, duration) => {
    console.log(
      `job failure ${queue} ${JSON.stringify(
        job
      )} >> ${failure} (${duration}ms)`
    );
  });
  worker.on("error", (error, queue, job) => {
    console.log(`error ${queue} ${JSON.stringify(job)}  >> ${error}`);
  });
  worker.on("pause", () => {
    console.log("worker paused");
  });

  scheduler.on("start", () => {
    console.log("scheduler started");
  });
  scheduler.on("end", () => {
    console.log("scheduler ended");
  });
  scheduler.on("poll", () => {
    console.log("scheduler polling");
  });
  scheduler.on("leader", () => {
    console.log("scheduler became leader");
  });
  scheduler.on("error", (error) => {
    console.log(`scheduler error >> ${error}`);
  });
  scheduler.on("cleanStuckWorker", (workerName, errorPayload, delta) => {
    console.log(
      `failing ${workerName} (stuck for ${delta}s) and failing job ${errorPayload}`
    );
  });
  scheduler.on("workingTimestamp", (timestamp) => {
    console.log(`scheduler working timestamp ${timestamp}`);
  });
  scheduler.on("transferredJob", (timestamp, job) => {
    console.log(`scheduler enqueuing job ${timestamp} >> ${JSON.stringify(job)}`);
  });

  // //////////////////////
  // CONNECT TO A QUEUE //
  // //////////////////////

  const queue = new Queue({ connection: connectionDetails }, jobs);
  queue.on("error", function (error) {
    console.log(error);
  });
  await queue.connect();
  await queue.enqueue("math", "add", [1, 2]);
  await queue.enqueue("math", "add", [1, 2]);
  await queue.enqueue("math", "add", [2, 3]);
  await queue.enqueueIn(3000, "math", "subtract", [2, 1]);
  jobsToComplete = 4;
}

boot();

// and when you are done
// await queue.end()
// await scheduler.end()
// await worker.end()

Node Resque Interfaces: Queue, Worker, and Scheduler

There are 3 main classes in node-resque: Queue, Worker, and Scheduler

  • Queue: This is the interface your program uses to interact with resque's queues - to insert jobs, check on the performance of things, and generally administer your background jobs.
  • Worker: This interface is how jobs get processed. Workers are started and then they check for jobs enqueued into various queues and complete them. If there's an error, they write to the error queue.
    • There's a special class called multiWorker in Node Resque which will run many workers at once for you (see below).
  • Scheduler: The scheduler can be thought of as the coordinator for Node Resque. It is primarily in charge of checking when jobs told to run later (with queue.enqueueIn or queue.enqueueAt; see the sketch after this list) should be processed, but it also performs other tasks like checking for 'stuck' workers and general cluster cleanup.
    • You can (and should) run many instances of the scheduler class at once, but only one will be elected to be the 'leader', and actually do work.
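
As a hedged sketch of the delayed-enqueue methods the scheduler watches (assuming the math queue and add job from the example above):

// enqueueIn takes a delay in ms; enqueueAt takes an absolute timestamp in ms
await queue.enqueueIn(5000, "math", "add", [1, 2]); // run ~5 seconds from now
await queue.enqueueAt(Date.now() + 60000, "math", "add", [3, 4]); // run ~1 minute from now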

Configuration Options:

  • new Queue requires only the "queue" variable to be set. You can also pass the jobs hash to it.
  • new Worker has some additional options:
options = {
  looping: true,
  timeout: 5000,
  queues: "*",
  name: os.hostname() + ":" + process.pid,
};

Note that when using the "*" queue:

  • there's a minor performance impact for checking the queues
  • queues are processed in undefined order
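
For illustration, a minimal sketch constructing a worker with these options spelled out (assuming the connectionDetails and jobs objects from the earlier example):

const os = require("os");

const worker = new NodeResque.Worker(
  {
    connection: connectionDetails,
    queues: "*", // poll every queue, in undefined order
    looping: true, // keep polling after each job completes
    timeout: 5000, // ms to sleep between polls when the queues are empty
    name: os.hostname() + ":" + process.pid, // must be unique per worker
  },
  jobs
);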

The configuration hash passed to new NodeResque.Worker, new NodeResque.Scheduler or new NodeResque.Queue can also take a connection option.

const connectionDetails = {
  pkg: "ioredis",
  host: "127.0.0.1",
  password: "",
  port: 6379,
  database: 0,
  namespace: "resque", // also allows an array of strings
};

const worker = new NodeResque.Worker(
  { connection: connectionDetails, queues: "math" },
  jobs
);

worker.on("error", (error) => {
  // handle errors
});

await worker.connect();
worker.start();

// and when you are done
// await worker.end()

You can also pass a redis client directly.

// assume you have already initialized a redis client, e.g. via ioredis
// the "redis" key can be an IORedis.Redis or IORedis.Cluster instance

const redisClient = new Redis();
const connectionDetails = { redis: redisClient };

// or, for a cluster (IORedis.Cluster requires a list of startup nodes)

const redisCluster = new Cluster([{ host: "127.0.0.1", port: 6379 }]);
const connectionDetails = { redis: redisCluster };

const worker = new NodeResque.Worker(
  { connection: connectionDetails, queues: "math" },
  jobs
);

worker.on("error", (error) => {
  // handle errors
});

await worker.connect();
worker.start();

// and when you are done
await worker.end();

Notes

  • Be sure to call await worker.end(), await queue.end() and await scheduler.end() before shutting down your application if you want to properly clear your worker status from resque.
  • When ending your application, be sure to allow your workers time to finish what they are working on
  • This project implements the "scheduler" part of resque-scheduler (the daemon which can promote enqueued delayed jobs into the work queues when it is time), but not the CRON scheduler proxy. To learn more about how to use a CRON-like scheduler, read the Job Schedules section of this document.
  • "Namespace" is a string which is appended to the front of your keys in redis. Normally, it is "resque". This is helpful if you want to store multiple work queues in one redis database. Do not use keyPrefix if you are using the ioredis (default) redis driver in this project (see https://github.com/actionhero/node-resque/issues/245 for more information.)
  • If you are using any plugins which affect beforeEnqueue or afterEnqueue, be sure to pass the jobs argument to the new NodeResque.Queue() constructor.
  • If a job fails, it will be added to a special failed queue. You can then inspect these jobs, write a plugin to manage them, move them back to the normal queues, etc. Failure behavior by default is just to enter the failed queue, but there are many options; check out examples from the ruby ecosystem for inspiration.
  • If you plan to run more than one worker per nodejs process, be sure to name them something distinct. Names must follow the pattern hostname:pid+unique_id. See the example after this list.
  • For the Retry plugin, a success message will be emitted from the worker on each attempt (even if the job fails) except the final retry. The final retry will emit a failure message instead.

const name = os.hostname() + ":" + process.pid + "+" + counter;
const worker = new NodeResque.Worker(
  { connection: connectionDetails, queues: "math", name: name },
  jobs
);

If you want to learn more about running Node Resque with Docker, please view the examples here: https://github.com/actionhero/node-resque/tree/master/examples/docker

Worker#performInline

DO NOT USE THIS IN PRODUCTION. In tests or special cases, you may want to process/work a job in-line. To do so, you can use worker.performInline(jobName, arguments). If you are planning on running a job via #performInline, this worker should not be started, nor should you use event emitters to monitor it. This method will not write to redis at all: no errors are logged, and resque's stats are not modified.
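
A hedged sketch of what this looks like in a test (assuming the jobs hash from the earlier example; the worker is never connected or started):

const worker = new NodeResque.Worker(
  { connection: connectionDetails, queues: ["math"] },
  jobs
);
const result = await worker.performInline("add", [1, 2]);
console.log(result); // 3 (computed in-process; nothing is written to redis)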

Queue Management

const queue = new NodeResque.Queue({ connection: connectionDetails }, jobs);
await queue.connect();

API documentation for the main methods you will be using to enqueue jobs to be worked can be found @ node-resque.actionherojs.com.

Failed Job Management

From time to time, your jobs/workers may fail. Resque workers will move failed jobs to a special failed queue which will store the original arguments of your job, the failing stack trace, and additional metadata.

You can work with these failed jobs with the following methods:

let failedCount = await queue.failedCount()

  • failedCount is the number of jobs in the failed queue

let failedJobs = await queue.failed(start, stop)

  • failedJobs is an array listing the data of the failed jobs. Each element looks like: {"worker": "host:pid", "queue": "test_queue", "payload": {"class":"slowJob", "queue":"test_queue", "args":[null]}, "exception": "TypeError", "error": "MyImport is not a function", "backtrace": [' at Worker.perform (/path/to/worker:111:24)', ' at <anonymous>'], "failed_at": "Fri Dec 12 2014 14:01:16 GMT-0800 (PST)"}
  • To retrieve all failed jobs, use arguments: await queue.failed(0, -1)

Failing a Job

We use a try/catch pattern to catch errors in your jobs. If any job throws an uncaught exception, it will be caught, and the job's payload moved to the error queue for inspection. Do not use domain, process.on("exit"), or any other method of "catching" a process crash.
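
For example, a minimal sketch of a job whose thrown error will land in the failed queue:

const jobs = {
  brokenJob: {
    perform: async () => {
      // any uncaught exception here is caught by the worker and the job's
      // payload is moved to the failed queue
      throw new Error("I broke");
    },
  },
};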

The error payload looks like:

{ worker: 'busted-worker-3',
  queue: 'busted-queue',
  payload: { class: 'busted_job', queue: 'busted-queue', args: [ 1, 2, 3 ] },
  exception: 'ERROR_NAME',
  error: 'I broke',
  failed_at: 'Sun Apr 26 2015 14:00:44 GMT+0100 (BST)' }

await queue.removeFailed(failedJob)

  • the input failedJob is an expanded node object representing the failed job, retrieved via queue.failed

await queue.retryAndRemoveFailed(failedJob)

  • the input failedJob is an expanded node object representing the failed job, retrieved via queue.failed
  • this method will instantly re-enqueue a failed job back to its original queue, and delete the failed entry for that job
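
A short sketch tying these methods together (one possible pattern, not the only one):

const failedCount = await queue.failedCount();
const failedJobs = await queue.failed(0, failedCount - 1);

for (const failedJob of failedJobs) {
  if (failedJob.exception === "TypeError") {
    await queue.removeFailed(failedJob); // discard jobs we know we cannot retry
  } else {
    await queue.retryAndRemoveFailed(failedJob); // re-enqueue everything else
  }
}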

Failed Worker Management

Automatically

By default, the scheduler will check for workers which haven't pinged redis in 60 minutes. If this happens, we will assume the process crashed, and remove it from redis. If this worker was working on a job, we will place that job in the failed queue for later inspection. Every worker runs a timer which updates a key in redis every timeout (default: 5 seconds). If your job is slow but async, there should be no problem. However, if your job consumes 100% of the CPU of the process, this timer might not fire.

To modify the 60-minute check, change stuckWorkerTimeout when configuring your scheduler, e.g.:

const scheduler = new NodeResque.Scheduler({
  stuckWorkerTimeout: 1000 * 60 * 60, // 1 hour, in ms
  connection: connectionDetails,
});

Set your scheduler's stuckWorkerTimeout = false to disable this behavior.

const scheduler = new NodeResque.Scheduler({
  stuckWorkerTimeout: false, // will not fail jobs which haven't pinged redis
  connection: connectionDetails,
});

Manually

Sometimes a worker crashes in a severe way, and it doesn't get the time/chance to notify redis that it is leaving the pool (this happens all the time on PaaS providers like Heroku). When this happens, you will not only need to extract the job from the now-zombie worker's "working on" status, but also remove the stuck worker. To aid you in these edge cases, await queue.cleanOldWorkers(age) is available.

Because there are no 'heartbeats' in resque, it is impossible for the application to know whether a worker has been working on a long job or is dead. You are required to provide an "age" for how long a worker has been "working"; all workers older than that age will be removed, and the jobs they were working on moved to the error queue (where you can then use queue.retryAndRemoveFailed to re-enqueue them).

If you know the name of a worker that should be removed, you can also call await queue.forceCleanWorker(workerName) directly, and that will also remove the worker and move any job it was working on into the error queue. This method will still proceed for workers which are only partially in redis, indicating a previous connection failure. In this case, the job which the worker was working on is irrecoverably lost.
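
A hedged sketch of the manual cleanup path, assuming an hour is long enough to declare a worker dead (and that the return value is an object describing any removed workers):

// remove any worker that has been "working" for more than an hour;
// the jobs those workers held are moved to the failed queue
const cleanedWorkers = await queue.cleanOldWorkers(1000 * 60 * 60);
console.log(cleanedWorkers); // details of the workers that were removed, if any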

Job Schedules

You may want to use node-resque to schedule jobs every minute/hour/day, like a distributed CRON system. There are a number of excellent node packages to help you with this, like node-schedule and node-cron. Node Resque makes it possible for you to use the package of your choice to schedule jobs.

Assuming you are running node-resque across multiple machines, you will need to ensure that only one of your processes is actually scheduling the jobs. To help you with this, you can inspect which of the scheduler processes is currently acting as leader, and flag only the leader scheduler process to run the schedule. A full example can be found at /examples/scheduledJobs.ts, but the relevant section is:

const NodeResque = require("node-resque");
const schedule = require("node-schedule");
const queue = new NodeResque.Queue({ connection: connectionDetails }, jobs);
const scheduler = new NodeResque.Scheduler({ connection: connectionDetails });
await scheduler.connect();
scheduler.start();

schedule.scheduleJob("10,20,30,40,50 * * * * *", async () => {
  // do this job every 10 seconds, CRON style
  // we want to ensure that only one instance of this job is scheduled in our environment at once,
  // no matter how many schedulers we have running
  if (scheduler.leader) {
    console.log(">>> enqueuing a job");
    await queue.enqueue("time", "ticktock", new Date().toString());
  }
});

Plugins

Just like ruby's resque, you can write worker plugins. They look like this. The 4 hooks you have are beforeEnqueue, afterEnqueue, beforePerform, and afterPerform. Plugins are classes which extend NodeResque.Plugin.

const { Plugin } = require("node-resque");

class MyPlugin extends Plugin {
  constructor(...args) {
    // @ts-ignore
    super(...args);
    this.name = "MyPlugin";
  }

  beforeEnqueue() {
    // console.log("** beforeEnqueue")
    return true; // should the job be enqueued?
  }

  afterEnqueue() {
    // console.log("** afterEnqueue")
  }

  beforePerform() {
    // console.log("** beforePerform")
    return true; // should the job be run?
  }

  afterPerform() {
    // console.log("** afterPerform")
  }
}

And then your plugin can be invoked within a job like this:

const jobs = {
  add: {
    plugins: [MyPlugin],
    pluginOptions: {
      MyPlugin: { thing: "stuff" },
    },
    perform: (a, b) => {
      let answer = a + b;
      return answer;
    },
  },
};

notes

  • You need to return true or false on the before hooks. true indicates that the action should continue, and false prevents it. This is called toRun.
  • If you are writing a plugin to deal with errors which may occur during your resque job, you can inspect and modify this.worker.error in your plugin. If this.worker.error is null, no error will be logged in the resque error queue.
  • There are a few included plugins, all in the src/plugins/* directory. You can write your own and include it like this:
const jobs = {
  add: {
    plugins: [require("./MyPlugin").MyPlugin],
    pluginOptions: {
      MyPlugin: { thing: "stuff" },
    },
    perform: (a, b) => {
      let answer = a + b;
      return answer;
    },
  },
};

The plugins which are included with this package are:

  • DelayQueueLock
    • If a job with the same name, queue, and args is already in the delayed queue(s), do not enqueue it again
  • JobLock
    • If a job with the same name, queue, and args is already running, put this job back in the queue and try later
  • QueueLock
    • If a job with the same name, queue, and args is already in the queue, do not enqueue it again
  • Retry
    • If a job fails, retry it N times before finally placing it into the failed queue
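
As an illustration, a hedged sketch of attaching the included Retry plugin to a job (the option names retryLimit and retryDelay are assumptions here; check src/plugins/Retry for the authoritative list, and sendTheEmailSomehow is a hypothetical helper):

const { Plugins } = require("node-resque");

const jobs = {
  sendEmail: {
    plugins: [Plugins.Retry],
    pluginOptions: {
      Retry: { retryLimit: 3, retryDelay: 1000 }, // assumed: retry up to 3 times, 1s apart
    },
    perform: async (address) => {
      // a thrown error here triggers a retry until retryLimit is exhausted,
      // after which the job lands in the failed queue
      await sendTheEmailSomehow(address); // hypothetical helper
    },
  },
};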

Multi Worker

node-resque provides a wrapper around the Worker class which will auto-scale the number of resque workers. This will process more than one job at a time as long as there is idle CPU within the event loop. For example, if you have a slow job that sends email via SMTP (with low overhead), we can process many jobs at a time, but if you have a math-heavy operation, we'll stick to 1. The MultiWorker handles this by spawning more and more node-resque workers and managing the pool.

const NodeResque = require("node-resque");

const connectionDetails = {
  pkg: "ioredis",
  host: "127.0.0.1",
  password: "",
};

const multiWorker = new NodeResque.MultiWorker(
  {
    connection: connectionDetails,
    queues: ["slowQueue"],
    minTaskProcessors: 1,
    maxTaskProcessors: 100,
    checkTimeout: 1000,
    maxEventLoopDelay: 10,
  },
  jobs
);

// normal worker emitters
multiWorker.on("start", (workerId) => {
  console.log("worker[" + workerId + "] started");
});
multiWorker.on("end", (workerId) => {
  console.log("worker[" + workerId + "] ended");
});
multiWorker.on("cleaning_worker", (workerId, worker, pid) => {
  console.log("cleaning old worker " + worker);
});
multiWorker.on("poll", (workerId, queue) => {
  console.log("worker[" + workerId + "] polling " + queue);
});
multiWorker.on("ping", (workerId, time) => {
  console.log("worker[" + workerId + "] check in @ " + time);
});
multiWorker.on("job", (workerId, queue, job) => {
  console.log(
    "worker[" + workerId + "] working job " + queue + " " + JSON.stringify(job)
  );
});
multiWorker.on("reEnqueue", (workerId, queue, job, plugin) => {
  console.log(
    "worker[" +
      workerId +
      "] reEnqueue job (" +
      plugin +
      ") " +
      queue +
      " " +
      JSON.stringify(job)
  );
});
multiWorker.on("success", (workerId, queue, job, result) => {
  console.log(
    "worker[" +
      workerId +
      "] job success " +
      queue +
      " " +
      JSON.stringify(job) +
      " >> " +
      result
  );
});
multiWorker.on("failure", (workerId, queue, job, failure) => {
  console.log(
    "worker[" +
      workerId +
      "] job failure " +
      queue +
      " " +
      JSON.stringify(job) +
      " >> " +
      failure
  );
});
multiWorker.on("error", (workerId, queue, job, error) => {
  console.log(
    "worker[" +
      workerId +
      "] error " +
      queue +
      " " +
      JSON.stringify(job) +
      " >> " +
      error
  );
});
multiWorker.on("pause", (workerId) => {
  console.log("worker[" + workerId + "] paused");
});

// multiWorker emitters
multiWorker.on("internalError", (error) => {
  console.log(error);
});
multiWorker.on("multiWorkerAction", (verb, delay) => {
  console.log(
    "*** checked for worker status: " +
      verb +
      " (event loop delay: " +
      delay +
      "ms)"
  );
});

multiWorker.start();

MultiWorker Options

The Options available for the multiWorker are:

  • connection: The redis configuration options (same as worker)
  • queues: Array of ordered queue names (or *) (same as worker)
  • minTaskProcessors: The minimum number of workers to spawn under this multiWorker, even if there is no work to do. You need at least one, or no work will ever be processed or checked.
  • maxTaskProcessors: The maximum number of workers to spawn under this multiWorker, even if the queues are long and there is CPU available to this node process (i.e., the event loop isn't entirely blocked).
  • checkTimeout: How often to check whether the event loop is blocked (in ms), for adding or removing multiWorker children.
  • maxEventLoopDelay: How long the event loop has to be delayed before considering it blocked (in ms).
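
A hedged sketch of stopping a multiWorker cleanly on shutdown (the signal-handling pattern is illustrative, not prescribed by the package):

process.once("SIGTERM", async () => {
  // end() stops the pool and waits for each child worker to finish
  // the job it is currently working on
  await multiWorker.end();
  process.exit();
});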

Presentation

This package was featured heavily in this presentation I gave about background jobs + node.js. It contains more examples!
