Redis Simple Message Queue

Overview

RSMQ: Redis Simple Message Queue for Node.js

A lightweight message queue for Node.js that requires no dedicated queue server. Just a Redis server.

tl;dr: If you run a Redis server and currently use Amazon SQS or a similar message queue, you might as well use this fast little replacement. Multiple Node.js processes can send and receive messages through a shared Redis server.

Features

  • Lightweight: Just Redis and ~500 lines of JavaScript.
  • Speed: Send/receive 10000+ messages per second on an average machine. It's just Redis.
  • Guaranteed delivery of a message to exactly one recipient within a message's visibility timeout.
  • Received messages that are not deleted will reappear after the visibility timeout.
  • Test coverage
  • A message is deleted by the message id. The message id is returned by the sendMessage and receiveMessage method.
  • Messages stay in the queue unless deleted.
  • Optional RESTful interface via rest-rsmq
  • TypeScript typings ❤️
  • Optional Promise-based API (available only if Promise is defined): suffix a method name with Async, e.g. sendMessage -> sendMessageAsync. All queue methods are supported.
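
The Async-suffixed methods can be combined with async/await. The following is a minimal sketch rather than part of RSMQ: the helper name sendJson is hypothetical, and rsmq is assumed to be an instance created as shown in the Constructor section, running in an environment where Promise is defined.

```javascript
// Hypothetical helper (not part of RSMQ): serialize a value to JSON and
// send it with the promise-based variant of sendMessage.
async function sendJson(rsmq, qname, payload) {
	// sendMessageAsync resolves with the id of the newly sent message.
	const id = await rsmq.sendMessageAsync({
		qname: qname,
		message: JSON.stringify(payload)
	});
	return id;
}
```

It could be called as sendJson(rsmq, "myqueue", { hello: "world" }).then(console.log, console.error).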

Note: RSMQ uses the Redis EVAL command (Lua scripts), so Redis 2.6 or later is required.

Usage

  • After creating a queue you can send messages to that queue.
  • Messages are handled in FIFO (first in, first out) order unless they are sent with a delay.
  • Every message has a unique id that you can use to delete the message.
  • The sendMessage method will return the id for a sent message.
  • The receiveMessage method will return an id along with the message and some stats.
  • If you do not delete the message, it becomes eligible to be received again once the visibility timeout expires.
  • Please have a look at the createQueue and receiveMessage methods described below for optional parameters like visibility timeout and delay.
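
The steps above can be sketched as a single receive, handle, delete cycle. This is a minimal sketch under stated assumptions, not part of RSMQ: processNext and handler are hypothetical names, and rsmq is assumed to be an instance created as shown in the Constructor section.

```javascript
// Sketch of one receive -> handle -> delete cycle.
// `handler(message, done)` is a hypothetical application callback.
// `callback(err, handled)` reports whether a message was processed.
function processNext(rsmq, qname, handler, callback) {
	rsmq.receiveMessage({ qname: qname }, function (err, msg) {
		if (err) return callback(err);
		// An empty object means no message is currently visible.
		if (!msg.id) return callback(null, false);
		handler(msg.message, function (handlerErr) {
			// On failure the message is left untouched, so it becomes
			// receivable again after the visibility timeout.
			if (handlerErr) return callback(handlerErr);
			rsmq.deleteMessage({ qname: qname, id: msg.id }, function (delErr) {
				if (delErr) return callback(delErr);
				callback(null, true);
			});
		});
	});
}
```

Deleting only after the handler succeeds is what lets the visibility timeout act as a retry mechanism.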

Installation

npm install rsmq

Modules for RSMQ

To keep the core of RSMQ small, additional functionality is available as modules:

RSMQ in other languages

The simplicity of RSMQ is useful in other languages. Here is a list of implementations in other languages:

Note: Should you plan to port RSMQ to another language, please make sure to have tests to ensure compatibility with all RSMQ clients. And of course: let me know so I can mention your port here.

Methods

Constructor

Creates a new instance of RSMQ.

Parameters:

  • host (String): optional (Default: "127.0.0.1") The Redis server
  • port (Number): optional (Default: 6379) The Redis port
  • options (Object): optional (Default: {}) The Redis options object.
  • client (RedisClient): optional An existing Redis client instance. If supplied, host and port will be ignored.
  • ns (String): optional (Default: "rsmq") The namespace prefix used for all keys created by RSMQ
  • realtime (Boolean): optional (Default: false) Enable realtime PUBLISH of new messages (see the Realtime section)
  • password (String): optional (Default: null) If your Redis server requires a password supply it here

Example:

const RedisSMQ = require("rsmq");
const rsmq = new RedisSMQ( {host: "127.0.0.1", port: 6379, ns: "rsmq"} );

Queue

createQueue(options, callback)

Create a new queue.

Parameters:

  • qname (String): The Queue name. Maximum 160 characters; alphanumeric characters, hyphens (-), and underscores (_) are allowed.
  • vt (Number): optional (Default: 30) The length of time, in seconds, that a message received from a queue will be invisible to other receiving components when they ask to receive messages. Allowed values: 0-9999999 (around 115 days)
  • delay (Number): optional (Default: 0) The time in seconds that the delivery of all new messages in the queue will be delayed. Allowed values: 0-9999999 (around 115 days)
  • maxsize (Number): optional (Default: 65536) The maximum message size in bytes. Allowed values: 1024-65536 and -1 (for unlimited size)

Returns:

  • 1 (Number)

Example:

rsmq.createQueue({ qname: "myqueue" }, function (err, resp) {
	if (err) {
		console.error(err)
		return
	}

	if (resp === 1) {
		console.log("queue created")
	}
});
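
Because createQueue rejects invalid names, it can be convenient to check a name before calling it. The sketch below mirrors the documented qname rule; isValidQueueName is a hypothetical helper, not an RSMQ function, and it assumes that "alphanumeric" means ASCII letters and digits. RSMQ still performs its own validation.

```javascript
// Hypothetical pre-flight check mirroring the documented qname rule:
// 1 to 160 characters; letters, digits, hyphens and underscores only.
const QNAME_PATTERN = /^[a-zA-Z0-9_-]{1,160}$/;

function isValidQueueName(qname) {
	return typeof qname === "string" && QNAME_PATTERN.test(qname);
}
```

For example, isValidQueueName("myqueue") is true, while a name containing a space or exceeding 160 characters is rejected.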

listQueues(callback)

Lists all queues.

Returns an array:

  • ["qname1", "qname2"]

Example:

rsmq.listQueues(function (err, queues) {
	if (err) {
		console.error(err)
		return
	}

	console.log("Active queues: " + queues.join( "," ) )
});

deleteQueue(options, callback)

Deletes a queue and all messages.

Parameters:

  • qname (String): The Queue name.

Returns:

  • 1 (Number)

Example:

rsmq.deleteQueue({ qname: "myqueue" }, function (err, resp) {
	if (err) {
		console.error(err)
		return
	}

	if (resp === 1) {
		console.log("Queue and all messages deleted.")
	} else {
		console.log("Queue not found.")
	}
});

getQueueAttributes(options, callback)

Gets queue attributes, counters and stats.

Parameters:

  • qname (String): The Queue name.

Returns an object:

  • vt (Number): The visibility timeout for the queue in seconds
  • delay (Number): The delay for new messages in seconds
  • maxsize (Number): The maximum size of a message in bytes
  • totalrecv (Number): Total number of messages received from the queue
  • totalsent (Number): Total number of messages sent to the queue
  • created (Number): Timestamp (epoch in seconds) when the queue was created
  • modified (Number): Timestamp (epoch in seconds) when the queue was last modified with setQueueAttributes
  • msgs (Number): Current number of messages in the queue
  • hiddenmsgs (Number): Current number of hidden / not visible messages. A message can be hidden while "in flight" due to a vt parameter or when sent with a delay

Example:

rsmq.getQueueAttributes({ qname: "myqueue" }, function (err, resp) {
	if (err) {
		console.error(err);
		return;
	}

	console.log("==============================================");
	console.log("=================Queue Stats==================");
	console.log("==============================================");
	console.log("visibility timeout: ", resp.vt);
	console.log("delay for new messages: ", resp.delay);
	console.log("max size in bytes: ", resp.maxsize);
	console.log("total received messages: ", resp.totalrecv);
	console.log("total sent messages: ", resp.totalsent);
	console.log("created: ", resp.created);
	console.log("last modified: ", resp.modified);
	console.log("current n of messages: ", resp.msgs);
	console.log("hidden messages: ", resp.hiddenmsgs);
});
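
The counters can be combined to estimate how many messages are receivable right now. This sketch assumes that msgs counts every message in the queue, including hidden ones; visibleMessages is a hypothetical helper and not part of RSMQ.

```javascript
// Hypothetical helper: number of currently receivable messages, derived
// from a getQueueAttributes response. Assumes `msgs` includes hidden ones.
function visibleMessages(attrs) {
	return Math.max(0, attrs.msgs - attrs.hiddenmsgs);
}
```

The value is only a snapshot: other workers may receive or send messages between this call and the next receiveMessage.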

setQueueAttributes(options, callback)

Sets queue parameters.

Parameters:

  • qname (String): The Queue name.
  • vt (Number): optional The length of time, in seconds, that a message received from a queue will be invisible to other receiving components when they ask to receive messages. Allowed values: 0-9999999 (around 115 days)
  • delay (Number): optional The time in seconds that the delivery of all new messages in the queue will be delayed. Allowed values: 0-9999999 (around 115 days)
  • maxsize (Number): optional The maximum message size in bytes. Allowed values: 1024-65536 and -1 (for unlimited size)

Note: At least one attribute (vt, delay, maxsize) must be supplied. Only attributes that are supplied will be modified.

Returns an object:

  • vt (Number): The visibility timeout for the queue in seconds
  • delay (Number): The delay for new messages in seconds
  • maxsize (Number): The maximum size of a message in bytes
  • totalrecv (Number): Total number of messages received from the queue
  • totalsent (Number): Total number of messages sent to the queue
  • created (Number): Timestamp (epoch in seconds) when the queue was created
  • modified (Number): Timestamp (epoch in seconds) when the queue was last modified with setQueueAttributes
  • msgs (Number): Current number of messages in the queue
  • hiddenmsgs (Number): Current number of hidden / not visible messages. A message can be hidden while "in flight" due to a vt parameter or when sent with a delay

Example:

rsmq.setQueueAttributes({ qname: "myqueue", vt: 30 }, function (err, resp) {
	if (err) {
		console.error(err)
		return
	}

	console.log("changed the invisibility time of messages that have been received to 30 seconds");
	console.log(resp);
});

Messages

sendMessage(options, callback)

Sends a new message.

Parameters:

  • qname (String): The Queue name.
  • message (String): The message's contents.
  • delay (Number): optional (Default: queue settings) The time in seconds that the delivery of the message will be delayed. Allowed values: 0-9999999 (around 115 days)

Returns:

  • id (String): The internal message id.

Example:

rsmq.sendMessage({ qname: "myqueue", message: "Hello World" }, function (err, resp) {
	if (err) {
		console.error(err)
		return
	}

	console.log("Message sent. ID:", resp);
});

receiveMessage(options, callback)

Receive the next message from the queue.

Parameters:

  • qname (String): The Queue name.
  • vt (Number): optional (Default: queue settings) The length of time, in seconds, that the received message will be invisible to others. Allowed values: 0-9999999 (around 115 days)

Returns an object:

  • message (String): The message's contents.
  • id (String): The internal message id.
  • sent (Number): Timestamp of when this message was sent / created.
  • fr (Number): Timestamp of when this message was first received.
  • rc (Number): Number of times this message was received.

Note: Returns an empty object if no message is available.

Example:

rsmq.receiveMessage({ qname: "myqueue" }, function (err, resp) {
	if (err) {
		console.error(err)
		return
	}

	if (resp.id) {
		console.log("Message received.", resp)
	} else {
		console.log("No messages for me...")
	}
});

deleteMessage(options, callback)

Deletes a message from the queue.

Parameters:

  • qname (String): The Queue name.
  • id (String): message id to delete.

Returns:

  • 1 if successful, 0 if the message was not found (Number).

Example:

rsmq.deleteMessage({ qname: "myqueue", id: "dhoiwpiirm15ce77305a5c3a3b0f230c6e20f09b55" }, function (err, resp) {
	if (err) {
		console.error(err)
		return
	}

	if (resp === 1) {
		console.log("Message deleted.")
	} else {
		console.log("Message not found.")
	}
});

popMessage(options, callback)

Receive the next message from the queue and delete it.

Important: This method deletes the message it receives right away. There is no way to receive the message again if something goes wrong while working on the message.

Parameters:

  • qname (String): The Queue name.

Returns an object:

  • message (String): The message's contents.
  • id (String): The internal message id.
  • sent (Number): Timestamp of when this message was sent / created.
  • fr (Number): Timestamp of when this message was first received.
  • rc (Number): Number of times this message was received.

Note: Returns an empty object if no message is available.

Example:

rsmq.popMessage({ qname: "myqueue" }, function (err, resp) {
	if (err) {
		console.error(err)
		return
	}

	if (resp.id) {
		console.log("Message received and deleted from queue", resp)
	} else {
		console.log("No messages for me...")
	}
});
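
Since popMessage returns an empty object once nothing is left, it can be called repeatedly to drain a queue. The following is a minimal sketch, assuming rsmq is an RSMQ instance; drainQueue and onMessage are hypothetical names, and onMessage is assumed to be synchronous.

```javascript
// Sketch: pop messages until the queue reports no visible message.
// `callback(err, count)` receives the number of messages popped.
function drainQueue(rsmq, qname, onMessage, callback) {
	let count = 0;
	function next() {
		rsmq.popMessage({ qname: qname }, function (err, msg) {
			if (err) return callback(err, count);
			// An empty object means nothing is left to pop.
			if (!msg.id) return callback(null, count);
			count += 1;
			onMessage(msg);
			next();
		});
	}
	next();
}
```

As noted above, a popped message is gone for good, so this pattern only suits work that may be lost if onMessage fails.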

changeMessageVisibility(options, callback)

Change the visibility timer of a single message. The time when the message will be visible again is calculated from the current time (now) + vt.

Parameters:

  • qname (String): The Queue name.
  • id (String): The message id.
  • vt (Number): The length of time, in seconds, that this message will not be visible. Allowed values: 0-9999999 (around 115 days)

Returns:

  • 1 if successful, 0 if the message was not found (Number).

Example:

rsmq.changeMessageVisibility({ qname: "myqueue", vt: 60, id: "dhoiwpiirm15ce77305a5c3a3b0f230c6e20f09b55" }, function (err, resp) {
	if (err) {
		console.error(err)
		return
	}

	if (resp === 1) {
		console.log("message hidden for 60 seconds")
	}
});
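
One possible use of changeMessageVisibility is to postpone a failed message for longer on each attempt, based on the rc value returned by receiveMessage. The policy below (doubling from 5 seconds, capped at 600) is an arbitrary assumption chosen for illustration; backoffSeconds and retryLater are hypothetical helpers, not part of RSMQ.

```javascript
// Hypothetical retry policy: the delay doubles with every receive (rc),
// starting at baseDelay seconds and capped at maxDelay seconds.
function backoffSeconds(rc, baseDelay, maxDelay) {
	return Math.min(maxDelay, baseDelay * Math.pow(2, Math.max(0, rc - 1)));
}

// `msg` is the object returned by receiveMessage (uses msg.id and msg.rc).
function retryLater(rsmq, qname, msg, callback) {
	const vt = backoffSeconds(msg.rc, 5, 600);
	rsmq.changeMessageVisibility({ qname: qname, id: msg.id, vt: vt }, callback);
}
```

With these numbers a message is hidden for 5 seconds after its first receive, 10 after the second, 20 after the third, and never more than 600.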

quit(callback)

Disconnect the redis client. This is only useful if you are using rsmq within a script and want node to be able to exit.

Realtime

When initializing RSMQ you can enable the realtime PUBLISH for new messages. For every new message sent to RSMQ via sendMessage, a Redis PUBLISH is issued to {rsmq.ns}:rt:{qname}.

Example for RSMQ with default settings:

  • The queue testQueue already contains 5 messages.
  • A new message is being sent to the queue testQueue.
  • The following Redis command will be issued: PUBLISH rsmq:rt:testQueue 6

How to use the realtime option

Apart from the PUBLISH issued when a new message is sent, RSMQ does nothing else. Your app can use the Redis SUBSCRIBE command to be notified of new messages and then issue a receiveMessage. However, make sure not to listen for new messages with SUBSCRIBE from multiple workers, to prevent multiple simultaneous receiveMessage calls.
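
A subscriber for the channel described above might look like the sketch below. It rests on several assumptions: subscriber is a separate Redis connection used only for SUBSCRIBE and exposes subscribe(channel) plus a "message" event (the style of node_redis v3 and ioredis); the published payload is the message count, as in the example above; and realtimeChannel, watchQueue and onNewMessage are hypothetical names, not part of RSMQ.

```javascript
// Build the documented channel name: {rsmq.ns}:rt:{qname}
function realtimeChannel(ns, qname) {
	return ns + ":rt:" + qname;
}

// Call `onNewMessage(count)` whenever RSMQ publishes for this queue.
function watchQueue(subscriber, ns, qname, onNewMessage) {
	const channel = realtimeChannel(ns, qname);
	subscriber.on("message", function (ch, payload) {
		// Ignore notifications for other channels on this connection.
		if (ch !== channel) return;
		onNewMessage(Number(payload));
	});
	subscriber.subscribe(channel);
}
```

Inside onNewMessage the app would typically call receiveMessage on its regular RSMQ instance. In line with the warning above, only one worker should subscribe.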

Changes

See the CHANGELOG.

Other projects

  • node-cache: Simple and fast Node.js internal caching. Node internal in memory cache like memcached.
  • redis-tagging: A Node.js helper library to make tagging of items in any legacy database (SQL or NoSQL) easy and fast.
  • redis-sessions: An advanced session store for Node.js and Redis
  • rsmq-worker: Helper to implement a worker based on RSMQ (Redis Simple Message Queue).
  • connect-redis-sessions: A connect or express middleware to use redis sessions that lets you handle multiple sessions per user_id.

The MIT License

Please see the LICENSE.md file.

Comments
  • How does a process know that there are messages waiting?

    Let's say you have a client and server situation. The client can enqueue messages and the server can pull the messages off the queue. Can you define the server so that it will "wait" for a message to arrive so that it can then retrieve it and then go back and wait for more?

    The documentation seems to imply that you should only retrieve a message if you know one is there, because a retrieve will return nothing if there are no messages, rather than wait for something to arrive - but that doesn't make much sense :(

    There seems to be a missing "consume" method which would allow a process to wait for messages to arrive, and retrieve them when they do.

    opened by kpturner 12
  • Python version

    As I needed it, I went ahead and threw together a Python implementation (https://github.com/mlasevich/PyRSMQ) based partially on this and partially on Java version.

    It is still a very early version, but I think I got much of the core functionality (except for realtime processing) and unit tests are very much lacking. Hopefully should get better soon :-)

    opened by mlasevich 9
  • add option to specify id on sendMessage.

    I need an option to specify id when using sendMessage (optional) while ignoring if message with the same id already exists. I think this will be good option to make sure one message will be sent from multiple running instances (e.g. scheduler). I've been using this from other redis powered queue manager in node and it helped a lot. I switched to RSMQ because it's lightweight and exactly suits my needs atm.

    Great work by the way, keep it up 👍

    wontfix 
    opened by overflowz 7
  • Custom keys or pattern keys

    Is there any way to create or define a custom task key, or to define a pattern/prefix for each task? For example app1:user1:jobtypeA:{serial}, where serial can be a hash like before or any integer number. Thank you

    opened by nomi-ramzan 7
  • Delete message on receive

    Thanks for a great queue library!

    For my application, I'm looking to delete a message as it is received. I can currently do this by issuing a deleteMessage straight after a receiveMessage, but I'd like to go one step further and add support for this in the receiveMessage lua script, so that the delete happens atomically.

    I was wondering whether you'd accept a PR which implements this functionality.

    Here's what I'm thinking:

    • Allow a vt of -1 - this indicates that a message should be deleted when it is pulled off the queue.
    • Check if vt is -1 in the lua script - if so, run the zrem and hdel operations to delete the message.

    If reusing the vt variable isn't desirable, perhaps a new option could be used instead, for example remove_on_receive.

    enhancement question 
    opened by fiznool 7
  • Promise support

    I needed to migrate some services from Amazon SQS to Redis queues. I wrote this interface to use rsmq with promises.

    This is necessary in several cases on my microservices and lambda functions. Is it possible to add this into rsmq core?

    https://github.com/msfidelis/rsmq-promise

    enhancement 
    opened by msfidelis 6
  • Just to make it clear

    This library looks really interesting, but I wonder why the native pub/sub functions of Redis are not used and the library uses its own implementation instead.

    Was it created before pub/sub appeared in Redis?

    question 
    opened by alexbeletsky 6
  • RSMQ message queue configuration on ibm cloud

    How to get configuration details like host, port, key, etc of RSMQ on IBM cloud.

    Created one on AWS https://xxxxxxxxxxxx.amazonaws.com/xxxxxxxxxxx/Test.fifo But not able to connect it.

    const RedisSMQ = require("rsmq");
    const rsmq = new RedisSMQ( {host: "https://xxxxxxxxxxxx.amazonaws.com/xxxxxxxxxxx/Test.fifo", port: 6379, ns: "rsmq"} );

    question 
    opened by mayurgujrathi 5
  • rsmq and rsmq-worker do not work with an ioredis client and Sentinels

    as published here https://github.com/mpneuried/rsmq-worker/issues/21

    Hi to all,

    I'm using ioredis to establish a connection to Redis and use it as a client to RSMQ https://github.com/smrchy/rsmq and RSMQ-worker https://github.com/mpneuried/rsmq-worker.

    If I use a standalone Redis server with default settings localhost:6370, all my code work as charm and tasks are being processed as expected.

    Issues appear as soon as I try to use ioredis with Sentinels option (https://github.com/luin/ioredis#sentinel)

    So after shutting down the standalone Redis instance, I set up a Sentinel environment with a master server and 2 slave servers on my local machine. Then I connect to them with

    var redis_client = new Redis({
        sentinels: [{host: "localhost", port: "16380"}, {host: "localhost", port: "16381"}, {host: "localhost", port: "16382"}],
        name: "redis-cluster"
    });

    as reported in https://github.com/luin/ioredis#sentinel. Apparently everything seems to work correctly, and redis_client.on('connect', function(){}) says that the client has been connected to the Sentinel servers. Then, according to the RSMQ documentation, I can use the already existing Redis client with the RSMQWorker

    var worker = new RSMQWorker("mytask", {
        interval: [0, 10],
        autostart: true,
        maxReceiveCount: variables.rsmq.maxReceiveCount,
        customExceedCheck: fnCheck,
        redis: redis_client,
        redisPrefix: "rsmq",
        alwaysLogErrors: true
    });

    but unfortunately with these settings tasks stop working. As a clue, the Redis master server receives nothing on monitor. So I started to investigate and first of all I printed out the redis_client object, just to discover a couple of localhost connections in it.

    [...]
    options: { sentinels: [ [Object], [Object], [Object], [Object], [Object] ], name: 'redis-cluster', port: 6379, <--------- host: 'localhost', <---------- [...]
    SentinelConnector { options: { sentinels: [Object], name: 'redis-cluster', port: 6379, <------ host: 'localhost', <------ family: 4, [...]

    Does someone know why, even if I connect using Sentinels, the redis_client JSON has localhost and local port values in it?

    I was wondering if someone has experience with Sentinels and with these kind of issues.

    Thanks in advance

    REDIS CLIENT: Redis { domain: null, _events: { connect: [ [Function], [Object] ], close: { [Function: g] listener: [Function] } }, _eventsCount: 2, _maxListeners: undefined, options: { sentinels: [ [Object], [Object], [Object], [Object], [Object] ], name: 'redis-cluster', port: 6379, host: 'localhost', family: 4, connectTimeout: 3000, retryStrategy: [Function], keepAlive: 0, connectionName: null, role: 'master', sentinelRetryStrategy: [Function], password: null, db: 0, parser: null, dropBufferSupport: false, enableOfflineQueue: true, enableReadyCheck: true, autoResubscribe: true, autoResendUnfulfilledCommands: true, lazyConnect: false, keyPrefix: '', reconnectOnError: null, readOnly: false, stringNumbers: false }, scriptsSet: {}, commandQueue: { [String: '[object Object]'] _capacity: 16, _length: 1, _front: 0 }, offlineQueue: { [String: ''] _capacity: 16, _length: 0, _front: 0 }, connector: SentinelConnector { options: { sentinels: [Object], name: 'redis-cluster', port: 6379, host: 'localhost', family: 4, connectTimeout: 3000, retryStrategy: [Function], keepAlive: 0, connectionName: null, role: 'master', sentinelRetryStrategy: [Function], password: null, db: 0, parser: null, dropBufferSupport: false, enableOfflineQueue: true, enableReadyCheck: true, autoResubscribe: true, autoResendUnfulfilledCommands: true, lazyConnect: false, keyPrefix: '', reconnectOnError: null, readOnly: false, stringNumbers: false }, connecting: true, retryAttempts: 0, currentPoint: 0, sentinels: [ [Object], [Object], [Object], [Object], [Object] ], stream: Socket { _connecting: false, _hadError: false, _handle: [Object], _parent: null, _host: null, _readableState: [Object], readable: true, domain: null, _events: [Object], _eventsCount: 7, _maxListeners: undefined, _writableState: [Object], writable: true, allowHalfOpen: false, destroyed: false, bytesRead: 0, _bytesDispatched: 14, _sockname: null, _pendingData: null, _pendingEncoding: '', server: null, _server: null, _idleTimeout: -1, 
_idleNext: null, _idlePrev: null, _idleStart: 1244, read: [Function], _consuming: true, _peername: [Object] } }, retryAttempts: 0, status: 'connect', condition: { select: 0, auth: null, subscriber: false }, stream: Socket { _connecting: false, _hadError: false, _handle: TCP { _externalStream: {}, fd: 16, reading: true, owner: [Circular], onread: [Function: onread], onconnection: null, writeQueueSize: 0 }, _parent: null, _host: null, _readableState: ReadableState { objectMode: false, highWaterMark: 16384, buffer: [], length: 0, pipes: null, pipesCount: 0, flowing: true, ended: false, endEmitted: false, reading: true, sync: false, needReadable: true, emittedReadable: false, readableListening: false, defaultEncoding: 'utf8', ranOut: false, awaitDrain: 0, readingMore: false, decoder: null, encoding: null, resumeScheduled: false }, readable: true, domain: null, _events: { end: [Object], finish: [Function: onSocketFinish], _socketEnd: [Function: onSocketEnd], error: [Object], close: [Object], data: [Function], timeout: [Object] }, _eventsCount: 7, _maxListeners: undefined, _writableState: WritableState { objectMode: false, highWaterMark: 16384, needDrain: false, ending: false, ended: false, finished: false, decodeStrings: false, defaultEncoding: 'utf8', length: 0, writing: false, corked: 0, sync: false, bufferProcessing: false, onwrite: [Function], writecb: null, writelen: 0, bufferedRequest: null, lastBufferedRequest: null, pendingcb: 1, prefinished: false, errorEmitted: false, bufferedRequestCount: 0, corkedRequestsFree: [Object] }, writable: true, allowHalfOpen: false, destroyed: false, bytesRead: 0, _bytesDispatched: 14, _sockname: null, _pendingData: null, _pendingEncoding: '', server: null, _server: null, _idleTimeout: -1, _idleNext: null, _idlePrev: null, _idleStart: 1244, read: [Function], _consuming: true, _peername: { address: '127.0.0.1', family: 'IPv4', port: 6380 } }, replyParser: JavascriptReplyParser { name: 'javascript', buffer: , offset: 0, bigStrSize: 
0, chunksSize: 0, buffers: [], type: 0, protocolError: false, offsetCache: 0, handleReply: [Function], handleNumbers: [Function], returnError: [Function], returnFatalError: [Function], returnReply: [Function] } }

    opened by antoniodimariano 5
  • Queue name length and maximum message size

    The queue name length limitation of 80 characters is a big issue for me. I create queues that are based on the value of a JWT token and so these exceed the 80 character limitation. Can this be overridden or just be made bigger?

    Also the maximum message size of 65536 can be too small in some cases. Can the maximum be overridden or increased?

    opened by kpturner 5
  • Hitting 'message too long' error. At what size does this occur?

    Hitting message too long as defined in https://github.com/smrchy/rsmq/blob/deb56ab82f6349b9578bc8d4e15286ca53fa058f/test/test.coffee (couldn't find any other place where it's defined in the code)

    Is this a Redis-imposed threshold? Do you happen to know the max size?

    question 
    opened by 0xgeert 5
  • Implementation announcement. Golang!

    Hi there,

    I have been using your tiny and excellent framework at work. To aid my understanding of it, I have implemented a package for the Go language.

    • [x] tests
    • [x] API compatibility. Almost there! I am just missing the ChangeMessageAttributes function.
    • [x] Worker interface. done!

    Here is the code https://github.com/ebuckley/rsmq

    thanks!

    opened by ebuckley 2
  • when to call quit()

    From the documentation: https://github.com/smrchy/rsmq#quitcallback

    Disconnect the redis client. This is only useful if you are using rsmq within a script and want node to be able to exit.

    Our environment is exhibiting redis connection leaks. We currently encapsulate RSMQ in a class. We instantiate that class using new in a local scope and perform some RSMQ interactions. The instantiated object passes out of scope and I assume is garbage collected, however the redis connection for RSMQ seems to hang around.

    The documentation seems to imply that quit() should never have to be used and garbage collection of the connection will happen automagically, but that isn't what we're seeing. Is the documentation wrong?

    Thoughts?

    opened by createthis 0
  • ReplyError: ERR unknown command 'time'

    Whenever I try to create a queue I get the error ReplyError: ERR unknown command 'time'

    ReplyError: ERR unknown command 'time'
        at parseError (C:\Users\User\Documents\Uche Ubani Dev\example-message-queue\node_modules\redis\node_modules\redis-parser\lib\parser.js:193:12)
        at parseType (C:\Users\User\Documents\Uche Ubani Dev\example-message-queue\node_modules\redis\node_modules\redis-parser\lib\parser.js:303:14) {
      command: 'TIME',
      code: 'ERR'
    }
    ReplyError: ERR unknown command 'script'
        at parseError (C:\Users\User\Documents\Uche Ubani Dev\example-message-queue\node_modules\redis\node_modules\redis-parser\lib\parser.js:193:12)
        at parseType (C:\Users\User\Documents\Uche Ubani Dev\example-message-queue\node_modules\redis\node_modules\redis-parser\lib\parser.js:303:14) {
      command: 'SCRIPT',
      args: [
        'load',
        'local msg = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", KEYS[2], "LIMIT", "0", "1")\n' +
          '\t\t\tif #msg == 0 then\n' +
          '\t\t\t\treturn {}\n' +
          '\t\t\tend\n' +
          '\t\t\tredis.call("HINCRBY", KEYS[1] .. ":Q", "totalrecv", 1)\n' +
          '\t\t\tlocal mbody = redis.call("HGET", KEYS[1] .. ":Q", msg[1])\n' +
          '\t\t\tlocal rc = redis.call("HINCRBY", KEYS[1] .. ":Q", msg[1] .. ":rc", 1)\n' +
          '\t\t\tlocal o = {msg[1], mbody, rc}\n' +
          '\t\t\tif rc==1 then\n' +
          '\t\t\t\ttable.insert(o, KEYS[2])\n' +
          '\t\t\telse\n' +
          '\t\t\t\tlocal fr = redis.call("HGET", KEYS[1] .. ":Q", msg[1] .. ":fr")\n' +
          '\t\t\t\ttable.insert(o, fr)\n' +
          '\t\t\tend\n' +
          '\t\t\tredis.call("ZREM", KEYS[1], msg[1])\n' +
          '\t\t\tredis.call("HDEL", KEYS[1] .. ":Q", msg[1], msg[1] .. ":rc", msg[1] .. ":fr")\n' +
          '\t\t\treturn o'
      ],
      code: 'ERR'
    }
    ReplyError: ERR unknown command 'script'
        at parseError (C:\Users\User\Documents\Uche Ubani Dev\example-message-queue\node_modules\redis\node_modules\redis-parser\lib\parser.js:193:12)
        at parseType (C:\Users\User\Documents\Uche Ubani Dev\example-message-queue\node_modules\redis\node_modules\redis-parser\lib\parser.js:303:14) {
      command: 'SCRIPT',
      args: [
        'load',
        'local msg = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", KEYS[2], "LIMIT", "0", "1")\n' +
          '\t\t\tif #msg == 0 then\n' +
          '\t\t\t\treturn {}\n' +
          '\t\t\tend\n' +
          '\t\t\tredis.call("ZADD", KEYS[1], KEYS[3], msg[1])\n' +
          '\t\t\tredis.call("HINCRBY", KEYS[1] .. ":Q", "totalrecv", 1)\n' +
          '\t\t\tlocal mbody = redis.call("HGET", KEYS[1] .. ":Q", msg[1])\n' +
          '\t\t\tlocal rc = redis.call("HINCRBY", KEYS[1] .. ":Q", msg[1] .. ":rc", 1)\n' +
          '\t\t\tlocal o = {msg[1], mbody, rc}\n' +
          '\t\t\tif rc==1 then\n' +
          '\t\t\t\tredis.call("HSET", KEYS[1] .. ":Q", msg[1] .. ":fr", KEYS[2])\n' +
          '\t\t\t\ttable.insert(o, KEYS[2])\n' +
          '\t\t\telse\n' +
          '\t\t\t\tlocal fr = redis.call("HGET", KEYS[1] .. ":Q", msg[1] .. ":fr")\n' +
          '\t\t\t\ttable.insert(o, fr)\n' +
          '\t\t\tend\n' +
          '\t\t\treturn o'
      ],
      code: 'ERR'
    }
    ReplyError: ERR unknown command 'script'
        at parseError (C:\Users\User\Documents\Uche Ubani Dev\example-message-queue\node_modules\redis\node_modules\redis-parser\lib\parser.js:193:12)
        at parseType (C:\Users\User\Documents\Uche Ubani Dev\example-message-queue\node_modules\redis\node_modules\redis-parser\lib\parser.js:303:14) {
      command: 'SCRIPT',
      args: [
        'load',
        'local msg = redis.call("ZSCORE", KEYS[1], KEYS[2])\n' +
          '\t\t\tif not msg then\n' +
          '\t\t\t\treturn 0\n' +
          '\t\t\tend\n' +
          '\t\t\tredis.call("ZADD", KEYS[1], KEYS[3], KEYS[2])\n' +
          '\t\t\treturn 1'
      ],
      code: 'ERR'
    }
    
    opened by ucheubani96 0
  • Add Async Python implementation

    Hi! I've created a new Python implementation of RSMQ, using an async Redis library, which enables the entire package to be async as well.

    You can find it here: https://github.com/federicotdn/aiorsmq

    I've added some tests that run it against the main JavaScript RSMQ implementation. The interface of aiorsmq is almost the same as the one for RSMQ, with some changes made to make it more pythonic (and thus easier to use for Python users). Internally, it does exactly the same stuff that RSMQ does.

    Hopefully the current work done will be enough to add it to the alternative implementations list - let me know if I'm missing anything.

    Thanks!

    opened by federicotdn 0
  • Redis vulnerability

    Hi,

    Found a vulnerability issue on the redis package as a dependency package of rsmq (rsmq-promise > rsmq > redis) https://www.npmjs.com/advisories/1662

    Are there any plans to update this library?

    Thanks.

    opened by aaronnapay 0
Releases(v0.9.1)
Owner
Patrick Liess
Web and backend dev since 1995. Trying very hard to make things easy.