io-ts Typed Event Bus for the runtime of your Node.js application. A core for any event-driven architecture based app.

Overview

Typed Event Bus

Based on io-ts types, this bus provides a handy interface to publish and consume events in the current runtime of the Node.js process.

Features:

  • Versatile types by io-ts (nominal, branded, logical, shape types, etc)
  • Listens to and publishes to any settled and instantiated transports
  • Awaits for transport to be ready (async)
  • Easy interface for the definition of the transports
  • Tracks all events records (business operations chain)
  • A shared bus for all the transports
  • You can set what transport the consumer should be listening to (listenTo)
  • You can explicitly where the event is going to be published to (onlySendTo: [what transport])
  • Hook into a response event with a determined contract (handy for HTTP requests that await for a response)

Soon:

  • You can provide a source connector where all the orphan events can be dumped (e.g. NoSQL storage as Mongo)
  • You can provide a source connector where all the consumed events can be dumped
  • Handy Context API to tie the event with the business operation & aggregate mutation
  • Rebuild the graph for any event from its unique ID and visualize it (and the data changes)

Install

$ npm install typed-bus

...

$ yarn add typed-bus

Glossary

  • event - completely immutable, timestamped with unique ID payload and metadata that is being transported to the consumers
  • consumer - ANY function that is added to a consumer's list and have a io-ts type shape for it's execution
  • transport - an abstraction for connections internally or externally out of the system
  • typed-bus - a bus engine that is in charge of creating Events from received payload to publish them to correct registered transports
  • orphanEvent - an event that has been published into the bus and found no consumers that match it's shape within all the registered or/and selected transports, it goes to a separate list of orphan events, handy for debugging
  • events are Immutable, pushed ONLY in a chronological order at level of transport and are de-duplicated

Start using like this

import { TypedBus, Consume } from 'typed-bus';
import * as iots from 'io-ts';

class ConsumerTest {
  @Consume(iots.type({ amount: iots.number, currency: iots.string }))
  async justConsumer(data: any) {
    console.log('I just consumed money event', data);
  }

  // this method will be listening for the events only from `kafka` transport
  @Consume(iots.type({ name: iots.string, age: iots.number }, { listenTo: ['kafka'] }))
  async nameAgeConsumerCommand(data: any) {
    console.log('I just consumed person event', data);
  }
}

// somewhere instantiate the class with correct dependencies
new ConsumerTest();

// somewhere in the app call
await TypedBus.publish({ amount: 1234, currency: 'EUR' });

You can add and remove consumers in runtime anywhere in the app

import { TypedBus, Consume } from 'typed-bus';
import * as iots from 'io-ts';

const TypeShapeToConsume = iots.type({ amount: iots.number, currency: iots.string });

let consumerId = {};
function consumerFunction(shape: iots.OutputOf<TypeShapeToConsume>) {
  // some logic, logging, etc
  ...

  // after that is done, if you want you can remove that consumer function
  TypedBus.removeConsumer(consumerId.id)
}

consumerId = TypedBus.addConsumer(TypeShapeToConsume, consumerFunction);

// anywhere in the app
TypedBus.publish({ amount: 1234, currency: 'EUR' })

Wait for a hook resolution

import { TypedBus } from 'typed-bus';
import * as iots from 'io-ts';

class ExpressController {
  @Post()
  async addMoreMoney(req: Request, res: Response) {
    console.log('I just received http request to add more money with body', req.body);

    // this will wait 10 seconds for the event resolution
    const data = await TypedBus.publish(req.body, {
      hook: iots.type({ outcome: iots.literal('MONEY_ADDED'), account: iots.string }),
    });

    res.status(200).send(data);
  }
}

// somewhere instantiate the class with correct dependencies
new ExpressController();

Define your transport

import { Consumer, Kafka, Producer } from 'kafkajs';
import * as iots from 'io-ts';

import { Transport, TypedBus } from 'typed-bus';

const KafkaSendMessagePattern = iots.type({
  topic: iots.string,
  messages: iots.array(
    iots.type({
      key: iots.string,
      value: iots.unknown,
      headers: iots.unknown,
    }),
  ),
});

export class KafkaTransport extends Transport {
  name = 'kafka';

  producer!: Producer;
  consumer!: Consumer;
  kafka!: Kafka;

  // those are the default values for the transport class
  // as kafka is an async transport, we can omit this exact definitions
  ready = false;
  waitForReady = true;

  async _startAsyncTransport(): Promise<void> {
    // we start all the kafka connections
    this.kafka = new Kafka({
      clientId: 'my-app',
      brokers: ['kafka1:9092', 'kafka2:9092'],
    });

    this.producer = this.kafka.producer();
    await this.producer.connect();

    this.consumer = this.kafka.consumer({ groupId: 'test-group' });
    await this.consumer.connect();

    await this.consumer.subscribe({ topic: 'test-topic' /** more settings from kafkajs */ });

    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        // we hook into the data from Kafka and publish it into the internal bus
        // so we avoid resending received messages by any error in 'kafka' transport lane
        await TypedBus.publish({ topic, partition, message }, { onlySendTo: ['internal'] });
      },
    });

    // we declare a unique consumer for this 'kafka' transport
    // which is the actual producer.push method
    // as we dont control where it goes from our system
    TypedBus.addConsumer(KafkaSendMessagePattern, this.producer.push.bind(this.producer), {
      listenTo: ['kafka'],
    });
  }
}

