Skip to content

Commit

Permalink
Enhancement/event bus (#86)
Browse files Browse the repository at this point in the history
* Renamed RedisEventBusAdapter class to RedisPubSubEventBusAdapter

* Updated the IEventBusAdapter contract

* Updated the eventBusAdapterTestSuite function

* Updated the MemoryEventBusAdapter class

* Updated the MemoryEventBusAdapter class tests

* Updated the RedisPubSubEventBusAdapter class

* Updated the RedisPubSubEventBusAdapter class tests

* Updated the NoOpEventBusAdapter class

* Updated the EventBus class

* Updated the EventBus class tests

* Updated the EventBusFactory class

* Updated Cache class tests

* Remove unused class

* Updated to include 2 dispatch methods on for distpatching one event and one for dispatching multiple events

* Updated eventBusTestSuite function

* Updated EventBus class

* Added new method listenOnce to IEventBus contract

* Added new test for eventBusTestSuite function

* Updated the EventBus class

* Fixed a bug in EventBus class tests

* Updated Cache class

* Updated old changeset files

* Added changeset file
  • Loading branch information
yousif-khalil-abdulkarim authored Jan 22, 2025
1 parent 9197902 commit 5e7dae8
Show file tree
Hide file tree
Showing 21 changed files with 468 additions and 391 deletions.
2 changes: 1 addition & 1 deletion .changeset/hip-pears-warn.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"@daiso-tech/core": minor
---

# New feature
## New features
- Introduced a new static helper method <i>wrapFn</i> for the <i>LazyPromise</i> class.
This method simplifies the process of wrapping asynchronous functions with <i>LazyPromise</i>.
2 changes: 1 addition & 1 deletion .changeset/shaggy-beans-fold.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"@daiso-tech/core": minor
---

# Changes
## Changes
- Removed abstract <i>BaseCache</i> class
- Removed abstract <i>BaseEventBus</i> class
- Added lazyPromiseSettings for <i>AsyncIterableCollection</i> class
Expand Down
14 changes: 14 additions & 0 deletions .changeset/shiny-onions-reflect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
"@daiso-tech/core": minor
---

## New features

- Introduced a instance static method <i>listenOnce</i> for the <i>IEventBus</i> contract and <i>EventBus</i> class.
This method simplifies add listener that will only execute once.

## Changes

- Moved event bus group logic from the <i>Cache</i> class into the adapters classes.
- **Key Impact**: Each adapter is now required to implement the <i>getGroup</i> and <i>withGroup</i> methods.
- This change enhances flexibility for adapter-specific logic.
2 changes: 1 addition & 1 deletion src/cache/implementations/derivables/cache.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { MemoryEventBusAdapter } from "@/event-bus/implementations/adapters/memo
import { cacheTestSuite } from "@/cache/implementations/_shared/cache.test-suite";

describe("class: Cache", () => {
const eventBus = new EventBus<any>(new MemoryEventBusAdapter());
const eventBus = new EventBus<any>(new MemoryEventBusAdapter("@global"));
let map: Map<string, unknown>;
beforeEach(() => {
map = new Map();
Expand Down
7 changes: 7 additions & 0 deletions src/cache/implementations/derivables/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,13 @@ export class Cache<TType = unknown> implements IGroupableCache<TType> {
return this.eventBus.removeListenerMany(eventNames, listener);
}

listenOnce<TEventName extends keyof CacheEvents>(
eventName: TEventName,
listener: Listener<SelectEvent<CacheEvents, TEventName>>,
): LazyPromise<void> {
return this.eventBus.listenOnce(eventName, listener);
}

subscribe<TEventName extends keyof CacheEvents>(
eventName: TEventName,
listener: Listener<SelectEvent<CacheEvents, TEventName>>,
Expand Down
15 changes: 10 additions & 5 deletions src/event-bus/contracts/event-bus-adapter.contract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/

import type { Listener } from "@/event-bus/contracts/_shared";
import type { OneOrMore } from "@/utilities/_module";

export type IBaseEvent = {
type: string;
Expand All @@ -15,23 +16,27 @@ export type IBaseEvent = {
*/
export type IEventBusAdapter = {
/**
* The <i>addListener</i> method is used for adding <i>{@link Listener | listener}</i> for certain <i>event</i>.
* The <i>addListener</i> method is used for adding <i>{@link Listener | listener}</i> for certain <i>eventName</i>.
*/
addListener(
event: string,
eventName: string,
listener: Listener<IBaseEvent>,
): PromiseLike<void>;

/**
* The <i>removeListener</i> method is used for removing <i>{@link Listener | listener}</i> for certain <i>event</i>.
* The <i>removeListener</i> method is used for removing <i>{@link Listener | listener}</i> for certain <i>eventName</i>.
*/
removeListener(
event: string,
eventName: string,
listener: Listener<IBaseEvent>,
): PromiseLike<void>;

/**
* The <i>dispatch</i> method is used for dispatching one or multiple <i>events</i>.
*/
dispatch(events: IBaseEvent[]): PromiseLike<void>;
dispatch(event: IBaseEvent): PromiseLike<void>;

getGroup(): string;

withGroup(group: OneOrMore<string>): IEventBusAdapter;
};
33 changes: 31 additions & 2 deletions src/event-bus/contracts/event-bus.contract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ export type AllEvents<TEvents extends BaseEvents> = Values<{
};
}>;

/**
* @group Contracts
*/
export type SelectEvent<
TEvents extends BaseEvents,
TEventName extends keyof TEvents,
Expand All @@ -31,8 +34,18 @@ export type SelectEvent<
};
}[TEventName];

/**
* @group Contracts
*/
export type Unsubscribe = () => LazyPromise<void>;

/**
* @group Contracts
*/
export type AddListenerSettings = {
signal?: AbortSignal;
};

/**
* The <i>IListenable</i> contract defines a way listening to events independent of underlying technology
* @group Contracts
Expand Down Expand Up @@ -78,6 +91,15 @@ export type IListenable<TEvents extends BaseEvents = BaseEvents> = {
listener: Listener<SelectEvent<TEvents, TEventName>>,
): LazyPromise<void>;

/**
* The <i>listenOnce</i> method is used for adding <i>{@link Listener | listener}</i> for certain <i>event</i> that is trigged only once.
* @throws {AddListenerEventBusError} {@link AddListenerEventBusError}
*/
listenOnce<TEventName extends keyof TEvents>(
eventName: TEventName,
listener: Listener<SelectEvent<TEvents, TEventName>>,
): LazyPromise<void>;

/**
* The <i>subscribe</i> method is used for adding <i>{@link Listener | listener}</i> for certain <i>event</i>.
* A listener can only be added once for a specific event. Adding the same listener multiple times will have no effect and nothing will occur.
Expand All @@ -103,11 +125,18 @@ export type IListenable<TEvents extends BaseEvents = BaseEvents> = {
*/
export type IDispatcher<TEvents extends BaseEvents = BaseEvents> = {
/**
* The <i>dispatch</i> method is used for dispatching one or multiple <i>events</i>.
* The <i>dispatch</i> method is used for dispatching a <i>event</i>.
* @throws {DispatchEventBusError} {@link DispatchEventBusError}
*/
dispatch(event: AllEvents<TEvents>): LazyPromise<void>;

/**
* The <i>dispatchMany</i> method is used for dispatching multiple <i>event</i>.
* @throws {DispatchEventBusError} {@link DispatchEventBusError}
*/
dispatch(events: OneOrMore<AllEvents<TEvents>>): LazyPromise<void>;
dispatchMany(events: AllEvents<TEvents>[]): LazyPromise<void>;
};

/**
Expand Down
138 changes: 100 additions & 38 deletions src/event-bus/implementations/_shared/event-bus-adapter.test-suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ export type EventBusAdapterTestSuiteSettings = {
test: TestAPI;
describe: SuiteAPI;
beforeEach: typeof beforeEach;
createAdapter: () => Promisable<IEventBusAdapter>;
createAdapterA: () => Promisable<IEventBusAdapter>;
createAdapterB: () => Promisable<IEventBusAdapter>;
};

/**
Expand All @@ -37,10 +38,10 @@ export type EventBusAdapterTestSuiteSettings = {
* import type { StartedRedisContainer } from "@testcontainers/redis";
* import { RedisContainer } from "@testcontainers/redis";
* import Redis from "ioredis";
* import { SuperJsonSerializer, TimeSpan, RedisEventBusAdapter, eventBusAdapterTestSuite } from "@daiso-tech/core";
* import { SuperJsonSerializer, TimeSpan, RedisPubSubEventBusAdapter, eventBusAdapterTestSuite } from "@daiso-tech/core";
*
* const timeout = TimeSpan.fromMinutes(2);
* describe("class: RedisEventBusAdapter", () => {
* describe("class: RedisPubSubEventBusAdapter", () => {
* let dispatcherClient: Redis;
* let listenerClient: Redis;
* let startedContainer: StartedRedisContainer;
Expand All @@ -56,11 +57,19 @@ export type EventBusAdapterTestSuiteSettings = {
* await startedContainer.stop();
* }, timeout.toMilliseconds());
* eventBusAdapterTestSuite({
* createAdapter: () =>
* new RedisEventBusAdapter({
* createAdapterA: () =>
* new RedisPubSubEventBusAdapter({
* dispatcherClient,
* listenerClient,
* serializer,
* rootGroup: "@global"
* }),
* createAdapterB: () =>
* new RedisPubSubEventBusAdapter({
* dispatcherClient,
* listenerClient,
* serializer,
* rootGroup: "@global"
* }),
* test,
* beforeEach,
Expand All @@ -73,49 +82,102 @@ export type EventBusAdapterTestSuiteSettings = {
export function eventBusAdapterTestSuite(
settings: EventBusAdapterTestSuiteSettings,
): void {
const { expect, test, createAdapter, beforeEach } = settings;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let eventBusAdapter: IEventBusAdapter;
const { expect, test, createAdapterA, createAdapterB, beforeEach } =
settings;

let eventBusAdapterA: IEventBusAdapter;
let eventBusAdapterB: IEventBusAdapter;
beforeEach(async () => {
eventBusAdapter = await createAdapter();
eventBusAdapterA = await createAdapterA();
eventBusAdapterB = await createAdapterB();
});

describe("method: addListener, removeListener, dispatch", () => {
test("Should be null when listener added and event is not triggered", async () => {
const TYPE = "type";
let result: IBaseEvent | null = null;
await eventBusAdapter.addListener(TYPE, (event) => {
result = event;
const TTL = TimeSpan.fromMilliseconds(50);
describe("Api tests:", () => {
describe("method: addListener, removeListener, dispatch", () => {
test("Should be null when listener added and event is not triggered", async () => {
const TYPE = "type";
let result: IBaseEvent | null = null;
await eventBusAdapterA.addListener(TYPE, (event) => {
result = event;
});
expect(result).toBeNull();
});
expect(result).toBeNull();
});
test("Should be IBaseEvent when listener added and event is triggered", async () => {
const TYPE = "type";
let result: IBaseEvent | null = null;
await eventBusAdapter.addListener(TYPE, (event) => {
result = event;
test("Should be IBaseEvent when listener added and event is triggered", async () => {
const TYPE = "type";
let result: IBaseEvent | null = null;
await eventBusAdapterA.addListener(TYPE, (event) => {
result = event;
});
const event: IBaseEvent = {
type: TYPE,
};
await eventBusAdapterA.dispatch(event);
await delay(TTL);
expect(result).toEqual(event);
});
test("Should be null when listener removed and event is triggered", async () => {
const TYPE = "type";
let result: IBaseEvent | null = null;
const listener = (event: IBaseEvent) => {
result = event;
};
await eventBusAdapterA.addListener(TYPE, listener);
await eventBusAdapterA.removeListener(TYPE, listener);
const event: IBaseEvent = {
type: TYPE,
};
await eventBusAdapterA.dispatch(event);
await delay(TTL);
expect(result).toBeNull();
});
});
});
describe("Group tests:", () => {
test("method: addListener / dispatch", async () => {
const event: IBaseEvent = {
type: TYPE,
type: "type",
};
await eventBusAdapter.dispatch([event]);
await delay(TimeSpan.fromMilliseconds(50));
expect(result).toEqual(event);

let result_a: IBaseEvent | null = null;
await eventBusAdapterA.addListener(event.type, (event) => {
result_a = event;
});

let result_b: IBaseEvent | null = null;
await eventBusAdapterB.addListener(event.type, (event) => {
result_b = event;
});

await eventBusAdapterA.dispatch(event);
await delay(TTL);

expect(result_a).toEqual(event);
expect(result_b).toBeNull();
});
test("Should be null when listener removed and event is triggered", async () => {
const TYPE = "type";
let result: IBaseEvent | null = null;
const listener = (event: IBaseEvent) => {
result = event;
};
await eventBusAdapter.addListener(TYPE, listener);
await eventBusAdapter.removeListener(TYPE, listener);
test("method: removeListener / addListener / dispatch", async () => {
const event: IBaseEvent = {
type: TYPE,
type: "type",
};
await delay(TimeSpan.fromMilliseconds(50));
await eventBusAdapter.dispatch([event]);
expect(result).toBeNull();

let result_a: IBaseEvent | null = null;
await eventBusAdapterA.addListener(event.type, (event) => {
result_a = event;
});

let result_b: IBaseEvent | null = null;
const listenerB = (event: IBaseEvent) => {
result_b = event;
};
await eventBusAdapterB.addListener(event.type, listenerB);
await eventBusAdapterB.removeListener(event.type, listenerB);

await eventBusAdapterA.dispatch(event);
await eventBusAdapterB.dispatch(event);
await delay(TTL);

expect(result_a).toEqual(event);
expect(result_b).toBeNull();
});
});
}
Loading

0 comments on commit 5e7dae8

Please sign in to comment.