Skip to content

Commit

Permalink
release: version 1.2.2
Browse files Browse the repository at this point in the history
  • Loading branch information
yarncraft committed Nov 17, 2020
1 parent 8f0e5ef commit 3cdbc4a
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 75 deletions.
39 changes: 20 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
<div align="center">
<h1> DistributedEventEmitter </h1>
<h1> Nanomitter </h1>

_An example Typescript implementation of a distributed EventEmitter built with the Scalability Protocols._

<img src="https://250bpm.wdfiles.com/local--files/blog:17/bus2.png" width="250">
</div>

The EventEmitter is fully distributed through the use of NanoMsg Sockets on the TCP transport (Network Layer).
The EventEmitter is fully distributed through the use of NanoMSG TCP sockets (Network Layer).
It is designed with performance and flexibility in mind and can be used to unite multiple Node.js processes running on different machines in the same network.
In order to realize this in the cloud, one could opt for a Docker Overlay Network.
In order to realize this in the cloud, one could opt for an Overlay Network like the one provided by Kubernetes.

The implementation is realized with the _NanoMsg framework_ (also called: Scalability Protocols).
I was inspired by the creator of the framework itself, who described how to realize an efficient broadcasting setup through the use of a **bus socket**: [http://250bpm.com/blog:17](http://250bpm.com/blog:17)
Expand All @@ -24,27 +24,30 @@ npm i nanomitter
_master.ts_

```ts
import { DistributedEventEmitter } from "nanomitter";
import { DistributedEvent } from "nanomitter/dist/src/types";
import { DistributedEventEmitter, DistributedEvent } from "nanomitter";

(async () => {
const emitter = await new DistributedEventEmitter().connect();
const serviceName = "master";
const heartbeatInterval = 30000;
const emitter = await new DistributedEventEmitter(
serviceName,
heartbeatInterval
).connect();
const logger = ({ topic, data }: DistributedEvent) =>
console.log(`Broadcasted ${topic} ${JSON.stringify(data)}`);
emitter.on("*", logger);
})().catch(err => {
})().catch((err) => {
console.error(err);
});
```

_worker.ts_

```ts
import { DistributedEventEmitter } from "nanomitter";
import { DistributedEvent } from "nanomitter/dist/src/types";
import { DistributedEventEmitter, DistributedEvent } from "nanomitter";

(async () => {
const emitter = await new DistributedEventEmitter().connect();
const emitter = await new DistributedEventEmitter("worker").connect();
const logger = (msg: DistributedEvent) => console.log(msg);

emitter.on("stockprice", logger);
Expand All @@ -53,48 +56,46 @@ import { DistributedEvent } from "nanomitter/dist/src/types";
() =>
emitter.emit({
topic: "stockprice",
data: { ticker: "AAPL", price: 250 + Math.random() * 10 }
data: { ticker: "AAPL", price: 250 + Math.random() * 10 },
}),
300
);
})().catch(err => {
})().catch((err) => {
console.error(err);
});
```

## API

- **async connect(addr)**
> In order to realise a distributed setup, a developer first has to connect the EventEmitter to a given address (default: 'tcp://127.0.0.1:55555'). _The first Emitter that connects to a given address binds the port and acts as the master, meaning that he is responsible for the broadcasting._ The master itself can send and receive messages just like its clients.
- **DistributedEventEmitter(serviceName, heartbeatInterval)**

> The constructor of the EventEmitter that takes an optional service and heartbeatInterval. The emitter will send a heartbeat on topic `<3` with data `{ service: serviceName }` at the given interval (default = 1 minute).
- **async connect(addr)**
> In order to realise a distributed setup, a developer first has to connect the EventEmitter to a given address (default: 'tcp://127.0.0.1:55555'). _The first Emitter that connects to a given address binds the port and acts as the master, meaning that he is responsible for the broadcasting._ The master itself can send and receive messages just like its clients.
_The remaining API methods are very similar to the conventional EventEmitter methods:_

- **addListener(topic, eventlistener)**

> Adds a listener at the end of the listeners array for the specified topic. No checks are made to see if the listener has already been added. Multiple calls passing the same combination of event and listener will result in the listener being added multiple times. Returns emitter, so calls can be chained.

- **on(topic, eventlistener)**

> Adds a listener at the end of the listeners array for the specified topic. No checks are made to see if the listener has already been added. Multiple calls passing the same combination of event and listener will result in the listener being added multiple times. Returns emitter, so calls can be chained.

- **once(topic, eventlistener)**

> Adds a one time listener to the topic. This listener is invoked only the next time the event is fired, after which it is removed. Returns emitter, so calls can be chained.

- **removeListener(topic, eventlistener)**

> Removes a listener from the listener array for the specified topic. Caution − RemoveListener will remove, at most, one instance of a listener from the listener array. If any single listener has been added multiple times to the listener array for the specified topic, then removeListener must be called multiple times to remove each instance. Returns emitter, so calls can be chained.

- **removeAllListeners(topic?)**

> Removes all listeners, or those of the specified topic. It's not a good idea to remove listeners that were added elsewhere in the code, especially when it's on an emitter that you didn't create (e.g. sockets or file streams). Returns emitter, so calls can be chained.

## Types

_The DistributedEvent type is still under consideration and can be subject to change._
Expand Down
10 changes: 0 additions & 10 deletions examples/master.ts

This file was deleted.

24 changes: 0 additions & 24 deletions examples/worker.ts

This file was deleted.

9 changes: 2 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 4 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
{
"name": "nanomitter",
"version": "1.1.0",
"version": "1.2.2",
"description": "A distributed EventEmitter built with NanoMsg (Scalability Protocols)",
"main": "dist/src/index.js",
"types": "dist/src/index.d.ts",
"main": "./index.js",
"types": "./index.d.ts",
"scripts": {
"compile": "tsc",
"start-master": "node dist/examples/master.js",
"start-workers": "node dist/examples/worker.js & node dist/examples/worker.js"
"prepublish": "tsc"
},
"author": "yarncraft",
"license": "MIT",
Expand Down
30 changes: 24 additions & 6 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
import msgpack from "msgpack-lite";
import nanoid from "nanoid";
import nano from "nanomsg";
import * as nano from "nanomsg";

import { DistributedEvent, EventListener } from './types'

const delay = (ms: number) => new Promise(_ => setTimeout(_, ms));
type InternalDistributedEvent = { topic: string; data: any; sender: string; fromLeader: boolean }
export type DistributedEvent = { topic: string; data: any; }
export type EventListener = (msg: DistributedEvent) => void

const delay = (ms: number) => new Promise(_ => setTimeout(_, ms));

export class DistributedEventEmitter {
id: string;
bus: nano.Socket;
leader: boolean;
listeners: Map<string, { cont: Array<EventListener>, once: Array<EventListener> }>;
addr: string;
heartbeatInterval: number;
service: string

constructor() {
constructor(service = "", heartbeatInterval = 60000) {
this.id = nanoid();
this.bus = nano.socket("bus");
this.leader = false;
this.listeners = new Map();
this.heartbeatInterval = heartbeatInterval;
this.service = service;
}

async connect(addr: string = 'tcp://127.0.0.1:55555') {
Expand All @@ -32,10 +38,22 @@ export class DistributedEventEmitter {
await delay(300)
}
if (this.leader === true) this._startBroadcast();
this._startHeartbeat();


this._startListening();
return this
}

private _startHeartbeat() {
const heartbeat = () => {
this.emit({ topic: "<3", data: { service: this.service } })
setTimeout(heartbeat, this.heartbeatInterval);
}

setTimeout(heartbeat, this.heartbeatInterval);
}

private _startBroadcast() {
this.bus.on("data", (buffer: Buffer) => this.bus.send(buffer));
}
Expand Down Expand Up @@ -99,8 +117,8 @@ export class DistributedEventEmitter {
}

emit(msg: DistributedEvent) {
let internalMsg: InternalDistributedEvent = { ...msg, fromLeader: this.leader, sender: this.id }
var buffer = msgpack.encode(internalMsg);
const internalMsg: InternalDistributedEvent = { ...msg, fromLeader: this.leader, sender: this.id }
const buffer = msgpack.encode(internalMsg);
this.bus.send(buffer)
}

Expand Down
2 changes: 0 additions & 2 deletions src/types.ts

This file was deleted.

2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"noImplicitAny": true,
"removeComments": true,
"preserveConstEnums": true,
"sourceMap": true,
"sourceMap": false,
"outDir": "dist",
"esModuleInterop": true,
"declaration": true
Expand Down

0 comments on commit 3cdbc4a

Please sign in to comment.