Better Queue for NodeJS

Overview

Better Queue - Powerful flow control

Super simple to use

Better Queue is designed to be simple to set up but still let you do complex things.

  • Persistent (and extendable) storage
  • Batched processing
  • Prioritize tasks
  • Merge/filter tasks
  • Progress events (with ETA!)
  • Fine-tuned timing controls
  • Retry on fail
  • Concurrent batch processing
  • Task statistics (average completion time, failure rate and peak queue size)
  • ... and more!

Install (via npm)

npm install --save better-queue

Quick Example

var Queue = require('better-queue');

var q = new Queue(function (input, cb) {
  
  // Some processing here ...

  cb(null, result);
})

q.push(1)
q.push({ x: 1 })


You will be able to combine any (and all) of these options for your queue!

Queuing

It's very easy to push tasks into the queue.

var q = new Queue(fn);
q.push(1);
q.push({ x: 1, y: 2 });
q.push("hello");

You can also include a callback as a second parameter to the push function, which would be called when that task is done. For example:

var q = new Queue(fn);
q.push(1, function (err, result) {
  // Results from the task!
});

You can also listen to events on the results of the push call.

var q = new Queue(fn);
q.push(1)
  .on('finish', function (result) {
    // Task succeeded with {result}!
  })
  .on('failed', function (err) {
    // Task failed!
  })

Alternatively, you can subscribe to the queue's events.

var q = new Queue(fn);
q.on('task_finish', function (taskId, result, stats) {
  // taskId = 1, result: 3, stats = { elapsed: <time taken> }
  // taskId = 2, result: 5, stats = { elapsed: <time taken> }
})
q.on('task_failed', function (taskId, err, stats) {
  // Handle error, stats = { elapsed: <time taken> }
})
q.on('empty', function (){})
q.on('drain', function (){})
q.push({ id: 1, a: 1, b: 2 });
q.push({ id: 2, a: 2, b: 3 });

empty event fires when all of the tasks have been pulled off of the queue (there may still be tasks running!)

drain event fires when there are no more tasks on the queue and when no more tasks are running.
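
If you need to know when all queued work is done (for example, before exiting a script), one simple approach is to wrap the drain event in a Promise. A minimal sketch, using only the events documented above:

var q = new Queue(fn);

function whenDrained(queue) {
  return new Promise(function (resolve) {
    queue.on('drain', resolve); // resolving more than once is harmless for a Promise
  });
}

q.push(1);
q.push(2);

whenDrained(q).then(function () {
  console.log('All tasks finished and nothing left on the queue');
});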

You can control how many tasks process at the same time.

var q = new Queue(fn, { concurrent: 3 })

Now the queue will allow 3 tasks running at the same time. (By default, we handle tasks one at a time.)

You can also turn the queue into a stack by turning on filo.

var q = new Queue(fn, { filo: true })

Now the most recently pushed items will be handled first.


Task Management

Task ID

Tasks can be given an ID to help identify and track them as they go through the queue.

By default, we look for a string task.id property on the task; otherwise, we generate a random ID for it.

You can pass in an id property to options to change this behaviour. Here are some examples of how:

var q = new Queue(fn, {
  id: 'id',   // Default: task's `id` property
  id: 'name', // task's `name` property
  id: function (task, cb) {
    // Compute the ID
    cb(null, 'computed_id');
  }
})

One thing you can do with Task ID is merge tasks:

var counter = new Queue(function (task, cb) {
  console.log("I have %d %ss.", task.count, task.id);
  cb();
}, {
  merge: function (oldTask, newTask, cb) {
    oldTask.count += newTask.count;
    cb(null, oldTask);
  }
})
counter.push({ id: 'apple', count: 2 });
counter.push({ id: 'apple', count: 1 });
counter.push({ id: 'orange', count: 1 });
counter.push({ id: 'orange', count: 1 });
// Prints out:
//   I have 3 apples.
//   I have 2 oranges.

By default, if tasks have the same ID they replace the previous task.

var counter = new Queue(function (task, cb) {
  console.log("I have %d %ss.", task.count, task.id);
  cb();
})
counter.push({ id: 'apple', count: 1 });
counter.push({ id: 'apple', count: 3 });
counter.push({ id: 'orange', count: 1 });
counter.push({ id: 'orange', count: 2 });
// Prints out:
//   I have 3 apples.
//   I have 2 oranges.

You can also use the task ID when subscribing to events from Queue.

var counter = new Queue(fn)
counter.on('task_finish', function (taskId, result) {
  // taskId will be 'jim' or 'bob'
})
counter.push({ id: 'jim', count: 2 });
counter.push({ id: 'bob', count: 1 });

Batch Processing

Your processing function can also be modified to handle multiple tasks at the same time. For example:

var ages = new Queue(function (batch, cb) {
  // Batch 1:
  //   [ { id: 'steve', age: 21 },
  //     { id: 'john', age: 34 },
  //     { id: 'joe', age: 18 } ]
  // Batch 2:
  //   [ { id: 'mary', age: 23 } ]
  cb();
}, { batchSize: 3 })
ages.push({ id: 'steve', age: 21 });
ages.push({ id: 'john', age: 34 });
ages.push({ id: 'joe', age: 18 });
ages.push({ id: 'mary', age: 23 });

Note how the queue will only handle at most 3 items at a time.

Below is another example of a batched call with numbers.

var ages = new Queue(function (batch, cb) {
  // batch = [1,2,3]
  cb();
}, { batchSize: 3 })
ages.push(1);
ages.push(2);
ages.push(3);

Filtering, Validation and Priority

You can also format (and filter) the input that arrives from a push before it gets processed by the queue by passing in a filter function.

var greeter = new Queue(function (name, cb) {
  console.log("Hello, %s!", name)
  cb();
}, {
  filter: function (input, cb) {
    if (input === 'Bob') {
      return cb('not_allowed');
    }
    return cb(null, input.toUpperCase())
  }
});
greeter.push('anna'); // Prints 'Hello, ANNA!'

This is particularly useful when you need to do some pre-processing, input validation, a database lookup, etc. before the task is loaded onto the queue.
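
For example, here is a rough sketch of a filter that does an asynchronous lookup before accepting a task. lookupUser is a hypothetical database helper, not part of better-queue:

var notifier = new Queue(function (user, cb) {
  // Send the notification to the looked-up user ...
  cb();
}, {
  filter: function (username, cb) {
    // lookupUser is a hypothetical async helper (e.g. a database call)
    lookupUser(username, function (err, user) {
      if (err) return cb(err);              // lookup failed: reject the task
      if (!user) return cb('unknown_user'); // validation failed: reject the task
      cb(null, user);                       // replace the input with the full user record
    });
  }
});
notifier.push('anna');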

You can also define a priority function to control which tasks get processed first.

var greeter = new Queue(function (name, cb) {
  console.log("Greetings, %s.", name);
  cb();
}, {
  priority: function (name, cb) {
    if (name === "Steve") return cb(null, 10);
    if (name === "Mary") return cb(null, 5);
    if (name === "Joe") return cb(null, 5);
    cb(null, 1);
  }
})
greeter.push("Steve");
greeter.push("John");
greeter.push("Joe");
greeter.push("Mary");

// Prints out:
//   Greetings, Steve.
//   Greetings, Joe.
//   Greetings, Mary.
//   Greetings, John.

If filo is set to true in the example above, then Joe and Mary would swap order.


Queue Management

Retry

You can set failed tasks to retry up to maxRetries times. By default, tasks fail once and are not retried. Optionally, you can set a retryDelay to wait a little while before retrying.

var q = new Queue(fn, { maxRetries: 10, retryDelay: 1000 })
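
As a rough sketch of how retries behave, consider a flaky operation. fetchPage here is a hypothetical helper that sometimes fails:

var q = new Queue(function (url, cb) {
  // fetchPage is a hypothetical operation that fails intermittently
  fetchPage(url, function (err, body) {
    if (err) return cb(err); // failing the task triggers a retry (up to maxRetries)
    cb(null, body);
  });
}, { maxRetries: 10, retryDelay: 1000 });

q.push('https://example.com', function (err, body) {
  // err is only set once all retries have been exhausted
});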

Timing

You can configure the queue to have a maxTimeout.

var q = new Queue(function (name, cb) {
  someLongTask(function () {
    cb();
  })
}, { maxTimeout: 2000 })

After 2 seconds, the task will fail with a timeout error instead of waiting for the callback to be called.
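
A timed-out task is reported through the normal failure events, so you can listen for it like any other error. A minimal sketch (the exact error value passed to the handler may vary):

var q = new Queue(function (name, cb) {
  someLongTask(function () {
    cb();
  });
}, { maxTimeout: 2000 });

q.on('task_failed', function (taskId, err) {
  // err indicates a timeout when the task ran longer than maxTimeout
});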

You can also delay the queue before it starts processing; this is the behaviour of a timed cargo.

var q = new Queue(function (batch, cb) {
  // Batch [1,2] will process after 2s.
  cb();
}, { batchSize: 5, batchDelay: 2000 })
q.push(1);
setTimeout(function () {
  q.push(2);
}, 1000)

You can also set afterProcessDelay, which will delay processing between tasks.

var q = new Queue(function (task, cb) {
  cb(); // Will wait 1 second before taking the next task
}, { afterProcessDelay: 1000 })
q.push(1);
q.push(2);

In addition to batchDelay, you can set a batchDelayTimeout (in milliseconds), which fires off the batch if no new task has been pushed to the queue within that time.

var q = new Queue(fn, {
  batchSize: 50,
  batchDelay: 5000,
  batchDelayTimeout: 1000
})
q.push(1);
q.push(2);

In the example above, the queue waits up to 5 seconds for 50 items to accumulate, but fires off the batch early if no new task is added within 1 second.

Precondition

You can define a precondition function that checks whether it's OK to process the next batch. If the precondition fails, the queue keeps calling this function (every preconditionRetryTimeout milliseconds) until it passes.

var q = new Queue(function (batch, cb) {

  // Do something that requires internet

}, {
  precondition: function (cb) {
    isOnline(function (err, ok) {
      if (ok) {
        cb(null, true);
      } else {
        cb(null, false);
      }
    })
  },
  preconditionRetryTimeout: 10*1000 // If we go offline, retry every 10s
})

Pause/Resume

You can control tasks while they are running.

If your processing function returns an object with cancel, pause and resume functions, the queue will be able to cancel, pause or resume that task while it's in progress.

var uploader = new Queue(function (file, cb) {
  
  var worker = someLongProcess(file);

  return {
    cancel: function () {
      // Cancel the file upload
    },
    pause: function () {
      // Pause the file upload
    },
    resume: function () {
      // Resume the file upload
    }
  }
})
uploader.push('/path/to/file.pdf');
uploader.pause();
uploader.resume();

Cancel/Abort

You can also set cancelIfRunning to true. This will cancel a running task if a task with the same ID is pushed onto the queue.

var uploader = new Queue(function (file, cb) {
  var request = someLongProcess(file);
  return {
    cancel: function () {
      request.cancel();
    }
  }
}, {
  id: 'path',
  cancelIfRunning: true
})
uploader.push({ path: '/path/to/file.pdf' });
// ... Some time later
uploader.push({ path: '/path/to/file.pdf' });

In the example above, the first upload process is cancelled and the task is requeued.

You can also call .cancel(taskId) to cancel and unqueue the task.

uploader.cancel('/path/to/file.pdf');

Note that if you enable this option in batch mode, it will cancel the entire batch!


Advanced

Updating Task Status

The process function is run in a context that provides the progressBatch, finishBatch and failedBatch functions.

The example below illustrates how you can use these:

var uploader = new Queue(function (file, cb) {
  this.failedBatch('some_error')
  this.finishBatch(result)
  this.progressBatch(bytesUploaded, totalBytes, "uploading")
});
uploader.on('task_finish', function (taskId, result) {
  // Handle finished result
})
uploader.on('task_failed', function (taskId, errorMessage) {
  // Handle error
})
uploader.on('task_progress', function (taskId, completed, total) {
  // Handle task progress
})

uploader.push('/some/file.jpg')
  .on('finish', function (result) {
    // Handle upload result
  })
  .on('failed', function (err) {
    // Handle error
  })
  .on('progress', function (progress) {
    // progress.eta - human readable string estimating time remaining
    // progress.pct - % complete (out of 100)
    // progress.complete - # completed so far
    // progress.total - # for completion
    // progress.message - status message
  })

Update Status in Batch mode (batchSize > 1)

You can also complete individual tasks in a batch by using failedTask and finishTask functions.

var uploader = new Queue(function (files, cb) {
  this.failedTask(0, 'some_error')         // files[0] has failed with 'some_error'
  this.finishTask(1, result)               // files[1] has finished with {result}
  this.progressTask(2, 30, 100, "copying") // files[2] is 30% done, currently copying
}, { batchSize: 3 });
uploader.push('/some/file1.jpg')
uploader.push('/some/file2.jpg')
uploader.push('/some/file3.jpg')

Note that if you use *-Task and *-Batch functions together, the batch functions will only apply to the tasks that have not yet finished/failed.
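
For instance, here is a small sketch where one task in the batch is failed individually and the remaining tasks are finished together:

var uploader = new Queue(function (files, cb) {
  this.failedTask(0, 'bad_file'); // files[0] fails on its own
  this.finishBatch('uploaded');   // applies only to files[1] and files[2], which are still pending
}, { batchSize: 3 });
uploader.push('/some/file1.jpg');
uploader.push('/some/file2.jpg');
uploader.push('/some/file3.jpg');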

Queue Statistics

You can inspect the queue at any given time to see its statistics: the peak number of items queued, average processing time, success rate and total items processed.

var q = new Queue(fn);
var stats = q.getStats();

// stats.total = Total tasks processed
// stats.average = Average process time
// stats.successRate = % success (between 0 and 1)
// stats.peak = Most tasks queued at any given point in time
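
For example, a small sketch that logs a summary using those stats once the queue drains:

var q = new Queue(fn);
q.on('drain', function () {
  var stats = q.getStats();
  console.log('%d tasks processed, %d%% success, average %dms',
    stats.total, Math.round(stats.successRate * 100), stats.average);
});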


Storage

Using a store

For your convenience, we have added compatibility for a few storage options.

By default, we use an in-memory store that doesn't persist. You can switch to one of the other built-in stores by passing in the store option.

Built-in store

Currently, we support the following stores:

  • memory
  • sql (SQLite, PostgreSQL)

SQLite store (npm install sqlite3)

var q = new Queue(fn, {
  store: {
    type: 'sql',
    dialect: 'sqlite',
    path: '/path/to/sqlite/file'
  }
});

PostgreSQL store (npm install pg)

var q = new Queue(fn, {
  store: {
    type: 'sql',
    dialect: 'postgres',
    host: 'localhost',
    port: 5432,
    username: 'username',
    password: 'password',
    dbname: 'template1',
    tableName: 'tasks'
  }
});

Please help us add support for more stores; contributions are welcome!

Custom Store

Writing your own store is very easy; you just need to implement a few functions and then pass your store to queue.use(store).

var q = new Queue(fn, { store: myStore });

or

q.use(myStore);

Your store needs the following functions:

q.use({
  connect: function (cb) {
    // Connect to your db
  },
  getTask: function (taskId, cb) {
    // Retrieves a task
  },
  putTask: function (taskId, task, priority, cb) {
    // Save task with given priority
  },
  takeFirstN: function (n, cb) {
    // Removes the first N items (sorted by priority and age)
  },
  takeLastN: function (n, cb) {
    // Removes the last N items (sorted by priority and recency)
  }
})
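
For illustration only, here is a naive in-memory sketch of those functions. The callback result shapes below are assumptions, and a production store handles more details (locking, running tasks, deletion), so treat this as scaffolding rather than a drop-in store; the built-in better-queue-memory module is the reference implementation.

var naiveStore = {
  tasks: {},
  connect: function (cb) {
    // Nothing to connect to in memory; report how many tasks are stored (assumed shape)
    cb(null, Object.keys(this.tasks).length);
  },
  getTask: function (taskId, cb) {
    var entry = this.tasks[taskId];
    cb(null, entry && entry.task);
  },
  putTask: function (taskId, task, priority, cb) {
    this.tasks[taskId] = { task: task, priority: priority || 1, added: Date.now() };
    cb(null);
  },
  takeFirstN: function (n, cb) {
    cb(null, this._take(this._sortedIds().slice(0, n)));
  },
  takeLastN: function (n, cb) {
    cb(null, this._take(this._sortedIds().slice(-n).reverse()));
  },
  // Helpers below are not part of the store interface
  _sortedIds: function () {
    var tasks = this.tasks;
    return Object.keys(tasks).sort(function (a, b) {
      // Higher priority first, then oldest first
      return (tasks[b].priority - tasks[a].priority) || (tasks[a].added - tasks[b].added);
    });
  },
  _take: function (ids) {
    var taken = {};
    for (var i = 0; i < ids.length; i++) {
      taken[ids[i]] = this.tasks[ids[i]].task;
      delete this.tasks[ids[i]];
    }
    return taken;
  }
};

q.use(naiveStore);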


Using with Webpack

Better Queue can be used in the browser with the default in-memory store. However, you have to create the store yourself and pass it to the queue's constructor.

import Queue = require('better-queue')
import MemoryStore = require('better-queue-memory')

var q = new Queue(function (input, cb) {
  
  // Some processing here ...

  cb(null, result);
},
{
    store: new MemoryStore(),
  }
)

TypeScript Support

Better Queue can be used in TypeScript projects by installing type definitions from the Definitely Typed repository:

npm install --save @types/better-queue

Afterwards, you can simply import the library:

import Queue = require('better-queue')

const q: Queue = new Queue(() => {});


Full Documentation

new Queue(process, options)

The first argument can be either the process function or the options object.

A process function is required; all other options are optional.

  • process - function to process tasks. Will be run with either a single task (if batchSize is 1) or an array of at most batchSize items. The second argument is a callback cb(error, result) that must be called regardless of success or failure.

  • filter - function to filter input. Will be run with input, i.e. whatever was passed to q.push(). If you define this function, you are expected to call the callback cb(error, task). If an error is sent in the callback, the input is rejected.
  • merge - function to merge tasks with the same task ID. Will be run with oldTask, newTask and a callback cb(error, mergedTask). If you define this function then the callback is expected to be called.
  • priority - function to determine the priority of a task. Takes in a task and returns callback cb(error, priority).
  • precondition - function that runs a check before processing to ensure it can process the next batch. Takes a callback cb(error, passOrFail).

  • id - The property to use as the task ID. This can be a string or a function (for more complicated IDs). The function receives (task, cb) and must call the callback with cb(error, taskId).
  • cancelIfRunning - If true, when a task with the same ID is running, its worker will be cancelled. Defaults to false.
  • autoResume - If true, tasks in the store will automatically start processing once the queue connects to the store. Defaults to true.
  • failTaskOnProcessException - If true, when the process function throws an error the batch fails. Defaults to true.
  • filo - If true, tasks will be completed in a first in, last out order. Defaults to false.
  • batchSize - The number of tasks (at most) that can be processed at once. Defaults to 1.
  • batchDelay - Number of milliseconds to delay before starting to pop items off the queue. Defaults to 0.
  • batchDelayTimeout - Number of milliseconds to wait for a new task to arrive before firing off the batch. Defaults to Infinity.
  • concurrent - Number of workers that can be running at any given time. Defaults to 1.
  • maxTimeout - Number of milliseconds before a task is considered timed out. Defaults to Infinity.
  • afterProcessDelay - Number of milliseconds to delay before processing the next batch of items. Defaults to 1.
  • maxRetries - Maximum number of attempts to retry on a failed task. Defaults to 0.
  • retryDelay - Number of milliseconds before retrying. Defaults to 0.
  • storeMaxRetries - Maximum number of attempts before giving up on the store. Defaults to Infinity.
  • storeRetryTimeout - Number of milliseconds to delay before trying to connect to the store again. Defaults to 1000.
  • preconditionRetryTimeout - Number of milliseconds to delay before checking the precondition function again. Defaults to 1000.
  • store - Represents the options for the initial store. Can be an object containing { type: storeType, ... options ... }, or the store instance itself.
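
As a quick illustration, here is a queue combining several of these options (the values are arbitrary):

var q = new Queue(fn, {
  concurrent: 2,      // up to 2 workers running at once
  batchSize: 10,      // each worker receives up to 10 tasks
  maxRetries: 3,      // retry a failed task up to 3 times
  retryDelay: 500,    // wait 500ms before each retry
  maxTimeout: 10000,  // consider a task timed out after 10s
  store: {            // persist tasks in SQLite
    type: 'sql',
    dialect: 'sqlite',
    path: '/path/to/sqlite/file'
  }
});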

Methods on Queue

  • push(task, cb) - Push a task onto the queue, with an optional callback when it completes. Returns a Ticket object.
  • pause() - Pauses the queue: tries to pause running tasks and prevents tasks from getting processed until resumed.
  • resume() - Resumes the queue and its running tasks.
  • destroy(cb) - Destroys the queue: closes the store, tries to clean up.
  • use(store) - Sets the queue to read from and write to the given store.
  • getStats() - Gets the aggregate stats for the queue. Returns an object with properties successRate, peak, total and average, representing the success rate on tasks, peak number of items queued, total number of items processed and average processing time, respectively.
  • resetStats() - Resets all of the aggregate stats.

Events on Queue

  • task_queued - When a task is queued
  • task_accepted - When a task is accepted
  • task_started - When a task begins processing
  • task_finish - When a task is completed
  • task_failed - When a task fails
  • task_progress - When a task progress changes
  • batch_finish - When a batch of tasks (or worker) completes
  • batch_failed - When a batch of tasks (or worker) fails
  • batch_progress - When a batch of tasks (or worker) updates its progress

Events on Ticket

  • accepted - When the corresponding task is accepted (has passed filter)
  • queued - When the corresponding task is queued (and saved into the store)
  • started - When the corresponding task is started
  • progress - When the corresponding task progress changes
  • finish - When the corresponding task completes
  • failed - When the corresponding task fails

Comments
  • Safe to replace setImmediate with setTimeout?


    I am trying to use a webpacked version of better-queue in a Firefox extension, and it looks like setImmediate doesn't work in Firefox. Is it safe to replace all calls to setImmediate with setTimeout? I've tried reading the docs on the JS event loop, but it's not clear to me what the potential downsides are of using setTimeout for better-queue.

    opened by retorquere 17
  • Custom store for Nedb


    I needed a queue with nedb as the datastore, so I created a custom store for better-queue.

    Does this implementation look correct? I based it mostly on the SqlServerAdapter.

    https://gist.github.com/dustintownsend/b701e5ffc620bb0c8bbde0d3f0bceefd

    Edit: I forgot to note that I didn't fully implement priorities.

    opened by dustintownsend 11
  • concurrent problem, wrong order


    Hi, I have a problem with concurrent. If I set concurrent to 8 (for example) and add many tasks, the tasks are done in the wrong order: the first 7 run in reverse order, and the rest run normally. There is no easy explanation of how it behaves when I push 100 tasks, for example. Sometimes just the first 8 are in the wrong order, sometimes the first 16, but then I got something like this: 7 6 5 4 3 2 1 11 10 8 9 12 13 14 15 16. The filo option is set to false, so first in, first out.

    opened by GonzoBe 9
  • Exiting with unfinished jobs


    I wrote an app which parses a large XML file with around 17M items of interest. These are added to a queue and, using batch processing, written to a web service. When parsing is complete, the app exits (status code 0) with about half of the items left unwritten. Any idea what might be going on?

    opened by kldavis4 8
  • Blocking HTTP Request


    The API receives 3 MB of logs pushed to the push URI. This data is added to the queue, but does push wait until the task is finished? Because my API response never arrives.

    
        const APP_LOG = req.params.app.toLowerCase();
        let BODY: String = req.body;
        if (BODY.endsWith(";;")) {
            BODY = BODY.trim();
            BODY = BODY.substring(0, BODY.length - 2);
        }
        let lines = BODY.split(";;");
    
        que.push({
            app: APP_LOG,
            lines
        });
    

    And for my queue:

    let que = new BetterQueue(async function (input, cb) {
        console.time(`Queue`);
    // Here I put the logs in the DB
        console.timeEnd(`Queue`);
        cb();
    });
    
    opened by gitdevries 6
  • Store does not seem to work


    Hi, I have tried using the SQLite store, but the tasks do not seem to be persisted. When I push a new task to a queue and the task is failing, I expect to find it in the database, at least between task executions, but that's not the case; the table in the db is always empty. I tried to understand what's going on and found out that the release-lock function is called after every run, and in the store it's implemented to delete the task from the db. Is that correct? Why is the task not held in the database between executions so that it survives app restarts?

    The behavior I expect is actually that a task remains in the db until it finishes or fails after max retries.

    opened by hbj 6
  • Oversight on task_progress event?


    Hello, I am recently working on a project using better-queue. I presume that the progressTask function should be able to work as a partial (say, inserted 10/100 records) and should be able to display that a task / ticket is 10% complete. However, utilizing either progressTask or progressBatch causes the ticket to stop being tracked / updated - I believe the below code is why:

    worker.on('task_progress', function (id, progress) {
        var taskId = taskIds[id];
        if (tickets[taskId]) {
          tickets[taskId].progress(progress);
          delete tickets[taskId];
        }
        self.emit('task_progress', taskId, progress);
      })
    

    It looks like this is almost identical behavior to the worker.on('task_finish') listener, and may have erroneously been copied over:

    worker.on('task_finish', function (id, result) {
        var taskId = taskIds[id];
        var stats = updateStatsForEndedTask(taskId);
        if (tickets[taskId]) {
          tickets[taskId].finish(result);
          delete tickets[taskId];
        }
        self.emit('task_finish', taskId, result, stats);
      })
    

    For my purposes, simply commenting out / removing the line delete tickets[taskId] in the task_progress listener above resolves this issue, and I do not see it breaking expected behavior. I just want to make sure that this is not the intended behavior, or if this conflicts with anything in any way.

    opened by sbasher314 5
  • async/await


    function sleep(ms) {
      return new Promise(resolve => setTimeout(resolve, ms))
    }
    
    const q = new Queue(async (input, cb) => {
      console.log(input)
      await sleep(1000)
      cb()
    }, {
      concurrent: 3
    })
    

    Right now I still need to call the callback inside async functions. Could you automatically await asynchronous functions, so that callback can be removed?

    opened by herenickname 5
  • Types Definition


    Feature Request: better-queue lacks a type definition, either as part of this project or as part of DefinitelyTyped.

    Even without types, better-queue works fine with TypeScript. Types are simply a "nice to have."

    Steps to Replicate index.ts

    import * as Queue from 'better-queue';
    
    let myQueue : Queue = new Queue(fn);
    

    tsc build error -> Could not find a declaration file for module 'better-queue' '/queue.js' implicitly has an 'any' type.

    Work Around Import with: let Queue = require('better-queue');

    opened by jbethke 5
  • Document queue.length


    When using better-queue, I needed to check if there are remaining items in the queue (I couldn't rely on the drain event). I considered tracking pushed and finished/failed tasks, but I discovered the length property in the library's source. It works fine; however, it is not documented.

    I'd love to see the property nailed down in the readme, mainly to make it a breaking change when it'd be renamed or removed. (To be in line with the rest of the API, an accessor method like getRemainingCount() would actually be more fitting. However, the length property is already there. Also, even more importantly, it is a very common JavaScript convention.)

    I would be happy to attach a PR if there are no objections to the idea.

    opened by loilo 5
  • need support for delayBatch


    Your lib is wonderful, but I need your support to configure this scenario - delay sending by 1 minute:

    1. I create a FIFO queue that holds all events (time-series events).
    2. Each event is held in the queue for 1 minute.
    3. Then it is continuously sent back via the callback function.

    I've read your full documentation and code but cannot use your lib for my case. Could you support me?

    Sorry, my bad English

    opened by dungvu-novus 5
  • in `on("task_failed", handler)`, when maxTimeout error is emitted, the `error` input must be an error object

    • From what I understand, if a task throws during execution, that error is propagated as a valid JavaScript Error object and so it can be used in
    q.on('task_failed', function (taskId, err, stats) {
      // Handle error, stats = { elapsed: <time taken> }
    })
    

    e.g., within the task_failed handler, I could call err.toString() - and I have been doing that. However, the emitted error in maxTimeout is just a string and so this assumption is broken: https://github.com/diamondio/better-queue/blob/83dca16637c15a0e055865f7d61c712b236c0c14/lib/queue.js#L605

    Suddenly, when I call err.toString() my script crashes as toString isn't a member of a String object. Hence, I believe the maxTimeout handling should at least emit an error.

    opened by TimDaub 0
  • Why does `maxTimeout` have no effect on `getStats().average`?


    • According to the docs: https://github.com/diamondio/better-queue#queue-statistics, getStats().average is the time in milliseconds a task spends executing in the queue
    • So if maxTimeout is set to Infinity, then we'd expect getStats().average to be anywhere between 0 and Infinity
    • However, shouldn't we expect getStats().average to be maximally 5000 if maxTimeout: 5000?

    Locally, I've set maxTimeout: 5000 but my average is still climbing. I'm receiving task_timeout in the task_failed listener, but I'm wondering why the average is higher. Another contributor opened an issue saying that maxTimeout may not work (#75). Is there something to it?

    opened by TimDaub 4
  • Most recent version no longer works with PostgreSQL store?


    I have the latest versions of better-queue and better-queue-sql installed; I verified that this morning. After the update to the most recent version, however, whenever I attempt to push a message I get this error.

    TypeError: this.knex.raw(...).then(...).error is not a function
        at PostgresAdapter.initialize (node_modules\better-queue-sql\postgres.js:63:6)
        at PostgresAdapter.connect (node_modules\better-queue-sql\postgres.js:31:8)
        at SqlStore.connect (node_modules\better-queue-sql\index.js:53:16)
        at Queue._connectToStore (node_modules\better-queue\lib\queue.js:169:15)
        at Queue.use (node_modules\better-queue\lib\queue.js:159:8)
        at new Queue (node_modules\better-queue\lib\queue.js:80:8)
    

    I've step-debugged to ensure that the store object being passed to better-queue has valid information for my database; it does. I'm hoping this is an easy fix; any help would be appreciated.

    opened by DnOberon 0
  • batchSize bigger than 1 => "TypeError [ERR_INVALID_ARG_TYPE]: The "path" argument must be of type string. Received undefined"

    When I try to use the queue in batch mode, any batchSize > 1 causes tasks to fail with an error:

    TypeError [ERR_INVALID_ARG_TYPE]: The "path" argument must be of type string. Received undefined
    

    The setup is minimal:

    const downloadFile = async (fileUrl, downloadFolder, fileName) => {
    
        const localFilePath = path.resolve(__dirname, downloadFolder, fileName);
        try {
            const response = await axios({
                method: "GET",
                url: fileUrl,
                responseType: "stream",
            });
    
            await response.data.pipe(fs.createWriteStream(localFilePath));
            return fileName;
        } catch (err) {
            throw (err);
        }
    };
    
    const cb = (arg) => { console.log(arg); };
    
    const processData = (data, cb) => {
        downloadFile(data.url, data.destFolder, data.destFilename).then((result) =>
            cb(null, `file ${result} downloaded`)).catch((err) => {
                // console.log(err);
                cb(err, null);
            });
    };
    
    const jobQueue = new Queue(processData, { batchSize: 2, concurrent: 1 });
    
    const data1 = {
        id: 1,
        url: "https://images.unsplash.com/photo-1616548479562-d5a581c064f4?ixid=MXwxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHw%3D&ixlib=rb-1.2.1&auto=format&fit=crop&w=2125&q=80",
        destFolder: "D:\\images",
        destFilename: "1.jpg"
    };
    // this one should fail
    const data2 = {
        id: 2,
        url: "https://images.unsplash.co/photo-1550389941-bc1b1f91a962?ixlib=rb-1.2.1&ixid=MXwxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHw%3D&auto=format&fit=crop&w=1267&q=80",
        destFolder: "D:\\images",
        destFilename: "2.jpg"
    };
    const data3 = {
        id: 3,
        url: "https://images.unsplash.com/photo-1572299409364-c1f3732d2ef5?ixid=MXwxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHw%3D&ixlib=rb-1.2.1&auto=format&fit=crop&w=1350&q=80",
        destFolder: "D:\\images",
        destFilename: "3.jpg"
    };
    
    jobQueue.on('task_failed', (taskId, err) => { console.log(`task ${taskId} - error: `, err.toString()); })
        .on("task_finish", (taskId, result, stats) => { console.log(`task ${taskId} - result: ${result}, elapsed time: ${stats.elapsed}`); })
        .on('task_accepted', taskId => { console.log(`task: ${taskId} accepted`); })
        .on('task_queued', taskId => { console.log(`task: ${taskId} queued`); });;
    
    jobQueue.push(data1);
    jobQueue.push(data2);
    jobQueue.push(data3);
    

    What am I missing here?

    opened by PlkMarudny 1
  • 🐛 Large data in the store caused the app to crash


    I'm using better-queue in production with a PostgreSQL store.

    We had an issue in the service running the queue and a significant amount of data piled up in the store (2,312,233 tasks).

    This caused the application to crash 1 minute after starting. The crash was completely silent, with no unhandledRejection or uncaughtException, and no signal traps either (SIGTERM, SIGINT, SIGUSR2). The machine's resources were not exhausted: the CPU was at 40% and memory consumption was very low.

    I have logs set at the beginning of the queue process and on error events as you can see in the following code

    const queueOptions = {
      batchSize: 250000,
      batchDelay: 3600000,
      concurrent: 1,
      maxRetries: Infinity,
      autoResume: false, // I tested with and without the `autoResume`
      retryDelay: 3600000 + 10,
      afterProcessDelay: 3600000,
      precondition: async (cb: any) => {
        try {
          const lock = await cacheInstance.getValue(LOCK_KEY);
          if (lock) {
            logger.info('Precondition failed, resources still locked');
            cb(null, false);
          } else {
            cb(null, true);
          }
        } catch (err) {
          logger.warn('Couldn\'t check the queue precondition', err);
          cb(err);
        }
      },
      preconditionRetryTimeout: 3600000,
    };
    if (config.env === 'production') {
      // @ts-ignore
      queueOptions.store = {
        type: 'sql',
        dialect: 'postgres',
        host: process.env.DATABASE_HOST,
        port: process.env.DATABASE_PORT,
        username: process.env.DATABASE_USERNAME,
        password: process.env.DATABASE_PASSWORD,
        dbname: process.env.DATABASE_NAME,
        tableName: 'my_queue_store', // The table will be created automatically.
      };
    }
    const myQueue = new Queue(async (payload: any, cb: any) => {
      try {
        const lock = await cacheInstance.lock(LOCK_KEY, 3600000);
        // await doTheProcessing() and release the lock.
        cb(null, 'queue_processed');
      } catch (err) {
        // Release the lock.
        cb(err);
      }
    }, queueOptions );
    
    // Queue logs
    myQueue.on('batch_failed', (error: string) => {
      logger.warn(`Failed to process the queue`, error);
    });
    myQueue.on('batch_finish', () => {
      logger.info(`Processed the queue batch`);
    });
    

    also, I have the following logs when I push data to the queue

    myQueue.push(payload)
            .on('finish', (result) => {
              logger.verbose(`Pushed an event to the queue`, result);
            })
            .on('failed', (err) => {
              logger.warn(`Failed to push an event to queue`, err);
            });
    

    The absence of logs made it very hard to find the issue; I discovered it by disabling the SQL store and using the default memory store, after which the app stopped crashing.

    My only solution, for now, was to back up the my_queue_store table and truncate it.

    My tech stack is the following:
      • OS: 64bit Amazon Linux 2/5.2.1 running in EBS
      • Node version: Node.js 12
      • better-queue: 3.8.2
      • better-queue-sql: 1.0.3

    How can this be avoided? How to improve logs for a similar situation?

    Thank you 💗

    opened by AdelUnito 0