A general-purpose message and event queuing library for MongoDB

Overview

MongoMQ2

NPM version Build status Coverage

MongoMQ2 is a light-weight Node.js library that turns MongoDB collections into general-purpose message queues or event logs, without additional server components.

At a slight expense of throughput compared to specialized message queues and brokers like SQS, SNS, RabbitMQ or Kafka, you get:

  • Persistent message/event logs in MongoDB collections.
  • Real-time, fan-out, at-most-once delivery to subscribers.
  • Isolated, acknowledged, at-least-once delivery to queue consumers.
    • Effectively exactly-once if consumer workloads are idempotent.
  • All the capabilities of regular MongoDB collections, e.g.
    • search indexes,
    • unique indexes for message/event deduplication,
    • aggregations,
    • capped collections,
    • sharding,
    • and TTL indexes.
  • No chaining of queues required because subscribers and consumers can read from the same queue.

There's more:

  • Configurable number of retries
  • Configurable visibility timeouts
  • Configurable visibility delays
  • Multiple isolated consumer groups on one queue
  • Batch publishing of messages/events

MongoMQ2 can be an effective and flexible building block for message- and event-driven architectures, especially if you're already on MongoDB and don't want to introduce additional system components.

Installation

npm install mongomq2 mongodb

Quick Start

import { MongoClient } from "mongodb";
import { Consumer, Publisher, Subscriber } from "mongomq2";

const mongoClient = new MongoClient("mongodb://localhost:27017");
await mongoClient.connect();

interface MyMessage {
  _id?: ObjectId;
  type: "hello" | "world";
}

const messagesCollection = mongoClient.db().collection<MyMessage>("messages");

// Subscribe to (future) messages of type "hello"
const subscriber = new Subscriber(messagesCollection);

subscriber.subscribe((message) => console.log("Received a hello!"), {
  filter: { type: "hello" },
});

// Consume messages (even past ones) of type "world"
const consumer = new Consumer(
  messagesCollection,
  (message) => console.log("Saved a world!"),
  { filter: { type: "world" } }
);

consumer.start();

// Publish some messages
const publisher = new Publisher(messagesCollection);

await publisher.publish({ type: "hello" });
await publisher.publish({ type: "world" });

// > Received a hello! (per active subscriber)
// > Saved a world! (consumed exactly once by one consumer)

Synopsis

Publisher

const publisher = new Publisher(collection);

await publisher.publish({ type: "hello" });
  • Publishes the given message to the database immediately.
  • Message insertion is acknowledged, or an error is thrown.

Use Cases

  • Critical messages and events
  • Job ingestion
  • Commands

BatchPublisher

const publisher = new BatchPublisher(collection);

publisher.publish({ type: "hello" });
  • Queues the given message for publication in memory.
  • Bulk inserts batched messages after a configurable delay.
  • By default publishes messages with best effort (majority write concern, retries)
  • Can be set to "fire & forget" mode by passing bestEffort: false (no write concern, no retries)

Use Cases

  • Uncritical messages
  • Uncritical notifications

Subscriber

const subscriber = new Subscriber(collection, {
  filter: {
    /* optional global filter applied on change stream */
  },
});

subscriber.subscribe((message) => console.log(message), {
  filter: {
    /* optional local filter applied in memory */
  },
});
  • Subscribes to matching messages in the future.
  • All active subscribers will receive all future matching messages.
  • Messages are delivered at most once.
  • Messages are delivered in database insertion order.
  • Past messages are ignored.
  • Each Subscriber instance creates one MongoDB change stream.
    • Change streams occupy one connection,
    • so you'll usually want only one Subscriber instance,
    • and multiple .subscribe(...) calls with local filters.

Use Cases

  • Real-time notifications
  • Cache invalidation

Consumer

const consumer = new Consumer(collection, (message) => console.log(message), {
  // consumer group identifier, defaults to collection name
  group: "myConsumerGroup",
  filter: {
    /* optional filter */
  },
});

consumer.start();
  • Consumes future and past matching messages.
  • Order of message consumption is not guaranteed.
  • Per group, each matching message is consumed by at most one consumer.
  • Messages are consumed at-least-once per group.
  • Configurable visibility timeout, visibility delay, maximum number of retries, etc.

Use Cases

  • Message queues
  • Job queues
  • Event processing
  • Command processing

Notes

  • All MongoMQ2 clients are EventEmitters.
  • Always attach .on('error', (err) => /* report error */) to monitor errors.
    • err.mq2 will contain the message being processed, if any.
  • Always .close() MongoMQ2 clients on shutdown (before closing the MongoClient).
    • MongoMQ2 will try to finish open tasks with best effort.
  • MongoDB change streams are only supported for MongoDB replica set.
    • To start a one-node replica set locally e.g. for testing, see docker-compose.yml
You might also like...

Making service workers easy so that your app is fast and reliable, even offline.

tulo.js Making service workers easy to use so that your app can be fast and reliable, even offline. Welcome to tulo.js, a service worker library that

