DocMQ
Messaging Queue for any document-friendly architectures (DocumentDB, Mongo, Postgres + JSONB, etc).
Why Choose This
DocMQ is a good choice if you're looking for a document-based message queue built around the visibility window. If you're someone who's frustrated that Amazon's SQS has a 15 minute maximum delay, or are trying to query Redis like it's a database, then this is probably the kind of solution you're looking for. DocMQ works with anything that holds and queries documents or document-like objects.
Why AVOID This
Simple. Performance. This kind of solution will never be as fast as an in-memory Redis queue or an event bus. If fast FIFO is your goal, you should consider BullMQ, Kue, Bee, Owl, and others.
Installation
You'll want to install DocMQ along with the mongodb client driver. This allows you to bring your own version of the mongo client in the event the MongoDB node driver changes in a material way.
Currently, DocMQ requires a Mongo Client >= 4.2 for transaction support.
# npm
npm i docmq mongodb
# yarn
yarn add docmq mongodb
# pnpm
pnpm add docmq mongodb
📚 Documentation
Creating a Queue
import { Queue, MongoDriver } from "docmq";

interface SimpleJob {
  success: boolean;
}

const queue = new Queue<SimpleJob>(
  new MongoDriver(process.env.DOC_DB_URL),
  "docmq"
);
new Queue() options
new Queue<T>(driver: Driver, name: string, options?: QueueOptions)
driver
a Driver implementation to use such as the MongoDriver
name
a string for the queue's name
options?
additional options
  retention.jobs?
  number of seconds to retain jobs with no further work. Default 3600 (1 hour)
  statInterval?
  number of seconds between emitting a stat event with queue statistics, defaults to 5
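As a rough illustration of the defaults listed above, the following sketch resolves a partial options object into a complete one. The `QueueOptionsSketch` type and the `withDocumentedDefaults` helper are hypothetical names used only for this example; they are not part of DocMQ, which applies its defaults internally.

```typescript
// Hypothetical local mirror of the options documented above (not imported from docmq).
interface QueueOptionsSketch {
  retention?: { jobs?: number }; // seconds to retain jobs with no further work
  statInterval?: number; // seconds between stat events
}

// Fill in the documented defaults for anything the caller left out.
function withDocumentedDefaults(options: QueueOptionsSketch = {}) {
  return {
    retention: { jobs: options.retention?.jobs ?? 3600 },
    statInterval: options.statInterval ?? 5,
  };
}

// Keep finished jobs for a day and leave statInterval at its default.
const resolved = withDocumentedDefaults({ retention: { jobs: 86400 } });
console.log(resolved);
```

With a real queue, an object of the same shape would be passed as the third argument to `new Queue(driver, name, options)`.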
Adding a Job to the Queue
await queue.enqueue({
  ref: "sample-id",
  /* SimpleJob */ payload: {
    success: true,
  },
});
enqueue() Options
queue.enqueue(job: JobDefinition<T> | JobDefinition<T>[])
job
the JSON Job object, consisting of
  ref?: string
  an identifier for the job, allowing future enqueue() calls to replace the job with new data. Defaults to a v4 UUID
  payload: T
  the job's payload which will be saved and sent to the handler
  runAt?: Date
  a date object describing when the job should run. Defaults to now()
  runEvery?: string | null
  Either a cron interval or an ISO-8601 duration, or null to remove recurrence
  retries?: number
  a number of tries for this job, defaults to 5
  retryStrategy?: RetryStrategy
  a retry strategy, defaults to exponential
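The job fields above can be combined to describe delayed and recurring work. The sketch below builds two job definitions without touching a database. `JobDefinitionSketch` and `delayedJob` are hypothetical names that mirror the documented fields for illustration; the real type is DocMQ's `JobDefinition<T>`.

```typescript
// Hypothetical local mirror of the documented job fields (not imported from docmq).
interface JobDefinitionSketch<T> {
  ref?: string;
  payload: T;
  runAt?: Date;
  runEvery?: string | null;
  retries?: number;
}

interface SimpleJob {
  success: boolean;
}

// Build a one-off job that should run `hours` hours after `from`.
function delayedJob(
  ref: string,
  hours: number,
  from: Date = new Date()
): JobDefinitionSketch<SimpleJob> {
  return {
    ref,
    payload: { success: true },
    runAt: new Date(from.getTime() + hours * 3600 * 1000),
  };
}

const base = new Date("2024-01-01T00:00:00.000Z");
const jobs: JobDefinitionSketch<SimpleJob>[] = [
  // A delay of a full day, well past a 15 minute ceiling.
  delayedJob("reminder-1", 24, base),
  // A recurring job using an ISO-8601 duration of five minutes.
  { ref: "heartbeat", payload: { success: true }, runEvery: "PT5M", retries: 3 },
];
console.log(jobs.map((job) => job.ref));
```

Since `enqueue()` accepts either a single job or an array, an array shaped like `jobs` could be passed to `queue.enqueue()` in one call.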
Retry Strategies
interface FixedRetryStrategy {
  type: "fixed";
  amount: number;
  jitter?: number;
}

interface ExponentialRetryStrategy {
  type: "exponential";
  min: number;
  max: number;
  factor: number;
  jitter?: number;
}

export interface LinearRetryStrategy {
  type: "linear";
  min: number;
  max: number;
  factor: number;
  jitter?: number;
}
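The interfaces above name the parameters of each strategy but not how they combine. The sketch below shows one plausible reading, and every formula in it is an assumption rather than DocMQ's actual behaviour: fixed always waits `amount`, linear grows by `factor` per attempt, exponential multiplies by `factor` per attempt, both are capped at `max`, and `jitter` adds a random offset of up to that many units in either direction. `retryDelay` is a hypothetical helper, not a DocMQ export.

```typescript
type RetryStrategySketch =
  | { type: "fixed"; amount: number; jitter?: number }
  | { type: "exponential"; min: number; max: number; factor: number; jitter?: number }
  | { type: "linear"; min: number; max: number; factor: number; jitter?: number };

// Base delay before jitter. `attempt` is 1-based. The formulas are assumptions.
function baseDelay(strategy: RetryStrategySketch, attempt: number): number {
  if (strategy.type === "fixed") {
    return strategy.amount;
  }
  const growth =
    strategy.type === "linear"
      ? strategy.min + strategy.factor * (attempt - 1)
      : strategy.min * Math.pow(strategy.factor, attempt - 1);
  return Math.min(strategy.max, growth);
}

// Assumed jitter model: a random offset in [-jitter, +jitter], never below zero.
// `random` is injectable so the result can be made deterministic.
function retryDelay(
  strategy: RetryStrategySketch,
  attempt: number,
  random: () => number = Math.random
): number {
  const jitter = strategy.jitter ?? 0;
  return Math.max(0, baseDelay(strategy, attempt) + (random() * 2 - 1) * jitter);
}

const exponential: RetryStrategySketch = { type: "exponential", min: 5, max: 300, factor: 2 };
for (let attempt = 1; attempt <= 5; attempt++) {
  console.log(attempt, retryDelay(exponential, attempt));
}
```

Capping at `max` keeps a long retry chain from pushing a job arbitrarily far into the future, which is the usual reason a strategy carries both a minimum and a maximum.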
Handling Work (Processing)
queue.process(
  async (job /* SimpleJob */, api) => {
    await api.ack();
  },
  {
    /* options */
  }
);
process() Options
queue.process(handler: JobHandler<T, A, F>, config?: ProcessorConfig)
handler
the job handler function, taking the job T and the api as arguments, returns a promise
config?: ProcessorConfig
an optional configuration for the processor including
  pause?: boolean
  should the processor wait to be started, default false
  concurrency?: number
  the number of concurrent processor loops to run, default 1
  visibility?: number
  specify the visibility window (how long a job is held for by default) in seconds, default 30
  pollInterval?: number
  as a fallback, define how often to check for new jobs in the event that the driver does not support evented notifications. Defaults to 5
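To make the visibility window concrete, here is a toy in-memory model of the idea: taking a job hides it for `visibility` seconds, and if nobody acknowledges it in that time it becomes available again. This is a conceptual sketch only. `ToyVisibilityQueue` is a hypothetical class, time is passed in as plain seconds, and none of this reflects how DocMQ stores or locks jobs.

```typescript
interface HeldJob<T> {
  ref: string;
  payload: T;
  visibleAt: number; // time (in seconds) from which the job may be taken
  acked: boolean;
}

class ToyVisibilityQueue<T> {
  private jobs: HeldJob<T>[] = [];
  private visibility: number;

  constructor(visibilitySeconds: number) {
    this.visibility = visibilitySeconds;
  }

  add(ref: string, payload: T, now: number): void {
    this.jobs.push({ ref, payload, visibleAt: now, acked: false });
  }

  // Take the next visible job and hide it for the visibility window.
  take(now: number): HeldJob<T> | undefined {
    const job = this.jobs.find((j) => !j.acked && j.visibleAt <= now);
    if (job) {
      job.visibleAt = now + this.visibility;
    }
    return job;
  }

  // Mark a job complete so it is never handed out again.
  ack(ref: string): void {
    const job = this.jobs.find((j) => j.ref === ref);
    if (job) {
      job.acked = true;
    }
  }
}

const toy = new ToyVisibilityQueue<{ success: boolean }>(30);
toy.add("a", { success: true }, 0);
console.log(toy.take(0)?.ref); // first worker takes the job
console.log(toy.take(10)?.ref); // still held, nothing to take
console.log(toy.take(30)?.ref); // window elapsed without an ack, job is offered again
```

The same mechanism is why a handler that crashes without acknowledging does not lose the job: once the window lapses, another processor loop can pick it up.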
api Methods and Members
api.ref
(string) the ref value of the job
api.attempt
(number) the attempt number for this job
api.visible
(number) the number of seconds this job was originally reserved for
api.ack(result: A)
acknowledge the job, marking it complete, and scheduling future work
api.fail(reason: string | F)
fail the job and emit the reason, scheduling a retry if required
api.ping(extendBy: number)
on a long running job, extend the runtime by extendBy seconds
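The members above suggest a common handler shape: do the work, extend the hold while a long job is still running, then either acknowledge or fail. The sketch below exercises that shape against a fake api so it runs without a database. `HandlerApiSketch`, `recordingApi`, and `handleJob` are hypothetical names for this example, and the promise return types are an assumption based on the `await api.ack()` call shown earlier.

```typescript
interface SimpleJob {
  success: boolean;
}

// Hypothetical local mirror of the documented api members.
interface HandlerApiSketch {
  ref: string;
  attempt: number;
  visible: number;
  ack(result?: unknown): Promise<void>;
  fail(reason: string): Promise<void>;
  ping(extendBy: number): Promise<void>;
}

// A fake api that records each call, standing in for the real one.
function recordingApi(calls: string[]): HandlerApiSketch {
  return {
    ref: "sample-id",
    attempt: 1,
    visible: 30,
    ack: async () => { calls.push("ack"); },
    fail: async (reason) => { calls.push(`fail:${reason}`); },
    ping: async (extendBy) => { calls.push(`ping:${extendBy}`); },
  };
}

// Run a multi-step job, extending the hold before every step after the first.
async function handleJob(
  job: SimpleJob,
  api: HandlerApiSketch,
  steps: Array<(job: SimpleJob) => Promise<void>>
): Promise<void> {
  try {
    for (let i = 0; i < steps.length; i++) {
      if (i > 0) {
        await api.ping(api.visible);
      }
      await steps[i](job);
    }
    await api.ack();
  } catch (err) {
    await api.fail(err instanceof Error ? err.message : String(err));
  }
}

const demoCalls: string[] = [];
handleJob({ success: true }, recordingApi(demoCalls), [async () => {}, async () => {}])
  .then(() => console.log(demoCalls));
```

Catching the error and calling `api.fail()` explicitly hands the failure reason to the queue, which, per the description above, schedules a retry if one is required.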
Events
The Queue object has a large number of emitted events available through queue.events. It extends EventEmitter, and the most common events are below:
ack
when a job was acked successfully
fail
when a job was failed
dead
when a job has exceeded its retries and was moved to the dead letter queue
stats
an interval ping containing information about the queue's processing load
start, stop
when the queue starts and stops processing
log, warn, error
logging events from the queue
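Because queue.events is an EventEmitter, the usual Node listener patterns apply. The sketch below counts job outcomes using a plain `EventEmitter` as a stand-in for queue.events, so it runs without a database. `trackOutcomes` is a hypothetical helper, and the listeners deliberately ignore their arguments because the event payload shapes are not described here.

```typescript
import { EventEmitter } from "node:events";

// Attach counters for the outcome events listed above.
function trackOutcomes(emitter: EventEmitter) {
  const counts = { ack: 0, fail: 0, dead: 0 };
  emitter.on("ack", () => { counts.ack += 1; });
  emitter.on("fail", () => { counts.fail += 1; });
  emitter.on("dead", () => { counts.dead += 1; });
  // A Node EventEmitter throws when "error" is emitted with no listener attached,
  // so it is worth registering one even if it only logs.
  emitter.on("error", (err) => { console.error("queue error:", err); });
  return counts;
}

// Stand-in for queue.events; with a real queue, pass queue.events instead.
const standIn = new EventEmitter();
const counts = trackOutcomes(standIn);

// Simulated emissions, only to demonstrate the counters.
standIn.emit("ack");
standIn.emit("ack");
standIn.emit("fail");
console.log(counts);
```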
🔧 Custom Driver Support
License
DocMQ source is made available under the MIT license.