Pulsar Flex is a modern Apache Pulsar client for Node.js, developed to be independent of C++.

Overview

PulsarFlex

Apache Pulsar® client for Node.js

About

PulsarFlex is a modern Apache Pulsar client for Node.js.

It was developed because the official client's dependency on external C++ libraries does not fit some use cases.

It supports every OS platform that can run Node.js.

Features

  • Producer
    • Access Modes
      • Exclusive
      • Shared
    • Send types
      • Batch
      • Single Message
    • Message Properties
    • Reconnection built in
  • Subscriptions
    • Subscription types
      • Exclusive
      • Fail over
      • Shared
      • Key_Shared
    • Acks
      • Specific ack
      • Cumulative ack
      • Automatic ack
    • Reconnection built in
  • Authentication
    • JWT

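To make the difference between the ack modes listed above concrete, here is a minimal sketch in plain JavaScript. It does not use pulsar-flex: the `PendingMessages` class and its method names are hypothetical, and message IDs are simplified to integers. A specific ack removes one message from the unacknowledged set, while a cumulative ack removes that message and every earlier one.

```javascript
// Hypothetical model of a subscription's unacknowledged messages.
// Message IDs are simplified to increasing integers for illustration.
class PendingMessages {
  constructor(ids) {
    this.unacked = new Set(ids);
  }

  // Specific ack: acknowledge exactly one message.
  ackSpecific(id) {
    this.unacked.delete(id);
  }

  // Cumulative ack: acknowledge the given message and all earlier ones.
  ackCumulative(id) {
    for (const pending of [...this.unacked]) {
      if (pending <= id) this.unacked.delete(pending);
    }
  }

  // Remaining unacknowledged IDs in ascending order.
  remaining() {
    return [...this.unacked].sort((a, b) => a - b);
  }
}

const specific = new PendingMessages([1, 2, 3, 4, 5]);
specific.ackSpecific(3);
console.log(specific.remaining()); // [ 1, 2, 4, 5 ]

const cumulative = new PendingMessages([1, 2, 3, 4, 5]);
cumulative.ackCumulative(3);
console.log(cumulative.remaining()); // [ 4, 5 ]
```

Note that in Apache Pulsar cumulative acknowledgement is not available on Shared subscriptions, so the choice of ack mode depends on the subscription type.
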
Getting Started

npm install pulsar-flex

Usage

const { Producer, Consumer, logLevel } = require('pulsar-flex')

const producer = new Producer({
  topic: "persistent://public/default/my-topic",
  discoveryServers: ['pulsar-host:6650'],
  // If you don't provide a JWT token, no authentication is used
  jwt: process.env.JWT_TOKEN,
  producerAccessMode: Producer.ACCESS_MODES.SHARED,
  logLevel: logLevel.INFO
  // you can also provide logCreator function
})

const consumer = new Consumer({
  topic: "persistent://public/default/my-topic",
  // a subscription name is required; without it subscribe() times out (example value)
  subscription: 'my-subscription',
  discoveryServers: ['pulsar-host:6650'],
  jwt: process.env.JWT_TOKEN,
  subType: Consumer.SUB_TYPES.EXCLUSIVE,
  consumerName: 'Consumer name',
  receiveQueueSize: 1000,
  logLevel: logLevel.INFO,
  // you can also provide logCreator function
})

const run = async () => {
  await producer.create();
  // you can also send single message using sendMessage function
  await producer.sendBatch([
    {
      properties: {pulsar: "flex"}, 
      payload: 'Ayeo' 
    },
    {
      properties: {pulsar: "flex"},
      payload: 'Ayeo'
    }
  ]);

  await consumer.subscribe();
  await consumer.run({
    onMessage: async ({ ack, message }) => {
      await ack(); // Default is specific ack
      // await ack({type: Consumer.ACK_TYPES.CUMULATIVE});
      console.log({
        message,
      })
    },
    autoAck: false, // set to true to use automatic ack
  });
}

run().catch(console.error)

Contributing

We would love help from the community to accelerate development and expose the latest features of Pulsar.

License

MIT LICENSE

Comments
  • Consumer State Change Handling Feature

    Implements #81.

    Allows users of this library to "listen" to changes in the consumer state without needing to check at intervals.

    Since this PR doesn't add anything new to the consumer's logic, but just exports some of the internals nicely, I didn't add any tests. Open to suggestions, though.

    enhancement 
    opened by ronfarkash 2
  • Add Subscription to Sample Code

    Motivation

    When the sample code in README is executed, the following logs are output on the client and server.

    Client:

    PulsarFlexResponseTimeoutError: Timeout waiting for response for request id: 0 from the Broker
    

    Server:

    2022-09-07T10:44:06,378+0900 [pulsar-io-18-2] WARN  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:62247] Got exception java.lang.IllegalStateException: Some required fields are missing
    	at org.apache.pulsar.common.api.proto.CommandSubscribe.checkRequiredFields(CommandSubscribe.java:922)
    

    It appears that this is due to the fact that "subscription" is not specified when creating Consumer.

    Modification

    Add "subscription" explicitly.

    opened by k2la 1
  • Doc: more information on what is difference vs official node client

    Is your feature request related to a problem? Please describe. Just found your interesting looking pulsar client! You are writing in About:

    • Pulsar Flex is a modern Apache Pulsar client for Node.js, developed to be independent of C++.
    • It was developed because the dependency in the official c++ external libraries does not fit some use cases.

    Describe the solution you'd like Would be pretty handy to give some more information on what is better/more modern and what use cases you have in mind ... in comparison to official node client.

    Maybe one could also

    • link directly to the repository of the official pulsar node client https://github.com/apache/pulsar-client-node and
    • refer to missing feature in the official client https://docs.google.com/spreadsheets/d/1YHYTkIXR8-Ql103u-IMI18TXLlGStK8uJjDsOOA0T20/edit#gid=1784579914

    Additional context btw: to connect communities, I opened an issue to link in Doc directly to interesting 3rd party clients like pulsar-flex https://github.com/apache/pulsar/issues/17113

    opened by hpvd 1
  • Consumer only reads the number of messages in receiveQueueSize

    Describe the bug Consumer only gets the number of messages in receiveQueueSize and then stops reading.

    Reproduce

    1. Run a producer that continuously produces messages to a topic
    2. Run a consumer that subscribes to that topic and logs each topic offset
    3. After the consumer has consumed receiveQueueSize messages, it stops.

    Expected behavior I expect it to read that number of messages as a sort of batch (only that amount can fit in the queue at once) but to still keep reading after reaching that amount.

    Observed behavior After the consumer has consumed receiveQueueSize messages, it just stops consuming messages.

    Environment:

    • OS: CentOS 7.8
    • PulsarFlex version 1.0.1-beta.4
    • Pulsar version 2.8.1
    • NodeJS version 13.1.0

    bug 
    opened by sOfekS 1
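
For background on the behaviour described in this report: in the Pulsar binary protocol the broker only pushes messages for which the consumer has granted permits through a flow command, so a client that grants `receiveQueueSize` permits once and never again stops receiving after exactly that many messages. The sketch below models this in plain JavaScript. `FakeBroker`, `grantPermits`, `deliver`, and `consume` are hypothetical names for illustration, not pulsar-flex internals, and the refill-at-half-queue policy is an assumption about a common client strategy.

```javascript
// Hypothetical broker model: it delivers a message only while permits remain.
class FakeBroker {
  constructor(totalMessages) {
    this.backlog = totalMessages;
    this.permits = 0;
  }

  // Models the consumer sending a flow command with more permits.
  grantPermits(count) {
    this.permits += count;
  }

  // Delivers one message if both a permit and a message are available.
  deliver() {
    if (this.permits === 0 || this.backlog === 0) return false;
    this.permits -= 1;
    this.backlog -= 1;
    return true;
  }
}

// Consumes until the broker stops delivering; returns how many messages arrived.
function consume(broker, receiveQueueSize, refill) {
  let received = 0;
  let sinceLastFlow = 0;
  broker.grantPermits(receiveQueueSize); // initial flow
  while (broker.deliver()) {
    received += 1;
    sinceLastFlow += 1;
    // Assumed policy: re-grant permits once half of the queue has been consumed.
    if (refill && sinceLastFlow >= receiveQueueSize / 2) {
      broker.grantPermits(sinceLastFlow);
      sinceLastFlow = 0;
    }
  }
  return received;
}

const withoutRefill = consume(new FakeBroker(5000), 1000, false);
const withRefill = consume(new FakeBroker(5000), 1000, true);
console.log(withoutRefill); // 1000: stalls after the initial permits run out
console.log(withRefill);    // 5000: keeps reading until the backlog is empty
```
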
  • Redeliver Unacknowledged Messages

    Is your feature request related to a problem? Please describe. The feature is not related to a problem.

    Describe the solution you'd like Implement redeliver of unacknowledged messages.

    duplicate enhancement 
    opened by ronfarkash 1
  • moved consumer's stateChangeHandler out of constructor

    Setting the consumer's stateChangeHandler should be more flexible: setting it inside the consumer's constructor does not fit all use cases and limits its usability. This PR moves it out of the constructor into a property so it can be accessed at any time during the consumer's lifetime.

    opened by sOfekS 0
  • Consumer state change events

    Describe the solution you'd like I would like to be able to listen to events of state changes for the consumer (producer can also be nice). Say the consumer state changes to RECONNECTING, I want to catch it via an event and perform some logic.

    enhancement 
    opened by sOfekS 0
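
One way such a feature could look, sketched with Node's built-in `EventEmitter`. This is not the pulsar-flex API: the `StatefulConsumer` class, the `stateChange` event name, and every state string other than RECONNECTING are assumptions made for illustration.

```javascript
const { EventEmitter } = require('events');

// Hypothetical consumer wrapper that emits an event on every state transition.
class StatefulConsumer extends EventEmitter {
  constructor() {
    super();
    this.state = 'UNINITIALIZED'; // assumed initial state name
  }

  // Update the state and notify listeners; no-op if nothing changed.
  setState(next) {
    if (next === this.state) return;
    const previous = this.state;
    this.state = next;
    this.emit('stateChange', { previous, current: next });
  }
}

const consumer = new StatefulConsumer();
const seen = [];

// Listeners react to transitions instead of polling the state at intervals.
consumer.on('stateChange', ({ previous, current }) => {
  seen.push(`${previous} -> ${current}`);
});

consumer.setState('ACTIVE');
consumer.setState('ACTIVE'); // ignored: same state
consumer.setState('RECONNECTING');
console.log(seen);
```
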
  • Fix Consumer Memory Leak: Hanging promises

    Fixes #79

    await process() will never be resolved due to the function calling itself again and again causing unused references to dequeued messages to stay alive, resulting in a memory leak.

    Made process() call asynchronous and added return when the function ends.

    opened by ronfarkash 0
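
The pattern behind this kind of leak can be illustrated outside the library. In the sketch below, `processRecursive` awaits its own recursive call, so every earlier call stays pending until the whole chain finishes; with a never-ending message stream the outermost promise would never resolve. `processLoop` handles the same work in a loop with a single pending promise. Both functions are hypothetical stand-ins, and the loop is a swapped-in alternative for illustration, not necessarily how the PR fixed the library.

```javascript
// Yield to the event loop, standing in for asynchronous message handling.
const tick = () => new Promise((resolve) => setImmediate(resolve));

// Anti-pattern: each call awaits the next, so promises pile up
// until the very last message has been handled.
async function processRecursive(queue, stats) {
  if (queue.length === 0) return;
  stats.pending += 1;
  stats.maxPending = Math.max(stats.maxPending, stats.pending);
  queue.shift();
  await tick();
  await processRecursive(queue, stats); // this call stays pending meanwhile
  stats.pending -= 1;
}

// Loop-based alternative: one pending promise regardless of queue length.
async function processLoop(queue, stats) {
  stats.pending += 1;
  stats.maxPending = Math.max(stats.maxPending, stats.pending);
  while (queue.length > 0) {
    queue.shift();
    await tick();
  }
  stats.pending -= 1;
}

// Runs both variants over the same number of messages and reports
// the peak number of simultaneously pending calls for each.
async function compare(messageCount) {
  const recursive = { pending: 0, maxPending: 0 };
  const loop = { pending: 0, maxPending: 0 };
  const makeQueue = () => Array.from({ length: messageCount }, (_, i) => i);
  await processRecursive(makeQueue(), recursive);
  await processLoop(makeQueue(), loop);
  return { recursive: recursive.maxPending, loop: loop.maxPending };
}

const result = compare(100);
result.then(console.log); // { recursive: 100, loop: 1 }
```
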
  • Consumer Memory Leak, Hanging promises.

    Describe the bug When consuming messages, a memory leak is created due to unresolved promises. After leaving the consumer on for a while, a heap snapshot will show an increasing number of promise references.

    Reproduce

    1. Run a Consumer that continuously reads messages.
    2. Watch the process memory go up as time passes.

    Expected behavior The process memory to stay stable.

    Observed behavior Explained above.

    Environment:

    • OS: Windows 10
    • PulsarFlex version: 1.0.1-beta.8
    • Pulsar version: 2.8.1
    • NodeJS version: 16.11.1
    opened by ronfarkash 0
  • Consumer can't keep up, leading to memory leak

    Describe the bug Under high throughput the consumer struggles to keep up, and memory usage eventually rises until the process crashes.

    Reproduce

    1. Run a consumer that subscribes to that topic and logs each topic offset
    2. Run a producer that continuously produces messages to a topic at 5MB/s (split between 70-100 messages per second)
    3. After a certain amount of time (for me around 10 to 15 minutes) memory usage starts rising rapidly and the subscription's backlog size also rises steadily.

    Expected behavior I expect the consumer to be able to handle this kind of throughput and keep up with the producer, with memory usage staying steady.

    Observed behavior Every time the dequeue function is called, I logged the length of receiveQueue. At some point, it barely manages to empty the queue, reaching thousands of messages rapidly (even though I set receiveQueueSize to 1000). This can be seen even within seconds if the consumer starts with a few thousand messages in its backlog.

    Environment:

    • OS: CentOS 7.8
    • PulsarFlex version 1.0.1-beta.7
    • Pulsar version 2.8.1
    • NodeJS version 16.13.1
    opened by sOfekS 0
  • Warnings about messages going to pending queue when there is nothing wrong with the connection.

    Describe the bug There is a warning that messages are going to the pending queue although they should not.

    Reproduce

    1. Run a producer that continuously produces messages to a topic
    2. Run a consumer that subscribes to that topic and logs each topic offset
    3. After a while you will start seeing warnings that the messages are entering the pending queue

    Expected behavior When there is nothing wrong with the connection, the messages should not enter the pending queue.

    Observed behavior (screenshot omitted)

    Environment:

    • OS: The docker compose
    • PulsarFlex version [1.12.0]
    • Pulsar version [2.8.1]
    • NodeJS version [14.18.1]
    bug 
    opened by galrose 0
  • Is discovery server mandatory ?

    Is your feature request related to a problem? Please describe. When creating a producer / consumer, we have to specify a discovery server.

    Describe the solution you'd like I would like to directly specify the broker, and not a discovery server.

    Additional context I'm quite new to Pulsar, so maybe I'm completely wrong. But currently, when trying to establish a connection, it successfully connects to the remote server ("Authentication to discovery service established, now will lookup topic") and then finds a localhost URL for me, which can't work because everything is on the remote server.

    {"level":"INFO","timestamp":"2022-10-06T09:04:03.388Z","logger":"pulsar-flex","message":"Creating client connection for producer to topic: persistent://public/default/MY_TOPIC"}
    {"level":"INFO","timestamp":"2022-10-06T09:04:03.389Z","logger":"pulsar-flex","message":"Starting to lookup topic persistent://public/default/MY_TOPIC on __________________________________:6650"}        
    {"level":"INFO","timestamp":"2022-10-06T09:04:03.417Z","logger":"pulsar-flex","message":"Connected successfully __________________________________:6650, now sending connect command."}
    {"level":"INFO","timestamp":"2022-10-06T09:04:03.444Z","logger":"pulsar-flex","message":"Authentication to discovery service established, now will lookup topic"}
    {"level":"INFO","timestamp":"2022-10-06T09:04:03.467Z","logger":"pulsar-flex","message":"Closing connection to discovery connection"}
    {"level":"INFO","timestamp":"2022-10-06T09:04:03.469Z","logger":"pulsar-flex","message":"Lookup succeeded, owner is localhost:6650"}
    Error: connect ECONNREFUSED 127.0.0.1:6650
    
    opened by yoletx 3
  • Support connect to pulsar proxy

    Is your feature request related to a problem? Please describe. Currently, pulsar-flex doesn't recognize the ProxyToBrokerUrl field, so it can't connect to pulsar-proxy.

    Describe the solution you'd like Adapt to the pulsar-proxy protocol: recognize the ProxyToBrokerUrl field and send it to the correct address.

    Thanks in advance, and thanks for building this library. It's great for people who want to use pure JS. :)

    opened by Shoothzj 0
Releases(1.1.0-beta.0)
  • 1.1.0-beta.0(Jan 31, 2022)

    What's Changed

    • Consumer State Change Handling Feature by @ronfarkash in https://github.com/ayeo-flex-org/pulsar-flex/pull/82

    Full Changelog: https://github.com/ayeo-flex-org/pulsar-flex/compare/1.0.1-beta.9...1.1.0-beta.0

  • 1.0.1-beta.9(Jan 4, 2022)

    What's Changed

    • Fix Consumer Memory Leak: Hanging promises by @ronfarkash in https://github.com/ayeo-flex-org/pulsar-flex/pull/80

    Full Changelog: https://github.com/ayeo-flex-org/pulsar-flex/compare/1.0.1-beta.8...1.0.1-beta.9

  • 1.0.1-beta.8(Dec 14, 2021)

    What's Changed

    • changed the reflow to happen when a consumer reads a message by @galrose in https://github.com/ayeo-flex-org/pulsar-flex/pull/78

    Full Changelog: https://github.com/ayeo-flex-org/pulsar-flex/compare/1.0.1-beta.7...1.0.1-beta.8

  • 1.0.1-beta.7(Nov 29, 2021)

    • Fixed the sequence id increment

    What's Changed

    • moved the sequenceId to increase before sending, so it always increases by @galrose in https://github.com/ayeo-flex-org/pulsar-flex/pull/76

    Full Changelog: https://github.com/ayeo-flex-org/pulsar-flex/compare/1.0.1-beta.6...1.0.1-beta.7

  • 1.0.1-beta.6(Nov 17, 2021)

  • 1.0.1-beta.5(Oct 31, 2021)

    • When sending messages/batches to a disconnected broker, exceptions weren't handled well
    • Removed duplicate reconnect logic
    • Fixed infinite reconnect bug

  • 1.0.1-beta.4(Aug 31, 2021)

  • 1.0.1-beta.3(Aug 25, 2021)

  • 1.0.1-beta.2(Aug 18, 2021)

  • 1.0.0-beta.2(Aug 5, 2021)

  • 1.0.0-beta.0(Aug 4, 2021)