If you want more control of your transport, you can implement _publish method

import * as iots from 'io-ts';

import { Event, Transport, TypedBus } from 'typed-bus';

export class MyTransport extends Transport {
  name = 'another-transport';

  async _startAsyncTransport(): Promise<void> {
    // do some async instantiation if that is needed
    ...
  }

  async _publish(event: Event): {
    orphanEvent?: boolean;
    publishedConsumers: PromiseSettledResult<void>[];
  } {
    // do some checks and verifications
    // you have access to all internal API's of transport
    // such as `this.consumers` list that gives you a fine grained control
    // you can set custom rules that should be executed before the event is sent to the consumers
    ... custom control rules, logging, etc ...
  }
}
You might also like...

Windmill: Open-source platform and runtime to turn any scripts into internal apps, integrations and workflows

Windmill: Open-source platform and runtime to turn any scripts into internal apps, integrations and workflows

. Open-source and self-hostable alternative to Airplane, Pipedream, Superblocks and a simplified Temporal with autogenerated UIs to trigger flows and

Jan 4, 2023

A zero-dependency, strongly-typed web framework for Bun, Node and Cloudflare workers

nbit A simple, declarative, type-safe way to build web services and REST APIs for Bun, Node and Cloudflare Workers. Examples See some quick examples b

Sep 16, 2022

AWS Serverless Event-driven Microservices with using AWS Lambda, AWS DynamoDB, AWS API Gateway, AWS EventBridge, AWS SQS, AWS CDK stands for Cloud Development Kit for IaC — Infrastructure as Code tool and AWS CloudWatch for monitoring.

AWS Serverless Event-driven Microservices with using AWS Lambda, AWS DynamoDB, AWS API Gateway, AWS EventBridge, AWS SQS, AWS CDK stands for Cloud Development Kit for IaC — Infrastructure as Code tool and AWS CloudWatch for monitoring.

Serverless Event-driven E-commerce Microservices UDEMY COURSE WITH DISCOUNTED - Step by Step Development of this Repository - https://www.udemy.com/c

Jan 3, 2023

💊 Event-driven DOM programming in a new style

Capsule v0.5.3 Event-driven DOM programming in a new style Features Supports event-driven style of frontend programming in a new way. Supports event d

Oct 1, 2022

375 DSA Tracker helps you build your confidence in solving any coding related question and helps you prepare for your placements. It is your personal web-based progress tracker based on 375 DSA Sheet by Aman Dhattarwal & Shradha Didi

375 DSA Tracker helps you build your confidence in solving any coding related question and helps you prepare for your placements. It is your personal web-based progress tracker based on 375 DSA Sheet by Aman Dhattarwal & Shradha Didi

375-DSA Tracker 👨‍💻 Me and my friend Abhilash Jena made a 375 DSA Tracker website based on 375 DSA Sheet by Aman Dhattarwal & Shradha Didi which hel

Nov 11, 2022

End-to-end typed monorepo template for your next project ⌨️

TYPE ⌨️ TRPC + Yarn Monorepo + Prisma + Expo This template project is a Yarn monorepo with full end-to-end type safety. Powered by: TRPC (on Fastify)

Oct 22, 2022

Fully-typed utilities for defining, validating and building your document head

zhead Typed utilities for defining, validating and building best-practice document head's. Status: Pre-release Please report any issues 🐛 Made poss

Dec 21, 2022

Bun-Bakery is a web framework for Bun. It uses a file based router in style like svelte-kit. No need to define routes during runtime.

Bun-Bakery is a web framework for Bun. It uses a file based router in style like svelte-kit. No need to define routes during runtime.

Bun Bakery Bun-Bakery is a web framework for Bun. It uses a file based router in style like svelte-kit. No need to define routes during runtime. Quick

Dec 6, 2022

This document introduces an early implementation of the Node-RED runtime that runs on resource-constrained microcontrollers (MCUs).

Node-RED MCU Edition Copyright 2022, Moddable Tech, Inc. All rights reserved. Peter Hoddie Updated June 25, 2022 Introduction This document introduces

