diff --git a/docs/class/incomer.md b/docs/class/incomer.md index 3d09cb1e..febd5b7c 100644 --- a/docs/class/incomer.md +++ b/docs/class/incomer.md @@ -30,6 +30,8 @@ const incomer = new Incomer({ eventsSubscribe: [...Object.values(AVAILABLE_EVENTS)], eventCallback: (event) => { console.log(event); + + return OK({ status: "RESOLVED" }); } }); @@ -73,7 +75,7 @@ type IncomerOptions = { standardLog?: StandardLog; eventsCast: EventCast[]; eventsSubscribe: EventSubscribe[]; - eventCallback: (message: CallBackEventMessage) => void; + eventCallback: (message: CallBackEventMessage) => Promise; prefix?: Prefix; abortPublishTime?: number; externalsInitialized?: boolean; diff --git a/src/class/eventManagement/dispatcher.class.ts b/src/class/eventManagement/dispatcher.class.ts index bb46c81b..290a729b 100644 --- a/src/class/eventManagement/dispatcher.class.ts +++ b/src/class/eventManagement/dispatcher.class.ts @@ -218,63 +218,53 @@ export class Dispatcher extends EventEmit }); this.eventsHandler - .on("APPROVEMENT", async(registrationEvent: IncomerRegistrationMessage) => { - try { - await this.approveIncomer(registrationEvent); - } - catch (error) { - this.logger.error({ - channel: "dispatcher", - message: registrationEvent, - error: error.stack + .on("APPROVEMENT", (registrationEvent: IncomerRegistrationMessage) => { + this.approveIncomer(registrationEvent) + .catch((error) => { + this.logger.error({ + channel: "dispatcher", + message: registrationEvent, + error: error.stack + }); }); - } }) - .on("CLOSE", async(channel: string, closeEvent: CloseMessage) => { - try { - const { redisMetadata } = closeEvent; - - const relatedIncomer = await this.incomerStore.getIncomer(redisMetadata.origin); + .on("CLOSE", (channel: string, closeEvent: CloseMessage) => { + const { redisMetadata } = closeEvent; - if (!relatedIncomer) { - this.logger.warn({ channel }, "Unable to find the Incomer closing the connection"); + this.incomerStore.getIncomer(redisMetadata.origin) + .then((relatedIncomer) => { + if (!relatedIncomer) { + this.logger.warn({ channel }, "Unable to find the Incomer closing the connection"); - return; - } + return; + } - await this.removeNonActives([relatedIncomer]); - } - catch (error) { - this.logger.error({ - channel, - message: closeEvent, - error: error.stack + this.removeNonActives([relatedIncomer]); + }).catch((error) => { + this.logger.error({ + channel: "dispatcher", + message: closeEvent, + error: error.stack + }); }); - } }) - .on("RETRY", async(channel: string, retryEvent: RetryMessage) => { - try { - await this.handleRetryEvent(retryEvent); - } - catch (error) { - this.logger.error({ - channel, - message: retryEvent, - error: error.stack + .on("RETRY", (channel: string, retryEvent: RetryMessage) => { + this.handleRetryEvent(retryEvent) + .catch((error) => { + this.logger.error({ + channel, + message: retryEvent, + error: error.stack + }); }); - } }) - .on("CUSTOM_EVENT", async(channel: string, customEvent: EventMessage) => { - try { - await this.handleCustomEvents(channel, customEvent); - } - catch (error) { - this.logger.error({ - channel, + .on("CUSTOM_EVENT", (channel: string, customEvent: EventMessage) => { + this.handleCustomEvents(channel, customEvent) + .catch((error) => this.logger.error({ + channel: "dispatcher", message: customEvent, error: error.stack - }); - } + })); }); this.resolveTransactionsInterval = setInterval(() => { @@ -568,7 +558,7 @@ export class Dispatcher extends EventEmit if (recentPingTransactionKeys.length > 0) { toResolve.push(Promise.all([ - this.updateIncomerState(inactive.providedUUID), + this.incomerStore.updateIncomerState(inactive.providedUUID), transactionStore.deleteTransactions(recentPingTransactionKeys) ])); @@ -595,7 +585,7 @@ export class Dispatcher extends EventEmit const { providedUUID: uuid } = incomer; if (incomer.baseUUID === this.selfProvidedUUID) { - await this.updateIncomerState(uuid); + await this.incomerStore.updateIncomerState(uuid); continue; } @@ -656,14 +646,6 @@ export class Dispatcher extends EventEmit } } - private async updateIncomerState(origin: string) { - try { - await this.incomerStore.updateIncomerState(origin); - } - catch (error) { - this.logger.error({ uuid: origin, error: error.stack }, "Failed to update incomer state"); - } - } private async setAsActiveDispatcher() { const incomers = await this.incomerStore.getIncomers(); @@ -873,7 +855,7 @@ export class Dispatcher extends EventEmit })); } - await this.updateIncomerState(redisMetadata.origin); + await this.incomerStore.updateIncomerState(redisMetadata.origin); await Promise.all([ ...toResolve, senderTransactionStore.updateTransaction(transactionId, { diff --git a/src/class/eventManagement/incomer.class.ts b/src/class/eventManagement/incomer.class.ts index eca25477..0a793f32 100644 --- a/src/class/eventManagement/incomer.class.ts +++ b/src/class/eventManagement/incomer.class.ts @@ -79,8 +79,8 @@ function isIncomerChannelMessage(value: return value.name !== "APPROVEMENT"; } -type Resolved = "RESOLVED"; -type Unresolved = "UNRESOLVED"; +export type Resolved = "RESOLVED"; +export type Unresolved = "UNRESOLVED"; export type EventCallbackResponse = Result< T extends Resolved ? { @@ -222,7 +222,7 @@ export class Incomer < if ( transaction.redisMetadata.mainTransaction && !transaction.redisMetadata.published && - transaction.aliveSince + this.maxPingInterval < Date.now() + Number(transaction.aliveSince) + Number(this.maxPingInterval) < Date.now() ) { return this.incomerChannel.publish({ ...transaction, @@ -324,10 +324,6 @@ export class Incomer < return; } - if (this.providedUUID) { - this.subscriber.unsubscribe(`${this.prefix ? `${this.prefix}-` : ""}${this.providedUUID}`); - } - const event = { name: "REGISTER" as const, data: { @@ -588,24 +584,13 @@ export class Incomer < ): Promise { const { name } = message; - match>({ name, message } as IncomerChannelEvents) - .with({ - name: "PING" - }, - async(res: { name: "PING", message: DispatcherPingMessage }) => this.handlePing(channel, res.message)) - .with(P._, - async(res: { name: string, message: DistributedEventMessage }) => this.customEvent({ - ...res, channel - }) - ) - .exhaustive() - .catch((error) => { - this.logger.error({ - channel: "incomer", - error: error.stack, - message - }); - }); + if (name === "PING") { + await this.handlePing(channel, message as DispatcherPingMessage); + + return; + } + + await this.customEvent({ name, channel, message: message as DistributedEventMessage }); } private async handlePing(channel: string, message: DispatcherPingMessage) { @@ -671,7 +656,7 @@ export class Incomer < const callbackResult = await this.eventCallback({ ...event, eventTransactionId } as unknown as CallBackEventMessage); - if (callbackResult.ok) { + if (callbackResult && callbackResult.ok) { if (Symbol.for(callbackResult.val.status) === RESOLVED) { await store.updateTransaction(formattedTransaction.redisMetadata.transactionId, { ...formattedTransaction, diff --git a/src/schema/events/document.json b/src/schema/events/document.json index c29d7c32..4f812cce 100644 --- a/src/schema/events/document.json +++ b/src/schema/events/document.json @@ -9,9 +9,12 @@ }, "kind": { "enum": ["AF", "PF", "DB", "ED"] + }, + "name": { + "type": "string" } }, - "required": ["id", "kind"], + "required": ["id", "kind", "name"], "additionalProperties": false }, "delete": { diff --git a/test/UT/class/eventManagement/dispatcher.spec.ts b/test/UT/class/eventManagement/dispatcher.spec.ts index 5b8077e5..310ef2e2 100644 --- a/test/UT/class/eventManagement/dispatcher.spec.ts +++ b/test/UT/class/eventManagement/dispatcher.spec.ts @@ -356,7 +356,7 @@ describe("Dispatcher", () => { }); test("It should have update the update the incomer last activity", async () => { - await timers.setTimeout(10_000); + await timers.setTimeout(15_000); const pongTransactionToRetrieve = await incomerTransactionStore.getTransactionById(pongTransaction.redisMetadata.transactionId!); const pingTransaction = await dispatcherTransactionStore.getTransactionById(pingTransactionId); diff --git a/test/UT/class/eventManagement/events.spec.ts b/test/UT/class/eventManagement/events.spec.ts index 1630ac50..d2aac2ad 100644 --- a/test/UT/class/eventManagement/events.spec.ts +++ b/test/UT/class/eventManagement/events.spec.ts @@ -10,6 +10,7 @@ import { getRedis } from "@myunisoft/redis"; import * as Logger from "pino"; +import { Ok } from "@openally/result"; // Import Internal Dependencies import { @@ -26,7 +27,7 @@ import { validate } from "../../../../src/index"; const incomerLogger = Logger.pino({ level: "debug" }); -const mockedEventComeBackHandler = jest.fn(); +const mockedEventComeBackHandler = jest.fn().mockImplementation(() => Ok({ status: "RESOLVED" })); async function updateRegisterTransactionState( publisherOldTransacStore: TransactionStore<"incomer">, diff --git a/test/UT/class/eventManagement/externals.spec.ts b/test/UT/class/eventManagement/externals.spec.ts index 2460b6b6..2a3d1b09 100644 --- a/test/UT/class/eventManagement/externals.spec.ts +++ b/test/UT/class/eventManagement/externals.spec.ts @@ -5,6 +5,7 @@ import { getRedis } from "@myunisoft/redis"; import * as Logger from "pino"; +import { Ok } from "@openally/result"; // Import Internal Dependencies import { Incomer } from "../../../../src/index"; @@ -32,7 +33,7 @@ afterAll(async() => { }); describe("Init Incomer without Dispatcher alive & prefix as \"test\"", () => { - const eventComeBackHandler = () => void 0; + const eventComeBackHandler = jest.fn().mockImplementation(() => Ok({ status: "RESOLVED" }));; describe("With externalsInitialized at true", () => { const incomer: Incomer = new Incomer({ diff --git a/test/UT/class/eventManagement/handle-inactive-no-backup.spec.ts b/test/UT/class/eventManagement/handle-inactive-no-backup.spec.ts index 21cf73e5..a5647e9b 100644 --- a/test/UT/class/eventManagement/handle-inactive-no-backup.spec.ts +++ b/test/UT/class/eventManagement/handle-inactive-no-backup.spec.ts @@ -10,6 +10,7 @@ import { getRedis } from "@myunisoft/redis"; import * as Logger from "pino"; +import { Ok } from "@openally/result"; // Import Internal Dependencies import { @@ -26,7 +27,7 @@ import { TransactionStore } from "../../../../src/class/store/transaction.class" const dispatcherLogger = Logger.pino({ level: "debug" }); -const mockedEventComeBackHandler = jest.fn(); +const mockedEventComeBackHandler = jest.fn().mockImplementation(() => Ok({ status: "RESOLVED" })); describe("Publishing/exploiting a custom event & inactive incomer", () => { let dispatcher: Dispatcher>; @@ -200,7 +201,12 @@ describe("Publishing/exploiting a custom event & inactive incomer", () => { dispatcher["subscriber"]!.on("message", (channel, message) => dispatcher["handleMessages"](channel, message)); secondConcernedIncomer["subscriber"]!.on("message", (channel, message) => secondConcernedIncomer["handleMessages"](channel, message)); - await timers.setTimeout(5_000); + await timers.setTimeout(1_000); + + const incomer = [...(await dispatcher["incomerStore"].getIncomers()).values()].find((incomer) => incomer.baseUUID === concernedIncomer.baseUUID); + await dispatcher["removeNonActives"]([incomer!]); + + await timers.setTimeout(1_000); const mockCalls = mockedSetTransaction.mock.calls.flat(); diff --git a/test/UT/class/eventManagement/handle-inactive-with-backup.spec.ts b/test/UT/class/eventManagement/handle-inactive-with-backup.spec.ts index 727eea90..ab900d49 100644 --- a/test/UT/class/eventManagement/handle-inactive-with-backup.spec.ts +++ b/test/UT/class/eventManagement/handle-inactive-with-backup.spec.ts @@ -10,6 +10,7 @@ import { getRedis } from "@myunisoft/redis"; import * as Logger from "pino"; +import { Ok } from "@openally/result"; // Import Internal Dependencies import { @@ -26,7 +27,7 @@ import { TransactionStore } from "../../../../src/class/store/transaction.class" const dispatcherLogger = Logger.pino({ level: "debug" }); -const mockedEventComeBackHandler = jest.fn(); +const mockedEventComeBackHandler = jest.fn().mockImplementation(() => Ok({ status: "RESOLVED" })); describe("Publishing/exploiting a custom event & inactive incomer", () => { let dispatcher: Dispatcher>; diff --git a/test/UT/class/eventManagement/incomer.spec.ts b/test/UT/class/eventManagement/incomer.spec.ts index 8c4bbd26..323e8022 100644 --- a/test/UT/class/eventManagement/incomer.spec.ts +++ b/test/UT/class/eventManagement/incomer.spec.ts @@ -7,6 +7,7 @@ import { closeAllRedis, getRedis } from "@myunisoft/redis"; +import { Ok } from "@openally/result"; // Import Internal Dependencies import { Dispatcher, Incomer } from "../../../../src/index"; @@ -18,7 +19,7 @@ const mockedDispatcherRemoveNonActives = jest.spyOn(Dispatcher.prototype as any, const kIdleTime = 4_000; describe("Init Incomer without Dispatcher alive", () => { - const eventComeBackHandler = () => void 0; + const eventComeBackHandler = jest.fn().mockImplementation(() => Ok({ status: "RESOLVED" }));; const pingInterval = 2_000; @@ -93,9 +94,16 @@ describe("Init Incomer without Dispatcher alive", () => { }); test("Incomer calling close, it should remove the given Incomer", async() => { - await incomer.close(); + await incomer["incomerChannel"].publish({ + name: "CLOSE", + redisMetadata: { + origin: incomer["providedUUID"], + incomerName: incomer["name"], + prefix: incomer["prefix"] + } + }); - await timers.setTimeout(500); + await timers.setTimeout(1_000); expect(mockedDispatcherRemoveNonActives).toHaveBeenCalled(); @@ -106,7 +114,6 @@ describe("Init Incomer without Dispatcher alive", () => { afterAll(async() => { await dispatcherIncomer.close(); - await incomer.close();; await closeAllRedis(); }); }); diff --git a/test/UT/class/eventManagement/ping.spec.ts b/test/UT/class/eventManagement/ping.spec.ts index 913288ad..e3ca5c52 100644 --- a/test/UT/class/eventManagement/ping.spec.ts +++ b/test/UT/class/eventManagement/ping.spec.ts @@ -10,6 +10,7 @@ import { getRedis } from "@myunisoft/redis"; import * as Logger from "pino"; +import { Ok } from "@openally/result"; // Import Internal Dependencies import { Dispatcher, Incomer } from "../../../../src/index"; @@ -23,7 +24,7 @@ const incomerLogger = Logger.pino({ const mockedIncomerLoggerDebug = jest.spyOn(incomerLogger, "debug"); describe("Ping", () => { - const eventComeBackHandler = () => void 0; + const eventComeBackHandler = jest.fn().mockImplementation(() => Ok({ status: "RESOLVED" }));; let dispatcher: Dispatcher; let incomer: Incomer; diff --git a/test/UT/class/eventManagement/registration.spec.ts b/test/UT/class/eventManagement/registration.spec.ts index e94889de..f8e54080 100644 --- a/test/UT/class/eventManagement/registration.spec.ts +++ b/test/UT/class/eventManagement/registration.spec.ts @@ -10,6 +10,7 @@ import { getRedis } from "@myunisoft/redis"; import * as Logger from "pino"; +import { Ok } from "@openally/result"; // Import Internal Dependencies import { Dispatcher, EventOptions, Incomer } from "../../../../src/index"; @@ -92,7 +93,7 @@ describe("Registration", () => { let handlePingFn: (...any) => any; let incomerProvidedUUID: string; - const eventComeBackHandler = () => void 0; + const eventComeBackHandler = jest.fn().mockImplementation(() => Ok({ status: "RESOLVED" })); afterAll(async() => { await incomer.close();