Queue is a Node.js package for creating background jobs in topic-based RabbitMQ exchanges and processing them later.

Overview

Queue

PLG Works Queue helps manage subscribing to and publishing critical events using RabbitMQ. All events are published through RabbitMQ using a topic-based exchange.

Prerequisites

  • Basic understanding of RabbitMQ - reference
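As a quick refresher on topic exchanges: in RabbitMQ, a binding pattern is a dot-separated list of words in which `*` stands for exactly one word and `#` for zero or more words. The sketch below re-implements that matching rule in plain JavaScript for illustration only. `topicMatches` is a hypothetical helper, not part of @plgworks/queue, and this README does not state whether the package forwards wildcard patterns unchanged.

```javascript
// Hypothetical helper: mimics RabbitMQ topic-exchange matching.
// '*' matches exactly one word, '#' matches zero or more words.
function topicMatches(pattern, routingKey) {
  const patternWords = pattern.split('.');
  const keyWords = routingKey.split('.');

  function match(i, j) {
    if (i === patternWords.length) {
      // Pattern exhausted: succeed only if the key is exhausted too.
      return j === keyWords.length;
    }
    if (patternWords[i] === '#') {
      // Try letting '#' absorb 0, 1, 2, ... of the remaining words.
      for (let next = j; next <= keyWords.length; next++) {
        if (match(i + 1, next)) {
          return true;
        }
      }
      return false;
    }
    if (j === keyWords.length) {
      return false;
    }
    if (patternWords[i] === '*' || patternWords[i] === keyWords[j]) {
      return match(i + 1, j + 1);
    }
    return false;
  }

  return match(0, 0);
}

console.log(topicMatches('event.*', 'event.PublicTestEvent')); // prints true
console.log(topicMatches('event.*', 'event.a.b'));             // prints false
console.log(topicMatches('event.#', 'event.a.b'));             // prints true
```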

Installation

npm install @plgworks/queue --save

Initialize

RabbitMQ configuration is required when initializing the package. The configuration should include the following parameters:

  • username [string] (mandatory) RabbitMQ connection credentials
  • password [string] (mandatory) RabbitMQ connection credentials
  • host [string] (mandatory) RabbitMQ host
  • port [string] (mandatory) RabbitMQ port
  • heartbeats [string] (mandatory) heartbeats defines after what period of time the peer TCP connection should be considered unreachable.
  • clusterNodes [Array] (mandatory) - List of RMQ cluster hosts.
  • enableRabbitmq [integer] (optional) 0 if local usage.
  • switchHostAfterSec [integer] (optional) Wait time before switching RMQ host.
  • connectionTimeoutSec [integer] (optional) Wait time for connection to establish.

The following snippet initializes PLG Works Queue Manager:

// Import the queue module.
const QueueManager = require('@plgworks/queue');

// Config Strategy for PLG Works Queue.
const configStrategy = {
  "rabbitmq": {
    "username": "guest",
    "password": "guest",
    "host": "127.0.0.1",
    "port": "5672",
    "heartbeats": "30",
    "enableRabbitmq": 1
  }
};

// Create instance. In a CommonJS module, await must run inside an async function.
const init = async function() {
  const queueManagerInstance = await QueueManager.getInstance(configStrategy);
  return queueManagerInstance;
};
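Before calling getInstance, it can help to check that the mandatory fields from the parameter list are present. The following is a minimal sketch, not part of the package: `findMissingRabbitKeys` and `MANDATORY_KEYS` are hypothetical names, and the key list simply mirrors the parameters marked mandatory above (including clusterNodes).

```javascript
// Keys marked "mandatory" in the parameter list above.
const MANDATORY_KEYS = ['username', 'password', 'host', 'port', 'heartbeats', 'clusterNodes'];

// Hypothetical helper (not part of @plgworks/queue): returns the names of
// mandatory keys that are missing or empty under configStrategy.rabbitmq.
function findMissingRabbitKeys(configStrategy, required = MANDATORY_KEYS) {
  const rabbit = (configStrategy && configStrategy.rabbitmq) || {};
  return required.filter(function(key) {
    const value = rabbit[key];
    if (Array.isArray(value)) {
      return value.length === 0;
    }
    return value === undefined || value === null || value === '';
  });
}

// Sample values are placeholders for a local RabbitMQ node.
const sampleConfig = {
  rabbitmq: {
    username: 'guest',
    password: 'guest',
    host: '127.0.0.1',
    port: '5672',
    heartbeats: '30',
    clusterNodes: ['127.0.0.1'],
    enableRabbitmq: 1
  }
};

console.log(findMissingRabbitKeys(sampleConfig)); // prints []
```

If the returned array is not empty, the caller can log the missing keys and stop before any connection attempt is made.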

queueManagerInstance Object Methods

  • queueManagerInstance.subscribeEvent.rabbit(topics, options, readCallback, subscribeCallback)
    Description: Subscribe to multiple topics over a queue.
    Parameters:

    • topics [Array] (mandatory) - List of events to subscribe to.
    • options [object] (mandatory) Object with following keys:
      • queue [string] (optional) - Name of the queue on which you want to receive all your subscribed events. These queues, and the events published to them, have a TTL of 6 days. If no queue name is passed, a queue with a unique name is created and is deleted when the subscriber disconnects.
      • ackRequired [integer] (optional) - Set to 1 if delivered messages must be acknowledged (default 0). If set to 1 and a message is not acknowledged, it is redelivered.
      • broadcastSubscription [integer] (optional) - Set to 1 when the queue should also subscribe to broadcast events.
      • prefetch [integer] (optional) - The number of messages released from the queue in parallel. With ackRequired=1, the queue pauses until delivered messages are acknowledged.
    • readCallback [function] (mandatory) - Callback invoked whenever there is a new notification.
    • subscribeCallback [function] (optional) - Callback that receives the consumerTag.
  • queueManagerInstance.publishEvent.perform(params)
    Description: Publish event to topics.
    Parameters:

    • params [object] (mandatory) Object with following keys:
      • topics [Array] (optional) List of topics to publish the message to.
      • broadcast [integer] (optional) When set to 1, the message is broadcast to all channels. Default value is 0.
      • publishAfter [integer] (optional) Delay, in milliseconds, before the message is sent.
      • publisher [string] (mandatory) Name of the publisher.
      • message [object] (mandatory) Object with following keys:
        • kind [string] (mandatory) Kind of the message.
        • payload [object] (optional) Payload to identify message and extra info.
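The params object described above can be assembled with a small builder that enforces the two mandatory fields, publisher and message.kind. This is only a sketch: `buildPublishParams` is a hypothetical helper, not part of the package, and it assumes the key names listed above are the complete contract.

```javascript
// Hypothetical helper (not part of @plgworks/queue): builds the params object
// for publishEvent.perform and rejects calls that omit the mandatory fields.
function buildPublishParams(publisher, kind, options = {}) {
  if (typeof publisher !== 'string' || publisher === '') {
    throw new Error('publisher is mandatory');
  }
  if (typeof kind !== 'string' || kind === '') {
    throw new Error('message.kind is mandatory');
  }

  const params = {
    publisher: publisher,
    message: { kind: kind, payload: options.payload || {} }
  };

  // Optional keys are copied only when they were actually provided.
  if (Array.isArray(options.topics)) {
    params.topics = options.topics;
  }
  if (options.broadcast === 1) {
    params.broadcast = 1;
  }
  if (Number.isInteger(options.publishAfter)) {
    params.publishAfter = options.publishAfter;
  }
  return params;
}

const params = buildPublishParams('MyPublisher', 'event_received', {
  topics: ['event.PublishTestEvent'],
  publishAfter: 1000
});
console.log(params.message.kind); // prints event_received
```

The resulting object can then be passed to queueManagerInstance.publishEvent.perform(params).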

Examples

Subscribe to events published through RabbitMQ

// Config Strategy for PLG Works Queue.
const configStrategy = {
  "rabbitmq": {
    "username": "guest",
    "password": "guest",
    "host": "127.0.0.1",
    "port": "5672",
    "heartbeats": "30",
    "enableRabbitmq": 1
  }
};

// Import the queue module.
const QueueManager = require('@plgworks/queue');
let unAckCount = 0; // Number of unacknowledged messages.

const subscribe = async function() {
  let queueManagerInstance = await QueueManager.getInstance(configStrategy);
  queueManagerInstance.subscribeEvent.rabbit(
    ["event.PublicTestEvent"], // List of events
    {
      queue: 'testQueue',
      ackRequired: 1, // When set to 1, every delivered message MUST be acknowledged.
      broadcastSubscription: 1, // When set to 1, the queue also subscribes to the broadcast channel and receives all broadcast messages.
      prefetch: 10
    },
    async function(msgContent) {
      // The callback must return a promise (an async function always does).
      // When the promise resolves, the message is acknowledged.
      // When the promise rejects, the message is re-queued (not acknowledged).
      unAckCount++; // One more message is in flight.
      console.log('Consumed message -> ', msgContent);
      try {
        // processMessage is a placeholder for your own handler.
        // It is assumed here to return true on success.
        const succeeded = await processMessage(msgContent);
        if (!succeeded) {
          // Throwing rejects the returned promise, which re-queues the same message.
          throw new Error('Message processing failed.');
        }
        // Returning normally resolves the promise and acknowledges the message.
      } finally {
        // Decrement on both outcomes so the SIGINT handler is not left waiting forever.
        unAckCount--;
      }
    });
};
// Gracefully handle SIGINT and SIGTERM signals.
// Once a SIGINT/SIGTERM signal is received, the program stops consuming new messages.
// However, the current process MUST finish handling messages that are still unacknowledged.
process.on('SIGINT', function () {
  console.log('Received SIGINT, checking unAckCount.');
  const f = function(){
    if (unAckCount === 0) {
      process.exit(1);
    } else {
      console.log('waiting for open tasks to be done.');
      setTimeout(f, 1000);
    }
  };
  setTimeout(f, 1000);
});

function rmqError(err) {
  console.log('rmqError occurred.', err);
  process.emit('SIGINT');
}
// Event published from package in case of internal error.
process.on('rmq_error', rmqError);
subscribe();
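The in-flight bookkeeping used for graceful shutdown can be factored into a small helper so that the count is decremented whether processing succeeds or fails. The sketch below is an assumption-laden illustration: `createInFlightTracker` and its methods are hypothetical names, not part of the package.

```javascript
// Hypothetical helper (not part of @plgworks/queue): counts messages that are
// currently being processed and lets a shutdown routine wait for them.
function createInFlightTracker() {
  let inFlight = 0;
  return {
    // Wraps a handler so the counter drops on success and on failure alike.
    wrap: function(handler) {
      return async function(msgContent) {
        inFlight++;
        try {
          return await handler(msgContent);
        } finally {
          inFlight--;
        }
      };
    },
    // Number of messages currently being processed.
    count: function() {
      return inFlight;
    },
    // Resolves once nothing is in flight; re-checks every intervalMs.
    waitForDrain: function(intervalMs = 100) {
      return new Promise(function(resolve) {
        (function check() {
          if (inFlight === 0) {
            resolve();
            return;
          }
          setTimeout(check, intervalMs);
        })();
      });
    }
  };
}

// Possible wiring, shown as comments because it needs a running consumer:
//   const tracker = createInFlightTracker();
//   queueManagerInstance.subscribeEvent.rabbit(topics, options, tracker.wrap(myHandler));
//   process.on('SIGINT', function() {
//     tracker.waitForDrain(1000).then(function() { process.exit(0); });
//   });
```

Because the wrapped handler still returns the original promise outcome, a rejection continues to signal that the message was not acknowledged.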

Listen to multiple events with one subscriber

// Config Strategy for PLG Works Queue.
const configStrategy = {
  "rabbitmq": {
    "username": "guest",
    "password": "guest",
    "host": "127.0.0.1",
    "port": "5672",
    "heartbeats": "30",
    "enableRabbitmq": 1
  }
};

// Import the queue module.
const QueueManager = require('@plgworks/queue');
const subscribeMultiple = async function() {
  let queueManagerInstance = await QueueManager.getInstance(configStrategy);
  queueManagerInstance.subscribeEvent.rabbit(
    ["event.PublicTestEvent1", "event.PublicTestEvent2"],
    {}, 
    function(msgContent){
      console.log('Consumed message -> ', msgContent);
    });
};
subscribeMultiple();

Publish Notifications

By default, all events are published using EventEmitter and, if configured, through RabbitMQ as well.

// Config Strategy for PLG Works Queue.
const configStrategy = {
  "rabbitmq": {
    "username": "guest",
    "password": "guest",
    "host": "127.0.0.1",
    "port": "5672",
    "heartbeats": "30",
    "connectionTimeoutSec": 60,
    "enableRabbitmq": 1
  }
};

// Import the Queue module.
const QueueManager = require('@plgworks/queue');
const publish = async function() {
  let queueManagerInstance = await QueueManager.getInstance(configStrategy);
  queueManagerInstance.publishEvent.perform(
    {
      topics: ["event.PublishTestEvent"],
      broadcast: 1, // When set to 1, the message is broadcast to all channels.
      publishAfter: 1000, // Delay in milliseconds before the message is sent.
      publisher: 'MyPublisher',
      message: {
        kind: "event_received",
        payload: {
          // Custom payload for message
        }
      }
    });
};
publish();

Pause and Restart queue consumption

Pausing and restarting queue consumption is also supported. Depending on your own logic, you can fire the events below from your process to pause or restart consumption, respectively. Pausing consumption can be the first step in SIGINT handling.

// Config Strategy for PLG Works Queue.
let configStrategy = {
  "rabbitmq": {
    "username": "guest",
    "password": "guest",
    "host": "127.0.0.1",
    "port": "5672",
    "heartbeats": "30",
    "enableRabbitmq": 1
  }
};

let queueConsumerTag = null;
// Import the queue module.
const QueueManager = require('@plgworks/queue');
const subscribePauseRestartConsume = async function() {
  let queueManagerInstance = await QueueManager.getInstance(configStrategy);
  queueManagerInstance.subscribeEvent.rabbit(
      ["event.PublicTestEvent1", "event.PublicTestEvent2"],
      {}, 
      function(msgContent){
        console.log('Consumed message -> ', msgContent);
        
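        // some_failure_condition is a placeholder for your own check.
        if(some_failure_condition){
          process.emit('CANCEL_CONSUME', queueConsumerTag);
        }
        
        // failure_resolve_detected is a placeholder for your own check.
        if(failure_resolve_detected){
          process.emit('RESUME_CONSUME', queueConsumerTag);
        }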
      },
      function(consumerTag) {
        queueConsumerTag = consumerTag;
      }
    );
};
subscribePauseRestartConsume();
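One way to decide when to fire CANCEL_CONSUME and RESUME_CONSUME is a consecutive-failure threshold. The sketch below is a hypothetical illustration: `createConsumeGate` is not part of the package, and the threshold policy is an assumption. Only the two event names and the consumerTag argument come from the example above. The emitter is passed in so the logic can be tried out without a broker; in a consumer it would be `process`.

```javascript
// Hypothetical helper (not part of @plgworks/queue): emits CANCEL_CONSUME after
// a run of failures and RESUME_CONSUME once a recovery is reported.
function createConsumeGate(emitter, maxConsecutiveFailures) {
  let failures = 0;
  let paused = false;
  return {
    // Call when processing a message fails.
    recordFailure: function(consumerTag) {
      failures++;
      if (!paused && failures >= maxConsecutiveFailures) {
        paused = true;
        emitter.emit('CANCEL_CONSUME', consumerTag);
      }
    },
    // Call when the underlying problem is known to be resolved.
    recordRecovery: function(consumerTag) {
      failures = 0;
      if (paused) {
        paused = false;
        emitter.emit('RESUME_CONSUME', consumerTag);
      }
    },
    // True while consumption has been cancelled by this gate.
    isPaused: function() {
      return paused;
    }
  };
}

// Possible wiring, shown as comments because it needs a running consumer:
//   const gate = createConsumeGate(process, 3);
//   gate.recordFailure(queueConsumerTag);   // inside the read callback, on error
//   gate.recordRecovery(queueConsumerTag);  // from a health check, on recovery
```

While consumption is paused the read callback presumably stops receiving messages, so recovery would have to be detected elsewhere, for example by a timer-based health check.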

Running test cases

Run the following command to execute the test cases.

./node_modules/.bin/mocha --recursive "./test/**/*.js"