Provides a KafkaJS-compatible message handler that facilitates publishing failed messages to delayed-retry or dead-letter topics.

kafkajs-async-retry

License: MIT

This module handles retries and dead-lettering for messages from a Kafka topic without blocking the processing of subsequent messages. It is intended to be used in conjunction with KafkaJS.

Getting Started

Prerequisites

Your project should have a dependency on KafkaJS, which is listed as a peer dependency of this module. This module has no other runtime dependencies.

Installation

npm install --save kafkajs-async-retry
# or using yarn
yarn add kafkajs-async-retry

Compatibility Requirements

  1. You must be using a version of Kafka that supports message headers (>= v0.11.0.0).
  2. You must be using a version of KafkaJS that provides a pause callback function to the eachMessage/eachBatch handler functions (>= 2.1.0).

Kafka Setup

Depending on how you want to retry failed messages, you must create topics to store the retried messages in addition to the primary topics that your application is already using.

  • Each consumer group must have a topic named ${consumerGroup}-dlq. This topic represents the dead-letter queue of messages that have failed after multiple retry attempts.
  • Each consumer group must have a series of topics to hold the messages to retry. The names of these topics depend on which retry topic naming strategy you use, which you set with the retryTopicNaming parameter.
    • If you set the retryTopicNaming parameter to RetryTopicNaming.ATTEMPT_BASED (the default), you must create a series of topics for each consumer group that are numbered according to the retry attempt. For example, if you want to retry failed messages three more times in addition to the initial attempt, create topics named ${consumerGroup}-retry-1, ${consumerGroup}-retry-2, and ${consumerGroup}-retry-3. You can configure the delay between retry attempts with the retryDelays parameter.
    • If you set the retryTopicNaming parameter to RetryTopicNaming.DELAY_BASED, you must create a series of topics for each consumer group that represent the amount of time to wait between retry attempts. For example, if you want to wait 5 seconds for the first two retry attempts and 60 seconds for the third, create topics named ${consumerGroup}-retry-5s and ${consumerGroup}-retry-60s. You can configure the total number of retry attempts with the maxRetries parameter.
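To double-check which topics to create before deploying, the naming rules above can be written as a small helper. This is a hypothetical sketch, not part of the module's API: `requiredTopics` is an illustrative name, the strategy is passed as a plain string standing in for the `RetryTopicNaming` enum, and it assumes that ATTEMPT_BASED needs one topic per retry up to `maxRetries`, while DELAY_BASED needs one topic per distinct delay used within `maxRetries` retries (reusing the last delay once the list runs out, as documented for `retryDelays`).

```javascript
// Hypothetical helper (not exported by kafkajs-async-retry): lists the topics
// one consumer group would need, following the naming rules described above.
function requiredTopics({
  groupId,
  retryTopicNaming = "ATTEMPT_BASED", // plain-string stand-in for RetryTopicNaming
  retryDelays = [5],
  maxRetries = 5,
}) {
  const topics = [`${groupId}-dlq`]; // dead-letter topic, always required
  if (retryTopicNaming === "ATTEMPT_BASED") {
    // one topic per retry attempt: -retry-1 ... -retry-N
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      topics.push(`${groupId}-retry-${attempt}`);
    }
  } else {
    // one topic per distinct delay actually used; once the configured
    // delays run out, the final delay is reused
    const delays = new Set();
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      delays.add(retryDelays[Math.min(attempt, retryDelays.length) - 1]);
    }
    for (const seconds of delays) {
      topics.push(`${groupId}-retry-${seconds}s`);
    }
  }
  return topics;
}

const attemptTopics = requiredTopics({ groupId: "test-group", retryDelays: [5, 30, 60], maxRetries: 3 });
// ["test-group-dlq", "test-group-retry-1", "test-group-retry-2", "test-group-retry-3"]

const delayTopics = requiredTopics({
  groupId: "test-group",
  retryTopicNaming: "DELAY_BASED",
  retryDelays: [5, 5, 60],
  maxRetries: 3,
});
// ["test-group-dlq", "test-group-retry-5s", "test-group-retry-60s"]
```

The output is only a checklist; creating the topics (partition counts, retention, and so on) is still up to your usual Kafka tooling.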

For more information on configuring retries, see Retry Topic Naming Strategies.

Example Usage

To use this module, create an AsyncRetryHelper object with the parameters that specify the retry delay, retry strategy, and number of retries, as in this example:

import AsyncRetryHelper, { RetryTopicNaming } from "kafkajs-async-retry";

const asyncRetryHelper = new AsyncRetryHelper({
  producer,
  groupId: "test-group",
  retryTopicNaming: RetryTopicNaming.ATTEMPT_BASED,
  retryDelays: [5, 30, 60],
  maxRetries: 3,
});

You'll then need to ensure that your consumer is subscribed to the appropriate retry topics.

await consumer.subscribe({
  topics: [asyncRetryHelper.retryTopicPattern],
  fromBeginning: true,
});

Finally, use the eachMessage() or eachBatch() helper to process the messages. To indicate that a message failed to process and should be sent to the retry topics, throw an exception from your handler.

await consumer.run({
  eachMessage: asyncRetryHelper.eachMessage(
    async ({ topic, originalTopic, message, previousAttempts }) => {
      if (previousAttempts > 0) {
        console.log(`Retrying message from topic ${originalTopic}`);
      }
      // do something with the message (processMessage stands for your own
      // logic; exceptions will be caught and the message will be sent to
      // the appropriate retry or dead-letter topic)
      await processMessage(message);
    }
  ),
});

Here's a complete example:

const { Kafka } = require("kafkajs");
// AsyncRetryHelper is the module's default export; with require() it is
// available as the `default` property
const {
  default: AsyncRetryHelper,
  RetryTopicNaming,
} = require("kafkajs-async-retry");

const kafka = new Kafka({
  clientId: "my-app",
  brokers: ["kafka1:9092", "kafka2:9092"],
});

const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: "test-group" });

const asyncRetryHelper = new AsyncRetryHelper({
  producer,
  groupId: "test-group",
  retryTopicNaming: RetryTopicNaming.ATTEMPT_BASED,
  retryDelays: [5, 30, 60],
  maxRetries: 3,
});

// placeholder for your own (possibly asynchronous) processing logic
async function processMessage(message) {
  // ...
}

// choose how to subscribe to retry topics and how to consume messages
const subscribeToAllRetryTopics = true;
const consumeInBatches = false;

// top-level await is not available in CommonJS, so wrap the setup in a function
async function main() {
  // the producer must be connected before the helper can publish retries
  await producer.connect();

  // set up the consumer
  await consumer.connect();
  await consumer.subscribe({ topics: ["test-topic"], fromBeginning: true });

  if (subscribeToAllRetryTopics) {
    // using a pattern here instead of an explicit list of topics will ensure
    // messages from all retry topics are picked up for processing (even for
    // retry topics that are no longer in use, which may be the case if retry
    // times or retry strategies are changed)
    await consumer.subscribe({
      topics: [asyncRetryHelper.retryTopicPattern],
      fromBeginning: true,
    });
  } else {
    // if you'd rather subscribe to only the retry topics that are currently
    // in use, you can use the following instead:
    await consumer.subscribe({
      topics: asyncRetryHelper.retryTopics,
      fromBeginning: true,
    });
  }

  // you can register event handlers for a couple of relevant events if
  // you'd like to take any action when messages are being queued for retry
  // or dead-lettering. These events are fired _after_ the message has been
  // sent to the relevant topic. Registering them before consumer.run()
  // ensures that no events are missed.
  asyncRetryHelper.on("retry", ({ message, error }) => {
    // log info about message/error here
  });
  asyncRetryHelper.on("dead-letter", ({ message, error }) => {
    // log info about message/error here
  });

  // consumer.run() should only be called once, so pick one processing mode
  if (!consumeInBatches) {
    // consume messages one at a time
    await consumer.run({
      eachMessage: asyncRetryHelper.eachMessage(
        async ({ topic, originalTopic, message, previousAttempts }) => {
          if (previousAttempts > 0) {
            console.log(`Retrying message from topic ${originalTopic}`);
          }
          // do something with the message (exceptions will be caught and the
          // message will be sent to the appropriate retry or dead-letter topic)
          await processMessage(message);
        }
      ),
    });
  } else {
    // or, consume messages as a batch (more advanced, requires more
    // implementation on your part)
    await consumer.run({
      eachBatch: asyncRetryHelper.eachBatch(
        async ({ messageFailureHandler, asyncRetryMessageDetails, ...payload }) => {
          // a for...of loop (unlike forEach) lets each await finish in turn
          for (const message of payload.batch.messages) {
            // asyncRetryMessageDetails is a callback that takes the message
            const details = asyncRetryMessageDetails(message);
            if (details.previousAttempts > 0) {
              console.log(`Retrying message from topic ${details.originalTopic}`);
            }
            try {
              await processMessage(message);
            } catch (err) {
              // messageFailureHandler is asynchronous, so await it
              await messageFailureHandler(err, message);
            }
          }
        }
      ),
    });
  }
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});

Overview and Methodology

The approach used here has been explained in detail in various places, including a helpful blog post from Uber Engineering. To summarize: when a message cannot be processed successfully, it is published to a separate "retry" topic. The current offset on the original topic/partition is advanced as if the message had been processed successfully, so subsequent messages can be processed without delay. Messages from the "retry" topics are then processed after a configurable delay. If a message continues to fail beyond a configurable maximum number of attempts, it is published to a "dead-letter" topic for manual inspection. Here's a sequence diagram demonstrating the process:

sequenceDiagram
  autonumber
  participant topicA as [Kafka]<br>Topic A
  participant cg as ConsumerGroupX
  participant retry as [Kafka]<br>ConsumerGroupX-Retry-N
  participant dead as [Kafka]<br>ConsumerGroupX-DeadLetter

  activate cg
  loop
    topicA->>cg: fetch batch of messages
    retry->>cg: fetch batch of messages
  end
  loop each message
    opt From Retry Queue?
      note over cg: Pause consumption until message<br>is ready to process
    end
    opt Error During Processing?
      alt Retry?
        cg->>retry: 
      else Fatal Error or Max Retries Reached?
        cg->>dead: 
      end
    end
    cg->>topicA: commit offset
    cg->>retry: commit offset
  end
  deactivate cg
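The key property of this flow, that a failed message is handed off to another topic while the original offset still advances, can be illustrated with an in-memory simulation. This is a sketch under simplifying assumptions, not the module's implementation: arrays stand in for Kafka topics, `consumeWithRetryTopic` and `handler` are hypothetical names, and the retry delay is left out entirely.

```javascript
// In-memory illustration of the retry-topic pattern. Arrays stand in for
// Kafka topics; no Kafka client is involved and retry delays are omitted.
function consumeWithRetryTopic(messages, handler, { maxRetries = 3 } = {}) {
  const retryQueue = []; // stands in for the retry topic(s)
  const deadLetters = []; // stands in for the dead-letter topic
  const processed = [];
  let committedOffset = -1;

  const attempt = (message, previousAttempts) => {
    try {
      handler(message);
      processed.push(message.value);
    } catch (error) {
      if (previousAttempts >= maxRetries) {
        // out of retries: park the message for manual inspection
        deadLetters.push({ ...message, previousAttempts, error: error.message });
      } else {
        // hand the message off instead of retrying it in place
        retryQueue.push({ ...message, previousAttempts: previousAttempts + 1 });
      }
    }
  };

  // Primary topic: the offset advances whether or not processing succeeded,
  // so one bad message never blocks the ones behind it.
  for (const message of messages) {
    attempt(message, 0);
    committedOffset = message.offset;
  }

  // Retry topic: in the real flow each message would first wait for its delay.
  while (retryQueue.length > 0) {
    const message = retryQueue.shift();
    attempt(message, message.previousAttempts);
  }

  return { processed, deadLetters, committedOffset };
}

const result = consumeWithRetryTopic(
  [
    { offset: 0, value: "a" },
    { offset: 1, value: "bad" },
    { offset: 2, value: "c" },
  ],
  (message) => {
    if (message.value === "bad") throw new Error("cannot process");
  }
);
// result.processed is ["a", "c"]; "bad" reaches result.deadLetters after
// 1 initial attempt + 3 retries; result.committedOffset is 2
```

Note that "c" is processed before "bad" has used up its retries, which is the non-blocking behavior the diagram describes.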

Retry Topic Naming Strategies

The naming strategy that you choose determines the topics that you must create in Kafka.

Topics based on the number of attempts

By default, the kafkajs-async-retry module will publish a failed message to a retry topic based on the number of previous attempts.

For example, if you want your consumer group to retry messages three times, you need three topics (in addition to the primary topics and the dead-letter topic mentioned above): ${consumerGroup}-retry-1, ${consumerGroup}-retry-2, and ${consumerGroup}-retry-3.

This retry strategy has these advantages:

  • You can set the retry delay dynamically in the code without having to create any new topics. However, if you change the retry delay, existing messages in the retry topic keep the delay time that was in effect when they were published to the retry topic.
  • Based on the number of messages in each topic, you can see how many retries are required before messages are processed successfully. If you are monitoring for offset lag on these retry topics, you must take the configured delay into account since each message being retried waits for a certain amount of time before the retry consumer picks it up.

A downside of this strategy is that if you want to increase the number of retries, you must create additional topics.

To use this retry strategy, use the RetryTopicNaming.ATTEMPT_BASED retryTopicNaming configuration in the AsyncRetryHelper constructor.

Here's a diagram of the message flow using this retry strategy. This example uses a maximum of three retries with configured wait times of 5 seconds on the first two retries and a 60-second delay on the final attempt:

sequenceDiagram
  autonumber
    participant topicA as [Kafka]<br>Topic A
    participant cg as ConsumerGroupX
    participant retry1 as [Kafka]<br>ConsumerGroupX-Retry-1
    participant retry2 as [Kafka]<br>ConsumerGroupX-Retry-2
    participant retry3 as [Kafka]<br>ConsumerGroupX-Retry-3
    participant dead as [Kafka]<br>ConsumerGroupX-DeadLetter

    topicA->>+cg: 
    note over cg: Error processing message<br>(initial attempt)
    cg-)retry1: 
    note over retry1: delay 5 seconds
    retry1 ->> cg: 
    note over cg: Error processing message<br>(retry #35;1)
    cg -) retry2: 
    note over retry2: delay 5 seconds
    retry2 ->> cg: 
    note over cg: Error processing message<br>(retry #35;2)
    cg -) retry3: 
    note over retry3: delay 60 seconds
    retry3 ->> cg: 
    note over cg: Error processing message<br>(retry #35;3)
    cg -X dead: Send to dead-letter topic
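Under this strategy, the destination of a failed message depends only on how many attempts it has already had. A hypothetical sketch of that choice follows; `attemptBasedDestination` is an illustrative name, the topic names follow the `${consumerGroup}-retry-N` and `${consumerGroup}-dlq` conventions from Kafka Setup, and the module makes this decision internally in ways that may differ in detail.

```javascript
// Hypothetical helper: where a message goes after a failed attempt when
// using ATTEMPT_BASED naming. `previousAttempts` counts the attempts made
// before the one that just failed (0 for the initial attempt).
function attemptBasedDestination(groupId, previousAttempts, maxRetries) {
  const nextRetry = previousAttempts + 1;
  if (nextRetry > maxRetries) {
    return `${groupId}-dlq`; // retries exhausted
  }
  return `${groupId}-retry-${nextRetry}`;
}

// The scenario in the diagram above (maxRetries = 3):
for (let previous = 0; previous <= 3; previous++) {
  console.log(attemptBasedDestination("test-group", previous, 3));
}
// test-group-retry-1
// test-group-retry-2
// test-group-retry-3
// test-group-dlq
```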

Topics based on the configured delay

Another naming strategy is to name retry topics based on the number of seconds to delay between attempts. This module puts the messages into different topics depending on how long to wait before retrying them.

For example, if you want to retry messages every 5 seconds, you can create a topic named ${consumerGroup}-retry-5s and send messages to that topic repeatedly until they succeed or reach the maximum number of attempts. If the delay varies depending on which attempt it is, you need a topic for each distinct configured delay. Note the s at the end of the topic name, which indicates that the number is the number of seconds to wait, not the number of retries as in the prior naming strategy.

The advantage of this strategy is that you can retry messages many times without having to create many topics, as long as the length of the delay stays the same. If you want to always wait the same amount of time between each attempt, you need only one retry topic and the dead letter topic.

However, if you want to change the length of the delays, you must create additional topics and ensure that any remaining messages from the existing topics are processed. This strategy also prevents you from easily judging how many messages are reaching different levels of retry, because you can't tell how many times a message has been tried based on which topic it's in.

To use this retry strategy, use the RetryTopicNaming.DELAY_BASED retryTopicNaming configuration in the AsyncRetryHelper constructor.

Here's a sequence diagram of the topics needed when using the DELAY_BASED naming strategy with the retryDelays parameter set to [5, 5, 60] and maxRetries set to 3:

sequenceDiagram
  autonumber
    participant topicA as [Kafka]<br>Topic A
    participant cg as ConsumerGroupX
    participant retry5s as [Kafka]<br>ConsumerGroupX-Retry-5s
    participant retry60s as [Kafka]<br>ConsumerGroupX-Retry-60s
    participant dead as [Kafka]<br>ConsumerGroupX-DeadLetter

    topicA->>+cg: 
    note over cg: Error processing message<br>(initial attempt)
    cg-)retry5s: 
    note over retry5s: delay 5 seconds
    retry5s ->> cg: 
    note over cg: Error processing message<br>(retry #35;1)
    cg -) retry5s: 
    note over retry5s: delay 5 seconds
    retry5s ->> cg: 
    note over cg: Error processing message<br>(retry #35;2)
    cg -) retry60s: 
    note over retry60s: delay 60 seconds
    retry60s ->> cg: 
    note over cg: Error processing message<br>(retry #35;3)
    cg -X dead: Send to dead-letter topic
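With delay-based naming, the destination depends on the delay for the next retry rather than on the retry number, which is why retries #1 and #2 above share one topic. A hypothetical sketch, assuming topic names of the form `${consumerGroup}-retry-<seconds>s` and the documented rule that the last entry of `retryDelays` is reused once the list runs out; `delayBasedDestination` is an illustrative name, not the module's API.

```javascript
// Hypothetical helper: destination for a failed message under DELAY_BASED
// naming. The delay for retry N is retryDelays[N - 1], reusing the last
// entry once the list runs out.
function delayBasedDestination(groupId, previousAttempts, { retryDelays, maxRetries }) {
  const nextRetry = previousAttempts + 1;
  if (nextRetry > maxRetries) {
    return `${groupId}-dlq`; // retries exhausted
  }
  const delaySeconds = retryDelays[Math.min(nextRetry, retryDelays.length) - 1];
  return `${groupId}-retry-${delaySeconds}s`;
}

const delayConfig = { retryDelays: [5, 5, 60], maxRetries: 3 };
for (let previous = 0; previous <= 3; previous++) {
  console.log(delayBasedDestination("test-group", previous, delayConfig));
}
// test-group-retry-5s
// test-group-retry-5s
// test-group-retry-60s
// test-group-dlq
```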

API

AsyncRetryHelper (constructor)

This is the default export and is the primary entry point for all functionality provided by this module. It accepts a single argument, an object with the following properties:

  • groupId (string): Required. The consumer group ID that this helper is facilitating retries for.
  • producer (kafkajs.Producer): Required. The producer instance to use when publishing messages to retry topics. The producer should already be connected to the Kafka cluster.
  • maxRetries (number): Default: 5. The maximum number of retries for a given message.
  • maxWaitTime (number): Default: 3000. The maximum amount of time (in milliseconds) that an eachMessage handler will wait before proceeding if the retry message is not yet ready to be processed (due to the configured delay). If the message is not going to be ready within this number of milliseconds, consumption from that topic/partition is paused. Note: this setting is not used at all when processing messages with the eachBatch callback handler. This value should be less than your sessionTimeout consumer configuration to avoid session timeouts.
  • retryDelays (number[]): Default: [5]. The number of seconds to wait before each retry attempt, in order (the first value applies to the first retry, the second value to the second retry, and so on). If the number of retries exceeds the number of configured delays, the final delay is used for each remaining retry attempt.
  • retryTopicNaming (RetryTopicNaming): Default: RetryTopicNaming.ATTEMPT_BASED. The strategy to use when naming retry topics (as discussed above).
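The `retryDelays` rule (one delay per retry, last value reused) is easy to get wrong when planning retry schedules, so here it is as a hypothetical sketch. Both function names are illustrative, and `processTimeFor` assumes the earliest process time is fixed when the message is published to the retry topic, which matches the note under the attempt-based strategy that already-queued messages keep the delay in effect when they were published.

```javascript
// Hypothetical helpers (not the module's code) illustrating the documented
// retryDelays behavior.

// Delay, in seconds, before retry number `retryNumber` (1-based). Once the
// list runs out, the final value is reused.
function retryDelayFor(retryDelays, retryNumber) {
  if (retryDelays.length === 0 || retryNumber < 1) {
    throw new RangeError("need at least one delay and a retry number >= 1");
  }
  return retryDelays[Math.min(retryNumber, retryDelays.length) - 1];
}

// Assumption: the earliest process time is computed once, when the message
// is published to a retry topic, so later config changes don't affect it.
function processTimeFor(publishedAt, retryDelays, retryNumber) {
  const delayMs = retryDelayFor(retryDelays, retryNumber) * 1000;
  return new Date(publishedAt.getTime() + delayMs);
}

retryDelayFor([5, 30, 60], 1); // 5
retryDelayFor([5, 30, 60], 5); // 60 (final value reused)
retryDelayFor([5], 3); // 5 (the default list)
```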

AsyncRetryHelper#eachMessage

This method provides a simple wrapper around the standard eachMessage handler function that you would typically pass to KafkaJS's Consumer#run method. It extends the data provided to the handler function with the following properties:

  • isRetry (boolean): Indicates whether this attempt to process the message is a retry or the initial attempt.
  • originalTopic (string): The topic that the message was originally published to. For a message that is being attempted for the first time, this matches the standard topic property that KafkaJS passes through.
  • previousAttempts (number): The number of previous attempts that have been made for this message. On the first attempt (before any retries), the value is 0.
  • processTime (Date): The earliest time the message should be processed. This should always be in the past, since the AsyncRetryHelper handles pausing processing if messages are not ready.
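processTime is already in the past when your handler runs because the helper first decides what to do with a retry message that isn't due yet. Based on the maxWaitTime description in the constructor options, that decision looks roughly like the hypothetical sketch below; `planRetry` and its return shape are illustrative, and what the helper does after pausing (for example, when it resumes) is not shown. Per the docs, this logic applies to eachMessage only.

```javascript
// Hypothetical sketch of the scheduling decision described for maxWaitTime;
// names and return shape are illustrative, not the module's internals.
function planRetry(processTime, now = new Date(), maxWaitTime = 3000) {
  const remainingMs = processTime.getTime() - now.getTime();
  if (remainingMs <= 0) {
    // already due: hand the message to your handler right away
    return { action: "process", delayMs: 0 };
  }
  if (remainingMs <= maxWaitTime) {
    // due soon: wait in-process, then call your handler
    return { action: "wait", delayMs: remainingMs };
  }
  // not due for a while: pause the topic/partition instead of blocking
  return { action: "pause", delayMs: remainingMs };
}

const now = new Date("2024-01-01T00:00:10Z");
planRetry(new Date("2024-01-01T00:00:09Z"), now); // { action: "process", delayMs: 0 }
planRetry(new Date("2024-01-01T00:00:12Z"), now); // { action: "wait", delayMs: 2000 }
planRetry(new Date("2024-01-01T00:01:10Z"), now); // { action: "pause", delayMs: 60000 }
```

Keeping the in-process wait bounded by maxWaitTime is what lets the documented advice hold: as long as maxWaitTime is below sessionTimeout, a single wait should not cost the consumer its group membership.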

If your message handler throws an exception, the wrapper catches the exception and sends the message to the appropriate retry topic or the dead-letter queue, if the maximum number of attempts has been exceeded.

To send a message directly to the dead letter topic and avoid any subsequent retries, throw a DeadLetter exception, which is exported by this module.
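A common use is separating permanent failures (which no amount of retrying will fix) from transient ones. The sketch below is hypothetical: it defines a local stand-in `DeadLetter` class only so that it runs on its own (in real code, import `DeadLetter` from kafkajs-async-retry instead), and `parseOrDeadLetter` is an illustrative helper name.

```javascript
// Stand-in for the DeadLetter class exported by kafkajs-async-retry, defined
// here only so this sketch is self-contained. Import the real one instead.
class DeadLetter extends Error {
  constructor(reason) {
    super(reason);
    this.name = "DeadLetter";
  }
}

// Hypothetical helper: a payload that can't be parsed will never succeed,
// so it skips the retry topics. Anything else your handler throws afterwards
// is treated as retryable by the eachMessage wrapper.
function parseOrDeadLetter(message) {
  try {
    // KafkaJS delivers message.value as a Buffer (or null)
    return JSON.parse(message.value.toString());
  } catch (err) {
    throw new DeadLetter(`Unparseable payload: ${err.message}`);
  }
}

// Inside an eachMessage handler, for example:
//   const payload = parseOrDeadLetter(message); // no retries if malformed
//   await processMessage(payload);              // retried if this throws
```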

AsyncRetryHelper#eachBatch

Very similar to the above, this method provides a wrapper around the standard eachBatch handler function that you would typically pass to KafkaJS's Consumer#run method. It extends the data provided to the handler function with the following callback methods:

  • asyncRetryMessageDetails (arguments: kafkajs.KafkaMessage): Returns an AsyncRetryMessageDetails object with the properties described above under the eachMessage documentation.
  • messageFailureHandler (arguments: Error, kafkajs.KafkaMessage): Handles publishing the message to the appropriate retry (or dead-letter) topic. This method is asynchronous and should be awaited.

To send a message directly to the dead letter topic and avoid any subsequent retries, provide a DeadLetter exception to the messageFailureHandler callback function.

🚨 Important 🚨

As the KafkaJS docs note, using eachBatch directly is considered a "more advanced use case", and it is recommended that you use the eachMessage approach unless that mode of processing is not workable for a specific reason. Although this module's implementation of the eachBatch processing model is much more complex than its eachMessage implementation, the main difference for you as a user is that, instead of simply letting exceptions bubble up to the eachMessage wrapper function, you must call the messageFailureHandler callback.

Also, when processing retries of a message using eachBatch, some of the batch metadata provided to your eachBatch handler will almost certainly be incorrect, because a batch of messages from a retry topic may be split when some of its messages are not yet ready to be retried. When that happens, no attempt is made to keep the attributes of the batch object (i.e. offsetLag(), offsetLagLow(), firstOffset() and lastOffset()) in sync with the actual batch of messages passed to your eachBatch handler.
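The per-message pattern for eachBatch can be sketched as a plain async function so that it runs with fakes. `handleBatch` and `handle` are hypothetical names; `messageFailureHandler` and `asyncRetryMessageDetails` are the callbacks described above, passed in explicitly here rather than taken from the helper.

```javascript
// Hypothetical batch handler body. In real code the two callbacks come from
// the argument that asyncRetryHelper.eachBatch() passes to your function.
async function handleBatch({ batch, messageFailureHandler, asyncRetryMessageDetails, handle }) {
  // for...of (not forEach) so each await completes before the next message
  for (const message of batch.messages) {
    const details = asyncRetryMessageDetails(message);
    try {
      await handle(message, details);
    } catch (err) {
      // messageFailureHandler is asynchronous, so await it rather than
      // letting the batch finish before the retry has been published
      await messageFailureHandler(err, message);
    }
  }
}
```

In real use you would call handleBatch from inside the function given to asyncRetryHelper.eachBatch(), forwarding `batch`, `messageFailureHandler`, and `asyncRetryMessageDetails` from its argument. Catching per message, rather than around the whole loop, keeps one failure from skipping the rest of the batch.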

AsyncRetryHelper#on

This class emits two in-process events that may be useful for visibility into your message retry processing: AsyncRetryEvent.RETRY and AsyncRetryEvent.DEAD_LETTER. In both cases, the event payload is a single object with the following properties:

  • message (KafkaMessage): The same value as what is provided to the eachMessage or eachBatch callback.
  • details (AsyncRetryMessageDetails): Provides information as described above under the eachMessage documentation.
  • error (Error or a subclass): The most recent error, which is causing the message to be retried or dead-lettered.
  • topic (string): The destination topic for the current retry attempt, or the dead-letter topic, as applicable.

These events are emitted after the message has been published to the retry or dead-letter topic. Keep in mind that, per the EventEmitter docs, event handlers will be processed synchronously in the order they are registered.
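The ordering guarantee is a property of Node's EventEmitter itself, so it can be demonstrated without Kafka. In this sketch a plain EventEmitter stands in for AsyncRetryHelper and the "retry" event name mirrors the example earlier in this README; the listener names are hypothetical.

```javascript
const { EventEmitter } = require("events");

// A plain EventEmitter standing in for AsyncRetryHelper, to show that
// listeners run synchronously, in registration order, inside emit().
const emitter = new EventEmitter();
const calls = [];

emitter.on("retry", ({ topic }) => calls.push(`metrics:${topic}`));
emitter.on("retry", ({ topic }) => calls.push(`log:${topic}`));

emitter.emit("retry", { topic: "test-group-retry-1" });
calls.push("after emit");

// calls is now:
// ["metrics:test-group-retry-1", "log:test-group-retry-1", "after emit"]
```

Because emit() returns only after every listener has run, a slow synchronous listener likely delays whatever the helper does next, so keep listeners short or hand heavy work off asynchronously.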

Other Exports

AsyncRetryEvents (enum)

Provides the names of the different events that are emitted by the AsyncRetryHelper class as described above.

DeadLetter (sub-class of Error)

This exception is used to indicate that a message should be "dead-lettered" (i.e. not retried). If you throw this exception in an eachMessage handler or pass it to the messageFailureHandler callback (when using eachBatch processing mode), the message will be delivered to the dead-letter topic. The DeadLetter constructor takes a single argument, a string that indicates the reason the message is being dead-lettered.

RetryTopicNaming (enum)

Provides two enum values (as described in Retry Topic Naming Strategies) that can be provided to the AsyncRetryHelper constructor to configure the naming strategy used for retry topics.

Latest release: v1.0.0
Owner: Shutterstock