Build Amazon Simple Queue Service (SQS) based applications without the boilerplate

Overview

sqs-consumer

Build SQS-based applications without the boilerplate. Just define an async function that handles the SQS message processing.

Installation

npm install sqs-consumer --save

Usage

const { Consumer } = require('sqs-consumer');

const app = Consumer.create({
  queueUrl: 'https://sqs.eu-west-1.amazonaws.com/account-id/queue-name',
  handleMessage: async (message) => {
    // do some work with `message`
  }
});

app.on('error', (err) => {
  console.error(err.message);
});

app.on('processing_error', (err) => {
  console.error(err.message);
});

app.start();
  • The queue is polled continuously for messages using long polling.
  • Messages are deleted from the queue once the handler function has completed successfully.
  • Throwing an error (or returning a rejected promise) from the handler function will cause the message to be left on the queue. An SQS redrive policy can be used to move messages that cannot be processed to a dead letter queue.
  • By default messages are processed one at a time – a new message won't be received until the first one has been processed. To process messages in parallel, use the batchSize option detailed below.
  • By default, the Node.js HTTP/HTTPS agent used by the SQS client opens a new TCP connection for every request (see the AWS SQS documentation). To avoid the cost of establishing a new connection each time, reuse connections by passing an SQS instance whose agent is configured with keepAlive: true.

const { Consumer } = require('sqs-consumer');
const AWS = require('aws-sdk');
const https = require('https');

const app = Consumer.create({
  queueUrl: 'https://sqs.eu-west-1.amazonaws.com/account-id/queue-name',
  handleMessage: async (message) => {
    // do some work with `message`
  },
  sqs: new AWS.SQS({
    httpOptions: {
      agent: new https.Agent({
        keepAlive: true
      })
    }
  })
});

app.on('error', (err) => {
  console.error(err.message);
});

app.on('processing_error', (err) => {
  console.error(err.message);
});

app.start();
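
The delete-on-success rule described in the list above can be sketched with a handler that resolves when processing succeeds and throws when it does not. This is only an illustration: the JSON body format and the processOrder helper are assumptions, not part of sqs-consumer, and the wiring into Consumer.create is shown as a comment so the snippet runs without AWS access.

```javascript
// Hypothetical business logic; stands in for whatever work a message triggers.
async function processOrder(order) {
  if (typeof order.id !== 'string') {
    throw new Error('order.id is required');
  }
  return order.id;
}

// Resolving tells the consumer the message was handled, so it is deleted.
// Throwing (or rejecting) leaves the message on the queue for a later retry.
async function handleMessage(message) {
  let order;
  try {
    order = JSON.parse(message.Body);
  } catch (err) {
    throw new Error(`Unparseable message ${message.MessageId}: ${err.message}`);
  }
  return processOrder(order);
}

// Wiring, as in the usage example above:
// const app = Consumer.create({ queueUrl, handleMessage });
// app.on('processing_error', (err) => console.error(err.message));
// app.start();
```

With a redrive policy on the queue, a message that keeps failing this way would eventually be moved to the dead letter queue instead of being retried forever.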

Credentials

By default the consumer will look for AWS credentials in the places specified by the AWS SDK. The simplest option is to export your credentials as environment variables:

export AWS_SECRET_ACCESS_KEY=...
export AWS_ACCESS_KEY_ID=...

If you need to specify your credentials manually, you can use a pre-configured instance of the AWS SQS client:

const { Consumer } = require('sqs-consumer');
const AWS = require('aws-sdk');

AWS.config.update({
  region: 'eu-west-1',
  accessKeyId: '...',
  secretAccessKey: '...'
});

const app = Consumer.create({
  queueUrl: 'https://sqs.eu-west-1.amazonaws.com/account-id/queue-name',
  handleMessage: async (message) => {
    // ...
  },
  sqs: new AWS.SQS()
});

app.on('error', (err) => {
  console.error(err.message);
});

app.on('processing_error', (err) => {
  console.error(err.message);
});

app.on('timeout_error', (err) => {
  console.error(err.message);
});

app.start();

API

Consumer.create(options)

Creates a new SQS consumer.

Options

  • queueUrl - String - The SQS queue URL
  • region - String - The AWS region (default eu-west-1)
  • handleMessage - Function - An async function (or function that returns a Promise) to be called whenever a message is received. Receives an SQS message object as its first argument.
  • handleMessageBatch - Function - An async function (or function that returns a Promise) to be called whenever a batch of messages is received. Similar to handleMessage but will receive the list of messages, not each message individually. If both are set, handleMessageBatch overrides handleMessage.
  • handleMessageTimeout - Number - Time in ms to wait for handleMessage to process a message before timing out. Emits timeout_error on timeout. By default, if handleMessage times out, the unprocessed message returns to the end of the queue.
  • attributeNames - Array - List of queue attributes to retrieve (e.g. ['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']).
  • messageAttributeNames - Array - List of message attributes to retrieve (e.g. ['name', 'address']).
  • batchSize - Number - The number of messages to request from SQS when polling (default 1). This cannot be higher than the AWS limit of 10.
  • visibilityTimeout - Number - The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request.
  • heartbeatInterval - Number - The interval (in seconds) between requests to extend the message visibility timeout. On each heartbeat the visibility is extended by adding visibilityTimeout to the number of seconds since the start of the handler function. This value must be less than visibilityTimeout.
  • terminateVisibilityTimeout - Boolean - If true, sets the message visibility timeout to 0 after a processing_error (defaults to false).
  • waitTimeSeconds - Number - The duration (in seconds) for which the call will wait for a message to arrive in the queue before returning.
  • authenticationErrorTimeout - Number - The duration (in milliseconds) to wait before retrying after an authentication error (defaults to 10000).
  • pollingWaitTimeMs - Number - The duration (in milliseconds) to wait before repolling the queue (defaults to 0).
  • sqs - Object - An optional AWS SQS object to use if you need to configure the client manually
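
As a rough sketch of how the limits in the list above fit together, the helper below checks the two documented constraints (batchSize between 1 and 10, heartbeatInterval below visibilityTimeout) before the options are handed to Consumer.create. The checkConsumerOptions function is hypothetical and not part of sqs-consumer; it only restates what the option descriptions say.

```javascript
// Hypothetical pre-flight check that mirrors the limits documented above.
// Returns a list of problems; an empty list means the options look consistent.
function checkConsumerOptions(options) {
  const problems = [];
  const { batchSize = 1, visibilityTimeout, heartbeatInterval } = options;

  if (!Number.isInteger(batchSize) || batchSize < 1 || batchSize > 10) {
    problems.push('batchSize must be an integer between 1 and 10');
  }
  if (heartbeatInterval !== undefined &&
      !(visibilityTimeout !== undefined && heartbeatInterval < visibilityTimeout)) {
    problems.push('heartbeatInterval must be less than visibilityTimeout');
  }
  return problems;
}

const options = {
  queueUrl: 'https://sqs.eu-west-1.amazonaws.com/account-id/queue-name',
  batchSize: 10,          // request up to 10 messages per poll (the AWS limit)
  visibilityTimeout: 30,  // seconds
  heartbeatInterval: 10,  // seconds, must stay below visibilityTimeout
  handleMessage: async (message) => { /* do some work with `message` */ }
};

console.log(checkConsumerOptions(options)); // prints []
```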

consumer.start()

Start polling the queue for messages.

consumer.stop()

Stop polling the queue for messages.

consumer.isRunning

Returns the current polling state of the consumer: true if it is actively polling, false if it is not.

Events

Each consumer is an EventEmitter and emits the following events:

Event | Params | Description
error | err, [message] | Fired when an error occurs interacting with the queue. If the error correlates to a message, that message is included in Params.
processing_error | err, message | Fired when an error occurs processing the message.
timeout_error | err, message | Fired when handleMessageTimeout is supplied as an option and handleMessage times out.
message_received | message | Fired when a message is received.
message_processed | message | Fired when a message is successfully processed and removed from the queue.
response_processed | None | Fired after one batch of items (up to batchSize) has been successfully processed.
stopped | None | Fired when the consumer finally stops its work.
empty | None | Fired when the queue is empty (all messages have been consumed).
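
Because each consumer is an EventEmitter, listeners for the events above can be attached in one place. The sketch below is an assumption-laden illustration: attachLogging is a hypothetical helper, and a plain EventEmitter stands in for a real consumer so the snippet runs without AWS access.

```javascript
const { EventEmitter } = require('events');

// Subscribes to every event listed above and passes a one-line summary to `log`.
function attachLogging(consumer, log = console.log) {
  const idOf = (message) => (message && message.MessageId) || 'unknown';

  consumer.on('error', (err, message) => {
    // `message` is only present when the error relates to a specific message.
    log(message ? `error (${idOf(message)}): ${err.message}` : `error: ${err.message}`);
  });
  consumer.on('processing_error', (err, message) => {
    log(`processing_error (${idOf(message)}): ${err.message}`);
  });
  consumer.on('timeout_error', (err, message) => {
    log(`timeout_error (${idOf(message)}): ${err.message}`);
  });
  consumer.on('message_received', (message) => log(`message_received (${idOf(message)})`));
  consumer.on('message_processed', (message) => log(`message_processed (${idOf(message)})`));
  consumer.on('response_processed', () => log('response_processed'));
  consumer.on('stopped', () => log('stopped'));
  consumer.on('empty', () => log('empty'));
  return consumer;
}

// A plain EventEmitter stands in for a real consumer here.
const lines = [];
const standIn = attachLogging(new EventEmitter(), (line) => lines.push(line));
standIn.emit('message_received', { MessageId: 'abc' });
standIn.emit('error', new Error('network down'));
standIn.emit('empty');
console.log(lines.join('\n'));
```

With a real consumer, the same helper would be called as attachLogging(app) before app.start().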

AWS IAM Permissions

Consumer will receive and delete messages from the SQS queue. Ensure sqs:ReceiveMessage and sqs:DeleteMessage access is granted on the queue being consumed.
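
A policy granting the two actions named above might look like the following sketch, written as a JavaScript object so it can be serialised to JSON. The queue ARN (region, account id and queue name) is a placeholder. Whether further actions such as sqs:ChangeMessageVisibility are needed when heartbeatInterval or terminateVisibilityTimeout is used is an assumption this section does not confirm.

```javascript
// Placeholder ARN; replace region, account id and queue name with real values.
const queueArn = 'arn:aws:sqs:eu-west-1:123456789012:queue-name';

// Minimal IAM policy document covering the two actions named above.
const policy = {
  Version: '2012-10-17',
  Statement: [
    {
      Effect: 'Allow',
      Action: ['sqs:ReceiveMessage', 'sqs:DeleteMessage'],
      Resource: queueArn
    }
  ]
};

console.log(JSON.stringify(policy, null, 2));
```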

Contributing

See contributing guidelines.

Comments
  • SQS stopped polling.

    Describe the bug

    Polling of SQS messages for various queues normally works fine, but twice now the consumer stopped polling and messages piled up in the queues. After I restarted the server, the queues were polled and processed again immediately. A month later the same thing happened: polling stopped, messages piled up, and a server restart brought polling back. I've been using several SQS queues for a long time, but this has happened twice in a span of 2 months on a production server. No error was emitted on the 'error' or 'processing_error' events.

    Version of sqs-consumer is "^5.4.0". Version of aws-sdk is "^2.585.0".

    I've seen a similar issue reported previously, but it was closed with a fix (https://github.com/bbc/sqs-consumer/issues/130). Any idea what the reason could be?

    To Reproduce

    Expected behaviour

    Polling should not stop. Also, why did restarting the server solve the issue and resume polling after it had halted?

    screenshots image

    Additional context I'm using node.js (v10.19.0) deployed on docker.

    bug 
    opened by mashoodrafi6 42
  • The consumer suddenly stops polling for new messages

    Hi,

    We're using sqs-consumer to poll our queue, but something strange happens; at times of activity, after a few minutes, the consumer just stops polling for new messages. If we restart our server, then it picks up the messages again.

    [Screenshot: 2018-10-06-033945_1413x850_scrot]

    Here at 22:30 we restarted the server, and it started trying to receive again. Between 19:15 and 22:30 it did not pick up any message from the queue. We're only specifying queueUrl; all other options are default. (There was no network outage or special event going on, and this happens frequently.)

    Our queue is configured like so :

    [Screenshot: 2018-10-06-014817_650x206_scrot]

    We're logging errors :

    this._consumer.on("error", e => {
        logger.error(`Error with queue`, { queue: this._queueName, error: e.message });
    });
    

    But this never triggers. We've also tried logging events, but when we stop receiving the messages, we do not get a single event fired from sqs-consumer.

    Any ideas? Thanks!

    triage 
    opened by Esya 14
  • Update to AWS SDK v3

    Description

    1. Update the peer and development dependencies to AWS SDK v3.
    2. Update all SQS client method calls to use updated syntax
    3. Update error types to match new error type thrown by client
    4. Update tests to run with the new API calls and error structure

    Motivation and Context

    1. This change updates the library to use the newest major version of the AWS SDK, which modularizes the entire SDK and substantially changes the API. As AWS pushes developers towards the new SDK, it will be important for widely used libraries to update accordingly.
    2. Closes #251
    3. Mainly, this was selfishly motivated. I thought we needed this update for a project I'm working on. Then we pivoted and decided we didn't need it, but not before I had done nearly all the work to update the lib. I decided to finish my work and share the results in case they are useful to others. It's a small enough lib that if others needed this functionality locally, they could reasonably just copy/paste the changes into their project.

    Types of changes

    • [ ] Bug fix (non-breaking change which fixes an issue)
    • [x] New feature (non-breaking change which adds functionality)
    • [x] Breaking change (fix or feature that would cause existing functionality to change)

    Checklist:

    • [x] My code follows the code style of this project.
    • [x] My change requires a change to the documentation.
    • [x] I have updated the documentation accordingly.
    • [x] I have read the CONTRIBUTING document.
    • [ ] I have added tests to cover my changes.
    • [x] All new and existing tests passed.

    Notes on the Checklist:

    1. I didn't really add tests because technically the behavior shouldn't change with this PR. However, I did substantial updates to the tests to make them work. The test coverage should not have changed at all, but I can add tests if desired.
    2. I know the contributing guidelines recommend waiting for issue acknowledgement before working on a PR. However, as indicated above, I thought I needed this work done for a project and by the time I realized I didn't need it, it was already nearly done. Please note: if you don't want to use these changes, I am 100% OK with that. I am just posting them here for the community if others find themselves wanting/needing these changes, or if the library maintainers want to use this as a starting place.

    Testing

    In addition to updating the unit tests, I tested basic functionality locally using this script:

    // To test this against live resources, you must provision an AWS SQS queue first.
    // These commands assume you have the AWS CLI installed and configured
    //
    // Create a queue
    //    queue=$(aws sqs create-queue --queue-name my-queue --query "QueueUrl")
    // Send a message to the queue
    //    aws sqs send-message --queue-url $queue --message-body '{"user_id": "123123123"}'
    // Run the consumer app locally
    //    npx ts-node test-consumer.ts
    // Delete queue when testing is complete
    //    aws sqs delete-queue --queue-url $queue
    
    import { Consumer, Message } from './index'
    import { SQSClient } from '@aws-sdk/client-sqs'
    
    const ACCOUNT_ID = '123456789'
    const QUEUE_NAME = 'my-queue'
    const REGION = 'us-west-2'
    const queueUrl = `https://sqs.${REGION}.amazonaws.com/${ACCOUNT_ID}/${QUEUE_NAME}`
    
    const sqs = new SQSClient({
      region: REGION,
      credentialDefaultProvider: () => () => Promise.resolve({
        accessKeyId: 'accessKeyId',
        secretAccessKey: 'secretAccessKey',
        sessionToken: 'sessionToken'
      })
    })
    
    export const handleMessage = async (message: Message) => {
      try {
        const parsed = JSON.parse(message.Body)
        console.log(parsed)
      } catch (e) {
        console.error('Parse failure!', e)
      }
    }
    
    const app = Consumer.create({
      queueUrl,
      handleMessage,
      sqs,
    })
    
    app.on('error', (err) => {
      console.error(err.message)
    })
    
    app.on('processing_error', (err) => {
      console.error(err.message)
    })
    
    app.on('timeout_error', (err) => {
      console.error(err.message)
    })
    
    app.start()
    
    opened by ericyd 13
  • Delete only certain message received via handleMessageBatch

    Is it possible to only delete some of the messages received in a batch? I understand that using handleMessage in combination with a batchSize greater than 1 results in them being processed one at a time but handleMessageBatch seems to be all or nothing. For example, if I had an API that could save multiple messages at a time and return a list of those that were successfully saved, could I use that to delete only the successfully saved messages from the queue?

    triage feature-request 
    opened by TylerMatteo 13
  • Does it poll continuously?

    Does the consumer continue to poll while processing incoming messages? I'm concerned that it works sequentially and pauses polling while handling messages, as this can starve the server (for example, if many messages are waiting on the queue and the server has capacity to handle many more messages than are in the batch).

    opened by goldbergyoni 11
  • Ack partial of a batch of messages

    Description

    When a handleMessageBatch function is provided, the code currently either acks all messages or, if an error is thrown, rejects all of them. This change lets the handler return an array of successfully processed messages, which are deleted from the queue; the remaining messages return to the queue.
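
    A minimal sketch of a handler written against the behaviour this pull request proposes (returning the messages that should be deleted). The saveMessage helper and the JSON body format are assumptions made for illustration; nothing here shows how the library itself implements the change.

```javascript
// Hypothetical per-message work: rejects for bodies that are not valid JSON.
async function saveMessage(message) {
  return JSON.parse(message.Body);
}

// Returns only the messages that were saved. Under the proposed behaviour those
// would be deleted, and the remaining messages would return to the queue.
async function handleMessageBatch(messages) {
  const results = await Promise.allSettled(messages.map(saveMessage));
  return messages.filter((_, i) => results[i].status === 'fulfilled');
}

// Example batch with one message that cannot be processed.
const batch = [
  { MessageId: '1', Body: '{"ok":true}' },
  { MessageId: '2', Body: 'not json' },
  { MessageId: '3', Body: '{"ok":true}' }
];

handleMessageBatch(batch).then((acked) => {
  console.log(acked.map((m) => m.MessageId));
});
```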

    Motivation and Context

    Added for personal use and in response to issue 245: https://github.com/bbc/sqs-consumer/issues/245

    Types of changes

    • [ ] Bug fix (non-breaking change which fixes an issue)
    • [X] New feature (non-breaking change which adds functionality)
    • [ ] Breaking change (fix or feature that would cause existing functionality to change)

    Checklist:

    • [X] My code follows the code style of this project.
    • [X] My change requires a change to the documentation.
    • [X] I have updated the documentation accordingly.
    • [X] I have read the CONTRIBUTING document.
    • [X] I have added tests to cover my changes.
    • [X] All new and existing tests passed.
    triage 
    opened by efiShtain 9
  • Multiple polls invoked

    Based on the code from the master branch:

    public start(): void {
      if (this.stopped) {
        debug('Starting consumer');
        this.stopped = false;
        this.poll();
      }
    }
    
    public stop(): void {
      debug('Stopping consumer');
      this.stopped = true;
    }
    
    private poll(): void {
      if (this.stopped) {
        this.emit('stopped');
        return;
      }
    
      /* tldr */
        }).then(() => {
          setTimeout(this.poll, currentPollingTimeout);
        }).catch((err) => {
          this.emit('error', err);
        });
    }
    

    If I did something like:

    consumer.start();
    consumer.stop();
    consumer.start();
    

    Wouldn't this actually invoke the following twice:

    this.receiveMessage(receiveParams)
          .then(this.handleSqsResponse)
    

    Which would essentially lead to double polling and double quota usage? Shouldn't stop() call clearTimeout() if a poll is already scheduled via setTimeout(), so that the scheduled poll() is never invoked?

    I was thinking to use this approach where I would stop a consumer when the queue is empty and in certain cases, if it's not running to start polling again, although I believe that this could be an edge-case and a bug.
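
    The clearTimeout() idea raised above could look roughly like the sketch below. It is a simplified, hypothetical Poller class, not the library's code. The generation counter is an addition beyond what the issue suggests: clearTimeout() only cancels a poll that is already scheduled, so the counter is there to stop a receive that is still in flight from rescheduling itself after a restart.

```javascript
// Simplified stand-in for the consumer's start/stop/poll cycle.
class Poller {
  constructor(receive, intervalMs) {
    this.receive = receive;     // async function standing in for receiveMessage
    this.intervalMs = intervalMs;
    this.stopped = true;
    this.timer = null;          // handle of the poll scheduled via setTimeout
    this.generation = 0;        // incremented on every start()
  }

  start() {
    if (!this.stopped) return;
    this.stopped = false;
    this.generation += 1;
    this.poll(this.generation);
  }

  stop() {
    this.stopped = true;
    // Cancel a poll that is already scheduled so it never fires.
    clearTimeout(this.timer);
    this.timer = null;
  }

  poll(generation) {
    // A poll chain begun by an earlier start() must not continue after a restart.
    if (this.stopped || generation !== this.generation) return;
    this.receive()
      .then(() => {
        if (this.stopped || generation !== this.generation) return;
        this.timer = setTimeout(() => this.poll(generation), this.intervalMs);
      })
      .catch((err) => console.error(err.message));
  }
}
```

    With this shape, calling start(), stop() and start() in quick succession still issues two initial receive calls, but only the chain belonging to the latest start() keeps polling.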

    bug triage 
    opened by UMFsimke 9
  • app consuming more than one message at a time, in single-message (not batch) mode

    Hi,

    My app seems to consume and process messages simultaneously, even though I am using the handleMessage option, not handleMessageBatch.

    It is really important to my application that messages are consumed one at a time.

    question 
    opened by upugo-dev 9
  • Possible memory leak in v5.0.0

    Description

    I just updated from 3.8.0 -> 5.0.0 and experienced ever increasing memory usage.

    screenshot 2019-02-20 at 21 04 17

    The update was reverted with release 034d565f with basically the following changes.

    ...
    + "sqs-consumer": "^3.8.0",
    - "sqs-consumer": "^5.0.0",
    ...
    
    +const Consumer = require('sqs-consumer')
    -const { Consumer } = require('sqs-consumer')
    ...
    
    +const handler = async (data, done) => {
    -const handler = async (data) => {
      try {
          await doStuff(data)
      } catch (error) {
        // log error
    +    return done(error)
    -    throw error
      }
    +  done()
    }
    

    Steps to reproduce

    Don't really have a good way to reproduce.

    Operating system

    Running on a Standard-1X dyno in Heroku, node 8.14.1

    opened by johanarnor 9
  • Workers stop processing messages

    Would appreciate some help in figuring out this issue. I'm running 7 workers in parallel using pm2. After a few minutes, only one worker continues to process messages. A few minutes after that, no workers are receiving messages. Message Received and Message Processed are printed by all workers, then just a few workers, then one, and then none.

    Here are my settings and relevant code.

    Queue Settings

    Default Visibility Timeout: 90 seconds
    Message Retention Period: 4 days
    Maximum Message Size: 256kb
    Delivery Delay: 0s
    Receive Message Wait Time: 10s

    Code

    import 'babel-polyfill'
    
    /** Libraries */
    import SQSConsumer from 'sqs-consumer'
    import serializeError from 'serialize-error'
    
    /** Internal */
    import IndexingService from './services/indexing.service'
    import logger from './utilities/logger'
    import ravenClient from './utilities/raven'
    
    const indexingService = new IndexingService()
    
    const startProcess = async () => {
        const app = SQSConsumer.create({
            queueUrl: 'https://sqs.us-east-1.amazonaws.com/your-queue-here',
            waitTimeSeconds: 5,
            visibilityTimeout: 90,
            handleMessage: async (message, done) => {
                try {
                    const parsedBody = JSON.parse(message.Body)
                    await indexingService.startProcess(parsedBody)
    
                    done()
                }
                catch (e) {
                    logger.error({
                        'Error': serializeError(e)
                    })
    
                    if(process.env.NODE_ENV == 'production') {
                        //ravenClient.captureException(e)
                    }
    
                    done(e)
                }
            }
        })
    
        app.on('error', (e) => {
            logger.error({
                'Error': serializeError(e)
            })
    
            console.log(e);
        });
    
        app.on('processing_error', (e, message) => {
            logger.error({
                'Error': serializeError(e)
            })
    
            console.log(message);
        });
    
        app.on('stopped', () => {
            console.log('Worker Stopped.');
        });
    
        app.on('message_received', () => {
            console.log('Message Received.');
        });
    
        app.on('message_processed', () => {
            console.log('Message Processed.');
        });
    
        app.start()
    }
    
    startProcess()
    
    opened by seanlindo 9
  • Message receiving stops if done() is not invoked

    From the documentation of sqs-consumer, I understood that a message is deleted from the queue only if done() is called in handleMessage. I have a use case where I want the message to be left in the queue after being processed. But when I run the following code, node exits after processing a single message, and polling only continues if I call done().

    Is this expected?

    var Consumer = require('sqs-consumer');
    var AWS = require('aws-sdk');
    AWS.config.loadFromPath('../config/aws-config.json');
    var app = Consumer.create({
      queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/myQ',
      waitTimeSeconds: 5,
      handleMessage: function (message, done) {
        console.log(message);
        //done();
      },
    });
    app.on('error', function(err) {
        console.log(err.message);
    });
    app.start();
    
    opened by kavuri 9
  • [Bug]: Visibility timeout of failed messages is not changed when processing a message batch

    Describe the bug

    When processing a message batch, the processMessageBatch method does not change the visibility timeout of messages that failed to process (when terminateVisibilityTimeout is true).

    Your minimal, reproducible example

    ...

    Steps to reproduce

    For example, I consume a batch of 10 messages, and my handleMessageBatch is:

    for (const message of messages) {
        if (message.Body) {
           await handle(JSON.parse(message.Body));
        }
    }
    

    Simulate it:

    - Message 1: Success
    - Message 2: Success
    - Message 3: Failed -> throw an error
    

    Current behavior: the visibility timeout of messages 3 to 10 is not changed.

    Expected behavior

    • The visibility timeout of messages 3 to 10 should be changed to 0.

    How often does this bug happen?

    None

    Screenshots or Videos

    No response

    Platform

    ...

    Package version

    v6.1.0

    AWS SDK version

    No response

    Additional context

    No response

    bug triage 
    opened by doonpy 0
  • Fix maintenance issues with sqs-consumer

    Code Climate is currently reporting maintainability of sqs-consumer at the level of C, with some valid concerns.

    https://codeclimate.com/github/BBC/sqs-consumer

    Alongside what it mentions, the code is unstructured and functions are placed randomly, so it's hard to see batch and normal functions alongside each other.

    We should do this before we take on further new features.

    bug 
    opened by nicholasgriffintn 1
  • feat: sqs-consumer should have the option to abort requests on stop

    Problem

    There have been a number of requests that sqs-consumer should abort any existing requests when .stop() is called, as in some implementations this is expected behaviour.

    Originally posted in https://github.com/bbc/sqs-consumer/issues/234 From @jwalton:

    When you call start(), it calls into poll() immediately and synchronously, so this.stopped will be false when we call into poll(). That test case validates that handleMessage was called once, and it should be called not at all.

    Even if the handler were not called again, there is other undesirable behavior from the current state of affairs. If you write a program that calls start() and then stop(), the program will take 20 seconds to run, even though it isn't doing anything, because it will wait for that open network connection to timeout. Ideally calling stop() should abort the current poll request, which would prevent us from receiving further messages, and would clean up the requests and let the process exit gracefully.

    And @rogerweb:

    In regard to the stop behavior, I have the same understanding as @jwalton and the same pain. I'm able to perform a graceful shutdown within a second for all my dependencies except for the SQS consumer, as it waits up to 20 seconds in the worst case. The way I see it, the pending request to SQS is a sqs-consumer implementation concern, while the exposed functionality would be "please shutdown ASAP, I'm not expecting any new messages, no matter what you are doing internally". If an abort() fits this use case better, I would be very happy with it too.

    Solution

    In order to support existing implementations, sqs-consumer should provide new functionality that also aborts existing requests when .stop() is called.

    The suggested solution is to add an option to the stop function that lets users tell us to abort any in-flight requests to AWS SQS. It would be used like so:

    consumer.stop({abort: true})

    To support backwards compatibility, this option would default to false.

    If the option is true, we should implement and use the AbortController to abort SQS requests as discussed here:

    https://aws.amazon.com/blogs/developer/abortcontroller-in-modular-aws-sdk-for-javascript/

    More documentation is also available here:

    https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/interfaces/abortcontroller.html

    I don't know if we will use the aws-sdk for this though, as it would require an additional dependency that people would have to add. We can potentially use the Node implementation instead:

    https://nodejs.org/api/globals.html#globals_class_abortcontroller
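
    As a rough sketch of the proposed stop({abort: true}) option using Node's built-in AbortController: the AbortableConsumer class and the fakeLongPoll function below are hypothetical stand-ins, with fakeLongPoll playing the role of the long-polling SQS request. A real implementation would pass the signal to the SQS client instead, which is not shown here.

```javascript
// Stand-in for a long-polling receive call. Resolves with no messages after
// `waitMs`, or rejects early when the signal is aborted.
function fakeLongPoll(waitMs, signal) {
  return new Promise((resolve, reject) => {
    if (signal.aborted) {
      reject(new Error('Request aborted'));
      return;
    }
    const timer = setTimeout(() => resolve([]), waitMs);
    signal.addEventListener('abort', () => {
      clearTimeout(timer);
      reject(new Error('Request aborted'));
    }, { once: true });
  });
}

class AbortableConsumer {
  constructor(waitMs) {
    this.waitMs = waitMs;
    this.stopped = true;
    this.controller = null; // controller for the request currently in flight
    this.loop = null;       // promise that settles when polling has ended
  }

  start() {
    if (!this.stopped) return;
    this.stopped = false;
    this.loop = this.poll();
  }

  async poll() {
    while (!this.stopped) {
      this.controller = new AbortController();
      try {
        await fakeLongPoll(this.waitMs, this.controller.signal);
      } catch (err) {
        // An abort caused by stop() is expected; anything else is a real error.
        if (!this.stopped) throw err;
      }
    }
  }

  // Defaults to false for backwards compatibility, as proposed above.
  stop({ abort = false } = {}) {
    this.stopped = true;
    if (abort && this.controller) {
      this.controller.abort();
    }
  }
}
```

    Without abort, stop() lets the pending request run to completion before the loop ends; with abort: true the pending request is rejected immediately and the loop ends right away.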

    triage feature-request 
    opened by nicholasgriffintn 0
  • SQS Consumer possibly missing messages continuously

    Question

    I didn't mark this as a bug report because I don't know whether this is a bug or whether I am using this package incorrectly. I have noticed that the SQS consumer possibly misses messages in the queue. About a year ago it worked perfectly fine; this started happening a few months ago.

    Here's my code. Sending messages into the SQS queue:

    AWS.config.update({
      region: config.aws.sqs.region,
      accessKeyId: config.aws.sqs.access_key,
      secretAccessKey: config.aws.sqs.secret_key
    });
    
    var sqs = new AWS.SQS({ apiVersion: '2012-11-05' });
    
    module.exports.publishMessage = (txHash, eventLogs) => {
      return new Promise(async (resolve, reject) => {
        try {
    
          let sent_count = 0;
    
          logger.info(null, `Publishing events ${eventLogs.length}`)
    
          eventLogs.forEach((event) => {
            event.transaction_id = txHash;
            const param = {
              MessageBody: JSON.stringify(event),
              MessageGroupId: uuid.v4(),
              MessageDeduplicationId: uuid.v4(),
              QueueUrl: config.aws.sqs.node_consumer_queue_url
            }
            logger.info(null, `Sending events ${JSON.stringify(param)}`)
    
            sqs.sendMessage(param, function (err, data) {
              if (err) logger.error(null, err); // an error occurred
              else {
                logger.info(null, `Published: ${JSON.stringify(data)}`);
    
                // Debug purposes
                sent_count += 1;
                console.log(sent_count, "Sent count!");
                resolve();
              }
            });
          });
        } catch (e) {
          console.log('e', e);
          resolve();
        }
      })
    }
    
    

    Receiving messages from the queue

    AWS.config.update({
    	region: config.aws.sqs.region,
    	accessKeyId: config.aws.sqs.access_key,
    	secretAccessKey: config.aws.sqs.secret_key
    });
    
    let total_count = 0;
    
    // Create our consumer
    const app = Consumer.create({
    	queueUrl: queueUrl,
    	handleMessageBatch: async (messages) => {
    		total_count += messages.length;
    		return new Promise(async (resolve, reject) => {
    			async.each(messages, async function (message, callback) {
    				// console.log(message);
    				console.log(total_count, "Received count!");
    
    				resolve();
    			});
    			console.log("Execution complete!!");
    
    		});
    	},
    	sqs: new AWS.SQS({
    		httpOptions: {
    			agent: new https.Agent({
    				keepAlive: true
    			})
    		}
    	}),
    	batchSize: 10,
    	waitTimeSeconds: 5
    });
    
    app.on('error', (err) => {
    	logger.error(null, `Error in consumer: ${JSON.stringify(err)}`, err, err.message);
    });
    
    app.on('message_received', (msg) => {
    	logger.info(null, `SQS Message Received`)
    });
    

    Sending messages to the queue works correctly, whereas receiving is the part that works incorrectly. When I sent 20 messages, it received 12; when I sent 10 messages, it received 8. Something weird is going on.

    screenshots image

    Additional context Version: Latest

    help wanted question triage 
    opened by 0xrpj 2
  • Poller processing next message before previous message has finished

    Question

    I seem to be having an issue. Let's say that I have 5 messages in my queue and I start the poller. The first message is pulled and work begins on handling it. During processing, I run a shell command that can take quite a while to return, and I need the poller to wait until the previous message has been handled completely before it moves on. Unfortunately, it appears to wait only for the pollingWaitTimeMs time and then process the next message before the previous one is done. It even fires the message_processed callback although the shell command has not completed.

    screenshots

    NA

    Additional context

    Basically, I am downloading an image from an object storage service like S3 (but not S3) and saving it locally to my server. Once saved, I use that image to create map tiles with gdal2tiles.py. However, this command can take some time to finish, and I need the poller to wait for it to finish before it grabs the next message in the queue.
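
    One possible reading, not confirmed in this thread, is that the consumer treats a message as processed as soon as the promise returned by handleMessage settles, so work started inside callbacks (such as fs.mkdir or exec) is not waited for. Under that assumption, the handler needs to await the shell command itself. The sketch below shows this with a promisified exec; makeHandler and buildCommand are hypothetical names, and a harmless node command stands in for the long-running one.

```javascript
const { exec } = require('child_process');
const { promisify } = require('util');

// Promise-returning version of exec: resolves when the command exits with
// status 0 and rejects otherwise.
const execAsync = promisify(exec);

// Builds a handler whose promise settles only after the shell command has
// finished. `buildCommand` is a hypothetical function that maps a message to
// the command line to run (for example the gdal2tiles.py call).
function makeHandler(buildCommand) {
  return async function handleMessage(message) {
    const command = buildCommand(message);
    const { stdout } = await execAsync(command);
    return stdout.trim();
  };
}

// Usage with a harmless command standing in for the long-running one.
const handleMessage = makeHandler(
  () => `"${process.execPath}" -e "console.log('tiles created')"`
);
handleMessage({ Body: '{}' }).then((output) => console.log(output));
```

    If the command exits with a non-zero status, the returned promise rejects, which under the same assumption would leave the message on the queue for a retry.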

    Code Sample

    // Watcher Setup
    const watcher = Consumer.create({
      queueUrl: global.config.msgsrvc.url,
      region: 'us-east-1',
      pollingWaitTimeMs: 60000,
      messageAttributeNames: ['companyId', 'farmId', 'fieldId', 'flightId', 'pipelineId', 'type', 'uuid', 'bucket', 'path', 'filename'],
      handleMessage: async(message) => {
        console.log("Processing Message...")
    
        await Pipeline.update({
          status: 2,
          startedAt: db.sequelize.fn('NOW')
        }, {
          where: {
            id: message.MessageAttributes.pipelineId.StringValue,
            uuid: message.MessageAttributes.uuid.StringValue
          }
        }).then(async() => {
          console.log("Downloading image from Wasabi...")
    
          let directory = `./tmp/${message.MessageAttributes.uuid.StringValue}-${message.MessageAttributes.type.StringValue}`
    
          fs.mkdir(directory, async() => {
            await storage.doDownload(
              message.MessageAttributes.bucket.StringValue,
              message.MessageAttributes.path.StringValue,
              message.MessageAttributes.filename.StringValue,
              directory
            ).then(async() => {
              console.log("Creating tiles...")
    
              let dirTiles = `${directory}/tiles`
    
              fs.mkdir(dirTiles, () => {
                let command = `gdal2tiles.py -z 1-20 --processes=10 ${directory}/${message.MessageAttributes.filename.StringValue} ${dirTiles}`
    
                // cmd.get(command, (data) => {
                //   console.log("Tiles created successfully...")
                //   console.log("Uploading tiles to Wasabi...")
                // })
    
                exec(command, (error, stdout, stderr) => {
                  if (error) {
                    console.log(`Error creating tiles: ${error.message}`)
                  }
    
                  console.log("Tiles created successfully...")
                  console.log("Uploading tiles to Wasabi...")
                });
              })
            }).catch((error) => {
              console.log("Failed to download image: ", error)
            })
          })
        }).catch((error) => {
          console.log("Error: ", error)
        });
      },
      sqs: new aws.SQS({
        credentials: creds,
        apiVersion: '2012-11-05',
        region: 'us-east-1'
      })
    });
    
    watcher.on('message_received', (message) => {
      console.log("Message Received...")
    })
    
    watcher.on('message_processed', async(message) => {
      console.log("Message Processed...")
    })
    
    watcher.on('empty', () => {
      console.log("SQS Queue Empty...")
    })
    
    watcher.on('error', async(err, message) => {
      console.error("General Error:", err);
    });
    
    watcher.on('processing_error', async(err, message) => {
      console.error("Processing Error: ", err);
    });
    
    watcher.on('timeout_error', (err) => {
      console.error("Timeout Error: ", err);
    });
    
    console.log("Starting SQS Watcher")
    
    watcher.start();
    
    question triage 
    opened by recoiladam 4
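
One possible reading of the code sample above: the callbacks passed to fs.mkdir and exec are never awaited, so the promise returned by handleMessage can resolve before the shell command finishes, which would make the consumer treat the message as processed early. The sketch below shows one way to await every step using Node's built-in util.promisify and fs.promises. The helper name runCommandIn is hypothetical, the database update and download steps from the original are omitted, and this is only an illustration under those assumptions, not the library's prescribed approach.

```javascript
const { exec } = require('child_process');
const { promisify } = require('util');
const fs = require('fs').promises;

// Promise-returning version of exec: resolves only after the command exits
// and rejects if the command fails.
const execAsync = promisify(exec);

// Hypothetical helper (not part of sqs-consumer): create the directory,
// run the command, and resolve with its trimmed stdout once it has exited.
async function runCommandIn(directory, command) {
  await fs.mkdir(directory, { recursive: true });
  const { stdout } = await execAsync(command);
  return stdout.trim();
}

// Handler sketch: each step is awaited, so the returned promise settles only
// after the tiling command has finished. Errors are left to propagate, so a
// rejected promise leaves the message on the queue.
async function handleMessage(message) {
  const attrs = message.MessageAttributes;
  const directory = `./tmp/${attrs.uuid.StringValue}-${attrs.type.StringValue}`;
  const dirTiles = `${directory}/tiles`;

  // The database update and download from the original sample would be
  // awaited here, before the tiling command runs.
  const command = `gdal2tiles.py -z 1-20 --processes=10 ${directory}/${attrs.filename.StringValue} ${dirTiles}`;
  await runCommandIn(dirTiles, command);
}
```

A handler shaped like this could be passed as the handleMessage option to Consumer.create. Because a long-running command may outlast the queue's visibility timeout, that setting may also need to be raised, depending on how the queue is configured.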
Releases(v6.1.0)
  • v6.1.0(Dec 18, 2022)

    What's Changed

    Features

    • feat: add NonExistentQueue to isConnectionError by @nicholasgriffintn in https://github.com/bbc/sqs-consumer/pull/336
    • Ack partial of a batch of messages by @efiShtain in https://github.com/bbc/sqs-consumer/pull/255
    • chore: typescript enhancements by @nicholasgriffintn in https://github.com/bbc/sqs-consumer/pull/337

    Docs

    • chore: creating an example test app by @nicholasgriffintn in https://github.com/bbc/sqs-consumer/pull/331
    • chore: updating api documentation by @nicholasgriffintn in https://github.com/bbc/sqs-consumer/pull/334
    • chore: moving aws iam docs to below credentials by @nicholasgriffintn in https://github.com/bbc/sqs-consumer/pull/335

    New Contributors

    • @efiShtain made their first contribution in https://github.com/bbc/sqs-consumer/pull/255

    Full Changelog: https://github.com/bbc/sqs-consumer/compare/v6.0.2...v6.1.0

  • v6.0.2(Dec 16, 2022)

    Fix: release had no build / prepublish command was deprecated

    Full Changelog: https://github.com/bbc/sqs-consumer/compare/v5.8.0...v6.0.2

  • v6.0.0(Dec 16, 2022)

    What's Changed

    • Update to AWS SDK v3 by @ericyd in https://github.com/bbc/sqs-consumer/pull/252

    New Contributors

    • @ericyd made their first contribution in https://github.com/bbc/sqs-consumer/pull/252

    Full Changelog: https://github.com/bbc/sqs-consumer/compare/v5.8.0...v6.0.0

  • v5.8.0(Dec 9, 2022)

    What's Changed

    • chore: updating files to make contributing easier and better by @nicholasgriffintn in https://github.com/bbc/sqs-consumer/pull/323
    • chore: updating the README build status badge by @nicholasgriffintn in https://github.com/bbc/sqs-consumer/pull/324
    • Document default waitTimeSeconds by @claaslange in https://github.com/bbc/sqs-consumer/pull/318
    • chore(deps): updating dependencies by @nicholasgriffintn in https://github.com/bbc/sqs-consumer/pull/329

    New Contributors

    • @nicholasgriffintn made their first contribution in https://github.com/bbc/sqs-consumer/pull/323
    • @claaslange made their first contribution in https://github.com/bbc/sqs-consumer/pull/318

    Full Changelog: https://github.com/bbc/sqs-consumer/compare/v5.7.0...v5.8.0

  • v5.7.0(Apr 21, 2022)

    • Merge pull request #280 from rafael-pb/master 306f97a
    • Merge branch 'ghost-patch-1' 6107437
    • fix: allow ConsumerOptions of type number to be zero 8fed996
    • Updating AWS SDK to 2.1114.0 1cef3ca
    • fix: allow ConsumerOptions of type number to be zero 7350eb5
    • Merge pull request #286 from shlomo-artlist/patch-1 83f870b
    • Merge pull request #287 from env0/master 3414ada
    • Merge pull request #297 from bbc/dependabot/npm_and_yarn/ajv-6.12.6 3940807
    • Merge pull request #307 from bbc/dependabot/npm_and_yarn/ansi-regex-3.0.1 8a74500
    • Merge pull request #300 from RobPethick/patch-1 92ba7c3
    • Merge pull request #246 from guizoxxv/patch-1 7939147
    • Bump ansi-regex from 3.0.0 to 3.0.1 03ef47c
    • Bump ajv from 6.8.1 to 6.12.6 3026440
    • Merge pull request #268 from bbc/dependabot/npm_and_yarn/path-parse-1.0.7 b12a226
    • Merge pull request #274 from widdix/fix-iam-permissions f20b4d0
    • Merge pull request #282 from rafaelcunhaiFood/update_debug_version 6f823b8
    • Merge pull request #284 from bbc/dependabot/npm_and_yarn/aws-sdk-2.814.0 a5c6183
    • Merge pull request #296 from bbc/dependabot/npm_and_yarn/pathval-1.1.1 16ef7e4
    • Merge pull request #305 from bbc/dependabot/npm_and_yarn/minimist-1.2.6 07f6ab4
    • Bump minimist from 1.2.5 to 1.2.6 e18a6b2
    • Fix visability/visibility spelling error 86e72fc
    • Bump pathval from 1.1.0 to 1.1.1 4dedfe2
    • Don't delete messages from sqs (#1) 193efce
    • Update README.md 0ed5e54
    • Bump aws-sdk from 2.699.0 to 2.814.0 2a99537
    • Update debug version b8ee423
    • Issue #275 - fix visibility timeout error handler 143bdc6
    • Fix documented required IAM actions 223f18e
    • Bump path-parse from 1.0.6 to 1.0.7 53f98b0
    • Included https module on example 52a3bf6

    https://github.com/BBC/sqs-consumer/compare/v5.6.0...v5.7.0

  • v5.6.0(Aug 26, 2021)

    • Update consumer.ts (#235) 663f222
    • Merge pull request #270 from charlescapps/charlescapps_issues_269_fix-visibility-timeout 7efbc26
    • Update unit tests with the correct expectations d17e194
    • Issue #269 - fix the input to ChangeVisibilityTimeout to not include the elapsed seconds 6a23fab
    • Fix link to contributing guidelines (#244) 3cafbb7

    https://github.com/BBC/sqs-consumer/compare/v5.5.0...v5.6.0

  • v5.5.0(Nov 18, 2020)

  • v5.4.0(Sep 13, 2019)

  • v5.3.0(May 22, 2019)

  • v5.2.0(Mar 22, 2019)

  • v5.1.1(Mar 20, 2019)

  • v5.1.0(Mar 18, 2019)

  • v5.0.1(Feb 22, 2019)

  • v5.0.0(Feb 18, 2019)

  • v5.0.0-beta.0(Feb 13, 2019)

  • v4.1.0(Jan 30, 2019)

  • v4.0.0(Jan 10, 2019)

  • v3.8.0(Oct 31, 2017)

  • v3.7.0(Aug 22, 2017)

  • v3.6.1(Jun 15, 2017)

  • v3.6.0(Apr 20, 2017)

  • v3.5.0(Feb 22, 2017)

  • v3.4.0(Nov 18, 2016)

  • v3.3.0(Aug 22, 2016)

  • v3.2.0(Jun 28, 2016)

  • v3.1.3(May 12, 2016)

  • v3.1.2(Jan 26, 2016)

  • v3.1.1(Jan 26, 2016)

  • v3.1.0(Dec 30, 2015)

  • v3.0.0(Dec 3, 2015)

    Breaking change

    The error event now only returns errors that occur when receiving or deleting messages from an SQS queue.

    Errors that you return in the done callback are now emitted using the processing_error event. Use this if you'd like a convenient way to log all of your application errors.

    #20 (Thanks @pablovilas!)

Owner
BBC
Open source code used on public facing services, internal services and educational resources.
LoopBack makes it easy to build modern API applications that require complex integrations.

LoopBack makes it easy to build modern applications that require complex integrations. Fast, small, powerful, extensible core Generate real APIs with

StrongLoop and IBM API Connect 4.4k Jan 4, 2023
Marble.js - functional reactive Node.js framework for building server-side applications, based on TypeScript and RxJS.

Functional reactive Node.js framework for building server-side applications, based on TypeScript and RxJS. Ecosystem Name Description @marblejs/core F

Marble.js 2.1k Dec 16, 2022
⛔️ DEPRECATED - Boilerplate for getting started with MERN stack

⛔️ DEPRECATED MERN is deprecated and is no longer actively maintained. mern-starter MERN is a scaffolding tool which makes it easy to build isomorphic

Hashnode 5.1k Jan 3, 2023
A progressive Node.js framework for building efficient, scalable, and enterprise-grade server-side applications on top of TypeScript & JavaScript (ES6, ES7, ES8) 🚀

A progressive Node.js framework for building efficient and scalable server-side applications. Description Nest is a framework for building efficient,

nestjs 53.2k Dec 31, 2022
A framework for real-time applications and REST APIs with JavaScript and TypeScript

A framework for real-time applications and REST APIs with JavaScript and TypeScript Feathers is a lightweight web-framework for creating real-time app

Feathers 14.3k Jan 1, 2023
Use full ES2015+ features to develop Node.js applications, Support TypeScript.

ThinkJS Use full ES2015+ features to develop Node.js applications, Support TypeScript. Simplified Chinese documentation Installation npm install -g think-cli Create Application

ThinkJS 5.3k Dec 30, 2022
MVC framework making it easy to write realtime, collaborative applications that run in both Node.js and browsers

Derby The Derby MVC framework makes it easy to write realtime, collaborative applications that run in both Node.js and browsers. Derby includes a powe

DerbyJS 4.7k Dec 23, 2022
A well documented set of tools for building node web applications.

Perk Framework Perk is a well documented set of tools for building node web applications. The goal of Perk is first and foremost to provide a well doc

Aaron Larner 179 Oct 26, 2022
🥚 Born to build better enterprise frameworks and apps with Node.js & Koa

Features Built-in Process Management Plugin System Framework Customization Lots of plugins Quickstart Follow the commands listed below. $ mkdir showca

egg 18.3k Dec 29, 2022
📦🔐A lightweight private proxy registry build in Node.js

Version 6 (Development branch) Looking for Verdaccio 5? Check branch 5.x. Verdaccio is a simple, zero-config-required local private npm registry. No n

Verdaccio 14.3k Dec 31, 2022
🍔 A Node.js Serverless Framework for front-end/full-stack developers. Build the application for next decade. Works on AWS, Alibaba Cloud, Tencent Cloud and traditional VM/Container. Super easy integrate with React and Vue. 🌈

Midway - a future-oriented, cloud-integrated Node.js framework English | Simplified Chinese Watch the replay of the Midway Serverless 2.0 launch event: https://www.bilibili.com/video/BV17A411T7Md “Midway Serverless launch

Midway.js 6.3k Jan 8, 2023
Component based MVC web framework for nodejs targeting good code structures & modularity.

Component based MVC web framework for nodejs targeting good code structures & modularity. Why fortjs Based on Fort architecture. MVC Framework and fol

Ujjwal Gupta 47 Sep 27, 2022
Realtime.js - a fast frontend framework based on Web-Components.

Realtime.js is a fast frontend framework based on Web-Components and Proxies. It has a lot of features to simplify your life as a vanilla JS developer. The framework is written in such a way that you can edit it yourself if you need additional features.

Kilian Hertel 7 Nov 1, 2022
The Simple, Secure Framework Developers Trust

@hapi/hapi The Simple, Secure Framework Developers Trust Build powerful, scalable applications, with minimal overhead and full out-of-the-box function

hapi.js 14.1k Dec 31, 2022
A docker image to build a SQS queue listener. Written in TypeScript, made to use with docker.

sqs-request A docker image to build a SQS queue listener. Written in TypeScript, made to use with docker. SQS queue processor with node, ts-squiss, wh

Marcus Yoda 3 Jan 20, 2022
⚡️The Fullstack React Framework — built on Next.js

The Fullstack React Framework "Zero-API" Data Layer — Built on Next.js — Inspired by Ruby on Rails Read the Documentation “Zero-API” data layer lets y

⚡️Blitz 12.5k Jan 4, 2023
Simple scaffolding for applications that produce SQS messages

sqs-producer Enqueues messages onto a given SQS queue Installation npm install sqs-producer Usage const { Producer } = require('sqs-producer'); // c

BBC 160 Dec 23, 2022
NestJS + AWS SQS sample

nestjs-sqs-sample Overview A sample project for NestJS + AWS SQS. Behavior is verified using localstack's SQS. Environment Mac OS Node.js - 16.x yarn - 1.22.x Docker Desktop - 4.2.0 or later AWS

Yasuyuki Saito 8 Oct 18, 2022
awsrun 189 Jan 3, 2023
Sample solution to build a deployment pipeline for Amazon SageMaker.

Amazon SageMaker MLOps Build, Train and Deploy your own container using AWS CodePipeline and AWS CDK This’s a sample solution to build a deployment pi

AWS Samples 4 Aug 4, 2022