Util for kafkajs to buffer messages and send them in batches, inspired by node-rdkafka

Overview

kafkajs-buffer

kafkajs-buffer adds queue/buffer capabilities to a kafkajs producer so that messages are buffered before sending. It splits the buffer into batches and sends them to Kafka, reducing the number of requests and hiding this complexity from the caller. Delivered messages are reported through a callback function, so callers do not need to await each send, which improves streaming times.
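
To make the batching step concrete, here is a minimal sketch of splitting a queue into fixed-size batches. It is an illustration only, not the library's actual implementation; `splitIntoBatches` is a hypothetical helper name.

```typescript
// Illustrative only: a hypothetical helper, not part of kafkajs-buffer.
// Splits a queue into consecutive batches of at most `batchSize` items.
function splitIntoBatches<T>(queue: T[], batchSize: number): T[][] {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError("batchSize must be a positive integer");
  }
  const batches: T[][] = [];
  for (let i = 0; i < queue.length; i += batchSize) {
    batches.push(queue.slice(i, i + batchSize));
  }
  return batches;
}

// Five queued messages with a batch size of 2 produce three batches.
const exampleBatches = splitIntoBatches(["m1", "m2", "m3", "m4", "m5"], 2);
console.log(exampleBatches.length); // 3
```

With a batch size of 1000 (the value used under Configuration), a buffer of 2500 messages would be sent as three requests instead of 2500.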

Usage

You can install the kafkajs-buffer module like any other module:

npm install kafkajs-buffer

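To use the module, import it and create an instance, passing an existing kafkajs producer.

import { KafkajsBuffer } from "kafkajs-buffer";
// `producer` is a kafkajs producer; `options` is described under Configuration.
const producerBuffer = new KafkajsBuffer(producer, options);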

To send messages, push them into the buffer, much as you would send them with kafkajs.

producerBuffer.push({
  topic: "topic-1",
  messages: [
    {
      key: "m1",
      value: "message 1",
    },
    {
      key: "m2",
      value: "message 2",
    },
  ],
});

You can also push messages for different topics.

producerBuffer.push([
  {
    topic: "topic-1",
    messages: [
      {
        key: "m1",
        value: "message 1",
      },
    ],
  },
  {
    topic: "topic-2",
    messages: [
      {
        key: "m2",
        value: "message 2",
      },
      {
        key: "m3",
        value: "message 3",
      },
    ],
  },
]);
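
When messages arrive one at a time together with their destination topic, they can be grouped into the array shape shown above before pushing. The following is a sketch under assumptions: `groupByTopic` is a hypothetical helper, and the types are defined locally and only approximate the library's own.

```typescript
// Hypothetical types for illustration; the library's own types may differ.
type BufferedMessage = { key: string; value: string };
type TopicMessages = { topic: string; messages: BufferedMessage[] };

// Groups (topic, message) pairs into one entry per topic.
// Topics appear in the order they were first seen.
function groupByTopic(
  records: Array<{ topic: string; message: BufferedMessage }>
): TopicMessages[] {
  const byTopic = new Map<string, BufferedMessage[]>();
  records.forEach(({ topic, message }) => {
    const list = byTopic.get(topic) ?? [];
    list.push(message);
    byTopic.set(topic, list);
  });
  const grouped: TopicMessages[] = [];
  byTopic.forEach((messages, topic) => grouped.push({ topic, messages }));
  return grouped;
}

const groupedExample = groupByTopic([
  { topic: "topic-1", message: { key: "m1", value: "message 1" } },
  { topic: "topic-2", message: { key: "m2", value: "message 2" } },
  { topic: "topic-2", message: { key: "m3", value: "message 3" } },
]);
console.log(groupedExample.map((entry) => entry.topic)); // [ 'topic-1', 'topic-2' ]
```

The result has the same structure as the multi-topic array above, so it could be passed to `producerBuffer.push` directly.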

You can programmatically request that the buffered messages be sent to Kafka, which helps avoid reaching the maximum buffer size. Depending on the time elapsed since the last send, the messages in the buffer queue are either sent immediately or postponed.

producerBuffer.poll();
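
The send-or-postpone decision can be pictured as a time check against the last send. The sketch below is an assumption about how such a check might look, not the library's code: `msUntilNextSend` is a hypothetical function, and `queueBufferingMaxMs` is borrowed from the Configuration section.

```typescript
// Illustrative only: returns how long a poll should wait before sending.
// 0 means "send now"; a positive value means "postpone by this many ms".
function msUntilNextSend(
  lastSentAtMs: number,
  nowMs: number,
  queueBufferingMaxMs: number
): number {
  const elapsedMs = nowMs - lastSentAtMs;
  return Math.max(0, queueBufferingMaxMs - elapsedMs);
}

console.log(msUntilNextSend(0, 1500, 1000)); // 0   (send immediately)
console.log(msUntilNextSend(0, 400, 1000)); // 600 (postpone)
```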

In addition, you can set the producer to poll on an interval.

producerBuffer.startAutoPolling(100);

Don't forget to stop auto polling before your program ends.

producerBuffer.stopAutoPolling();

To receive confirmation when messages are published to Kafka, use the callback functions 'onBatchDelivered' and/or 'onMessageDelivered'.

// Running total of delivered messages
let messagesDeliveredCount = 0;

// Called every time a message is successfully sent to Kafka
const onMessageDelivered = (messageDelivered: IDeliveredMessage) => {
  messagesDeliveredCount += 1;
};
// Called every time a batch is successfully sent to Kafka
const onBatchDelivered = (messagesDelivered: IDeliveredMessage[]) => {
  messagesDeliveredCount += messagesDelivered.length;
};

In addition, you can attach extra information to messages; it is not sent to Kafka but is passed to the callback function.

type Info = {
  timestamp: number;
};

const producerBuffer = new KafkajsBuffer<Info>(producer, options);

const messageToSend: IMessageWithInfo<Info> = {
  key: "1",
  value: "message value",
  info: {
    timestamp: Date.now(),
  },
};

const onMessageDelivered = (messageDelivered: IDeliveredMessage<Info>) => {
  console.log(
    `Message created at ${messageDelivered.info?.timestamp} was delivered to kafka`
  );
};
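
Because `info` never reaches Kafka, it has to be separated from the payload at some point before sending. Here is a minimal sketch of that separation. The `MessageWithInfo` type is a local stand-in for the library's `IMessageWithInfo` (the real type likely has more fields), and `splitInfo` is a hypothetical helper, not the library's internal code.

```typescript
// Stand-in for the library's message type, for illustration only.
type MessageWithInfo<T> = { key: string; value: string; info?: T };

// Returns the Kafka-bound payload and the caller's info as separate values.
function splitInfo<T>(message: MessageWithInfo<T>): {
  payload: { key: string; value: string };
  info: T | undefined;
} {
  const { info, ...payload } = message;
  return { payload, info };
}

const splitExample = splitInfo({
  key: "1",
  value: "message value",
  info: { timestamp: 1700000000000 },
});
console.log("info" in splitExample.payload); // false
console.log(splitExample.info?.timestamp); // 1700000000000
```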

To gracefully shut down your process, you must call and await 'flush'. It waits for any send in progress to finish and sends any messages still pending in the buffer.

await producerBuffer.flush();
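
Putting the shutdown steps together: stop auto polling, flush, then disconnect the producer. The sketch below takes minimal structural interfaces rather than the real classes so that it stays self-contained. `shutdownGracefully` is a hypothetical helper; `stopAutoPolling`, `flush` and the kafkajs `producer.disconnect()` are the calls it is meant to wrap, and the `Promise<void>` return type of `flush` is an assumption.

```typescript
// Minimal shapes of the objects involved; the real classes have more members.
interface FlushableBuffer {
  stopAutoPolling(): void;
  flush(): Promise<void>; // assumed return type
}
interface DisconnectableProducer {
  disconnect(): Promise<void>;
}

// Stops polling, drains the buffer, then closes the producer connection.
async function shutdownGracefully(
  buffer: FlushableBuffer,
  producer: DisconnectableProducer
): Promise<void> {
  buffer.stopAutoPolling();
  try {
    await buffer.flush();
  } finally {
    // Disconnect even if the final flush fails, so the process can exit.
    await producer.disconnect();
  }
}
```

A process would typically call such a helper from its termination signal handler, passing the `producerBuffer` and the kafkajs producer it wraps.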

Configuration

import { CompressionTypes } from "kafkajs";

const options = {
  batchNumMessages: 1000, // The buffer is split into batches of this size before being sent to Kafka.
  queueBufferingMaxMs: 1000, // Time in ms that messages are buffered before sending. Polling triggers the send after this time.
  queueBufferingMaxMessages: 100000, // Max number of messages allowed in the buffer. Pushing more throws an error.
  onMessageDelivered: () => {}, // Callback confirmation when a message is delivered to Kafka.
  onBatchDelivered: () => {}, // Callback confirmation when a batch is delivered to Kafka.
  onSendError: (err) => {}, // Callback with the error when sending after a poll fails.
  messageAcks: -1, // Number of required acks (https://kafka.js.org/docs/producing)
  responseTimeout: 30000, // Time to await a response, in ms (https://kafka.js.org/docs/producing)
  messageCompression: CompressionTypes.None, // Compression codec (https://kafka.js.org/docs/producing)
};
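
The `queueBufferingMaxMessages` limit means a push can throw when the buffer is full, so callers that produce faster than Kafka accepts may want to guard the call. The sketch below models the limit with a small in-memory queue. `BoundedQueue` is a hypothetical stand-in, the error message is invented for the example, and rejecting the whole push (rather than accepting part of it) is an assumption about the behavior.

```typescript
// Illustrative model of a bounded buffer; not the library's implementation.
class BoundedQueue<T> {
  private items: T[] = [];
  private readonly maxMessages: number;

  constructor(maxMessages: number) {
    this.maxMessages = maxMessages;
  }

  // Assumption: the whole push is rejected if it would exceed the limit.
  push(newItems: T[]): void {
    if (this.items.length + newItems.length > this.maxMessages) {
      throw new Error("Buffer full: " + this.maxMessages + " messages max");
    }
    this.items.push(...newItems);
  }

  // Empties the queue and returns what was buffered.
  drain(): T[] {
    const drained = this.items;
    this.items = [];
    return drained;
  }

  get size(): number {
    return this.items.length;
  }
}

const boundedQueue = new BoundedQueue<string>(2);
boundedQueue.push(["m1", "m2"]);
try {
  boundedQueue.push(["m3"]);
} catch (err) {
  // Full: make room (with the real buffer, by polling or flushing), then retry.
  boundedQueue.drain();
  boundedQueue.push(["m3"]);
}
console.log(boundedQueue.size); // 1
```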
Owner
Alberto Juan