Nov 16, 2022

Example repo for getting NextJS, Rust via wasm-pack, and web workers all playing nicely together.

Example of integrating WASM & web workers with a Typescript NextJS project. Running yarn yarn dev Open localhost:3000 Layout Rust code is in ./rust, g

Dec 23, 2022

generate statistics on the number of audience minutes your site is generating, and if readers make it to the end of your screeds

generate statistics on the number of audience minutes your site is generating, and if readers make it to the end of your screeds

audience-minutes generate statistics on the number of audience minutes your site is receiving, and if readers make it to the end of your screeds. “If

Dec 28, 2022

Challenge [Frontend Mentor] - In this challenge, JavaScript was used to filter jobs based on the selected categories. Technologies used: HTML5, CSS3 and React.

Getting Started with Create React App This project was bootstrapped with Create React App. Available Scripts In the project directory, you can run: np

Apr 13, 2022

A simple package for single or batch image download and conversion using node streams.

image-batch-download A simple package for basic image downloading and processing. Supported formats: JPEG PNG WebP Installation With Yarn: yarn add im

Jan 2, 2022

Server and Browser code are really the same. It's magic.

Server and Browser code are really the same. It's magic.

Service Worker Magic Server (Cloudflare Workers) code is sw.js. Browser ( Service Worker ) code is sw.js. Cloudflare Workers sw.js serves sw.js. Servi

Dec 9, 2022

A simple Node.js APIBAN client for downloading banned IPs and inserting them into a redis set

apiban-redis A simple Node.js APIBAN client for downloading banned IPs and inserting them into a redis set. Installation This utility can be run as a

Apr 5, 2022

Cloud Run Jobs Demos - A collection of samples to show you how and when to run a container to completion without a server

Cloud Run Jobs Demo Applications Cloud Run Jobs allows you to run a container to completion without a server. This repository contains a collection of

Dec 23, 2022

Out of the box modern User Interface, so you can see and manage your Workhorse jobs in realtime

Out of the box modern User Interface, so you can see and manage your Workhorse jobs in realtime

WORKHORSE UI Out of the box modern User Interface, so you can see and manage your Workhorse jobs in realtime. Start local Run npm i Copy and name prox

Apr 15, 2022
BullMQ - Premium Message Queue for NodeJS based on Redis

The fastest, most reliable, Redis-based distributed queue for Node. Carefully written for rock solid stability and atomicity. Read the documentation F

Taskforce.sh Inc. 3.1k Dec 30, 2022
A simple high-performance Redis message queue for Node.js.

RedisSMQ - Yet another simple Redis message queue A simple high-performance Redis message queue for Node.js. For more details about RedisSMQ design se

null 501 Dec 30, 2022
A tool library for handling window && iframe && worker communication based on the JSON RPC specification

rpc-shooter A tool library for handling window && iframe && worker communication based on the JSON RPC specification 一个基于 JSON-RPC 规范用于处理 window && if

臼犀 89 Dec 20, 2022
Job queues and scheduled jobs for Node.js, Beanstalkd and/or Iron.io.

Ironium Job queues and scheduled jobs for Node.js backed by Beanstalk/IronMQ/SQS. The Why You've got a workload that runs outside the Web app's reques

Assaf Arkin 71 Dec 14, 2022
Bree is the best job scheduler for Node.js and JavaScript with cron, dates, ms, later, and human-friendly support.

The best job scheduler for Node.js and JavaScript with cron, dates, ms, later, and human-friendly support. Works in Node v10+ and browsers, uses workers to spawn sandboxed processes, and supports async/await, retries, throttling, concurrency, and graceful shutdown. Simple, fast, and lightweight. Made for @ForwardEmail and @ladjs.

Bree - The Best Node.js and JavaScript Job Scheduler 2.5k Dec 30, 2022
Type-safe and Promisified API for Web Worker and Iframe

?? You can help the author become a full-time open-source maintainer by sponsoring him on GitHub. typed-worker Install npm i typed-worker Usage Create

EGOIST 189 Dec 31, 2022
Premium Queue package for handling distributed jobs and messages in NodeJS.

The fastest, most reliable, Redis-based queue for Node. Carefully written for rock solid stability and atomicity. Sponsors · Features · UIs · Install

null 13.5k Dec 31, 2022
Redis-backed task queue engine with advanced task control and eventual consistency

idoit Redis-backed task queue engine with advanced task control and eventual consistency. Task grouping, chaining, iterators for huge ranges. Postpone

Nodeca 65 Dec 15, 2022
A fast, robust and extensible distributed task/job queue for Node.js, powered by Redis.

Conveyor MQ A fast, robust and extensible distributed task/job queue for Node.js, powered by Redis. Introduction Conveyor MQ is a general purpose, dis

Conveyor MQ 45 Dec 15, 2022
Build and deploy a roadmap voting app for your porject

Roadmap Voting App You can deploy Roadmap application yourself and get feedback from your users about your roadmap features. See the live example. In

Upstash 91 Jan 3, 2023