Jan 3, 2023
Comments
  • chore(deps): bump minimist from 1.2.5 to 1.2.6

    chore(deps): bump minimist from 1.2.5 to 1.2.6

    Bumps minimist from 1.2.5 to 1.2.6.

    Commits

    Dependabot compatibility score

    Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase.


    Dependabot commands and options

    You can trigger Dependabot actions by commenting on this PR:

    • @dependabot rebase will rebase this PR
    • @dependabot recreate will recreate this PR, overwriting any edits that have been made to it
    • @dependabot merge will merge this PR after your CI passes on it
    • @dependabot squash and merge will squash and merge this PR after your CI passes on it
    • @dependabot cancel merge will cancel a previously requested merge and block automerging
    • @dependabot reopen will reopen this PR if it is closed
    • @dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
    • @dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
    • @dependabot use these labels will set the current labels as the default for future PRs for this repo and language
    • @dependabot use these reviewers will set the current reviewers as the default for future PRs for this repo and language
    • @dependabot use these assignees will set the current assignees as the default for future PRs for this repo and language
    • @dependabot use this milestone will set the current milestone as the default for future PRs for this repo and language

    You can disable automated security fix PRs for this repo from the Security Alerts page.

    released dependencies 
    opened by dependabot[bot] 1
  • The automated release is failing 🚨

    The automated release is failing 🚨

    :rotating_light: The automated release from the master branch failed. :rotating_light:

    I recommend you give this issue a high priority, so other packages depending on you can benefit from your bug fixes and new features again.

    You can find below the list of errors reported by semantic-release. Each one of them has to be resolved in order to automatically publish your package. I’m sure you can fix this 💪.

    Errors are usually caused by a misconfiguration or an authentication problem. With each error reported below you will find explanation and guidance to help you to resolve it.

    Once all the errors are resolved, semantic-release will release your package the next time you push a commit to the master branch. You can also manually restart the failed CI job that runs semantic-release.

    If you are not sure how to resolve this, here are some links that can help you:

    If those don’t help, or if this issue is reporting something you think isn’t right, you can always ask the humans behind semantic-release.


    Cannot push to the Git repository.

    semantic-release cannot push the version tag to the branch master on the remote Git repository with URL https://[secure]@github.com/sckv/typed-bus.git.

    This can be caused by:


    Good luck with your project ✨

    Your semantic-release bot :package::rocket:

    semantic-release 
    opened by github-actions[bot] 0
Releases(v1.1.1)
Owner
Konstantin Knyazev
Sr. Software Engineer - Node.js, Go with a bit of SRE (k8s, AWS, GCP)
Konstantin Knyazev
Event Bus Demo - This product is built with Remix

Event Bus Demo This product is built with Remix Remix Docs Netlify Functions Google Maps API Development The project requires an API key. You can get

Daw-Chih Liou 14 Jan 4, 2023
An event-driven architecture wrapper for Wechaty that applies the CQS principle by using separate Query and Command messages to retrieve and modify the bot state, respectively.

CQRS Wechaty An event-driven architecture wrapper for Wechaty that applies the CQS principle by using separate Query and Command messages to retrieve

Wechaty 3 Mar 23, 2022
It shows an effective way to correct bus arrival information using data analytics based on Amazon Serverless such as Kiness Data Stream, Kinesis Data Firehose, S3, and Lambda.

Amazon Serverless를 이용한 실시간 버스 정보 수집 및 저장 본 github repository는 버스 정보를 주기적으로 수집하여 분석할 수 있도록, Amazon Serverless인 Amazon Kinesis Data Stream, Kinesis Data

John Park 4 Nov 13, 2022
An AWS Cloud Native application using CDK that defines a Serverless Event Driven application for interacting with Twitter and utilising Machine Learning / AI as a Service.

AWS Serverless Event Driven Twitter Bot An AWS Cloud Native application using CDK (Written in TypeScript) that defines a Serverless Event Driven appli

null 4 Dec 18, 2022
Get-A-Room example application using Domain Driven Design and Clean Architecture. Written in TypeScript and deployed to AWS with a serverless stack.

Domain Driven Microservices on AWS in Practice This project provides a Domain Driven Design & Clean Architecture-informed, multi-service event-driven

Mikael Vesavuori 5 Dec 31, 2022
Mekna'7, a subsidiary of the ONCF group, which provides bus services to cities not served by train, needs to set up a computer system by creating a database for managing customer reservations.

Online-bus-ticket-reservation Introduction Hello everyone, this is a project that I have done for assignment. This project is a simple online bus tick

Hala Ziani 5 Oct 25, 2022
Grupprojekt för kurserna 'Javascript med Ramverk' och 'Agil Utveckling'

JavaScript-med-Ramverk-Laboration-3 Grupprojektet för kurserna Javascript med Ramverk och Agil Utveckling. Utvecklingsguide För information om hur utv

Svante Jonsson IT-Högskolan 3 May 18, 2022
Hemsida för personer i Sverige som kan och vill erbjuda boende till människor på flykt

Getting Started with Create React App This project was bootstrapped with Create React App. Available Scripts In the project directory, you can run: np

null 4 May 3, 2022
Kurs-repo för kursen Webbserver och Databaser

Webbserver och databaser This repository is meant for CME students to access exercises and codealongs that happen throughout the course. I hope you wi

null 14 Jan 3, 2023
⚗️Nitro provides a powerful toolchain and a runtime framework from the UnJS ecosystem to build and deploy any JavaScript server, anywhere

⚗️Nitro provides a powerful toolchain and a runtime framework from the UnJS ecosystem to build and deploy any JavaScript server, anywhere

unjs 1.3k Jan 5, 2023