Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(/class/eventManagement/dispatcher): handle promises rejectio… #258

Merged
merged 4 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/class/incomer.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const incomer = new Incomer({
eventsSubscribe: [...Object.values(AVAILABLE_EVENTS)],
eventCallback: (event) => {
console.log(event);

return OK({ status: "RESOLVED" });
}
});

Expand Down Expand Up @@ -73,7 +75,7 @@ type IncomerOptions<T extends GenericEvent = GenericEvent> = {
standardLog?: StandardLog<T>;
eventsCast: EventCast[];
eventsSubscribe: EventSubscribe[];
eventCallback: (message: CallBackEventMessage<T>) => void;
eventCallback: (message: CallBackEventMessage<T>) => Promise<EventCallbackResponse>;
prefix?: Prefix;
abortPublishTime?: number;
externalsInitialized?: boolean;
Expand Down
96 changes: 39 additions & 57 deletions src/class/eventManagement/dispatcher.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,63 +218,53 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
});

this.eventsHandler
.on("APPROVEMENT", async(registrationEvent: IncomerRegistrationMessage) => {
try {
await this.approveIncomer(registrationEvent);
}
catch (error) {
this.logger.error({
channel: "dispatcher",
message: registrationEvent,
error: error.stack
.on("APPROVEMENT", (registrationEvent: IncomerRegistrationMessage) => {
this.approveIncomer(registrationEvent)
.catch((error) => {
this.logger.error({
channel: "dispatcher",
message: registrationEvent,
error: error.stack
});
});
}
})
.on("CLOSE", async(channel: string, closeEvent: CloseMessage) => {
try {
const { redisMetadata } = closeEvent;

const relatedIncomer = await this.incomerStore.getIncomer(redisMetadata.origin);
.on("CLOSE", (channel: string, closeEvent: CloseMessage) => {
const { redisMetadata } = closeEvent;

if (!relatedIncomer) {
this.logger.warn({ channel }, "Unable to find the Incomer closing the connection");
this.incomerStore.getIncomer(redisMetadata.origin)
.then((relatedIncomer) => {
if (!relatedIncomer) {
this.logger.warn({ channel }, "Unable to find the Incomer closing the connection");

return;
}
return;
}

await this.removeNonActives([relatedIncomer]);
}
catch (error) {
this.logger.error({
channel,
message: closeEvent,
error: error.stack
this.removeNonActives([relatedIncomer]);
}).catch((error) => {
this.logger.error({
channel: "dispatcher",
message: closeEvent,
error: error.stack
});
});
}
})
.on("RETRY", async(channel: string, retryEvent: RetryMessage) => {
try {
await this.handleRetryEvent(retryEvent);
}
catch (error) {
this.logger.error({
channel,
message: retryEvent,
error: error.stack
.on("RETRY", (channel: string, retryEvent: RetryMessage) => {
this.handleRetryEvent(retryEvent)
.catch((error) => {
this.logger.error({
channel,
message: retryEvent,
error: error.stack
});
});
}
})
.on("CUSTOM_EVENT", async(channel: string, customEvent: EventMessage<T>) => {
try {
await this.handleCustomEvents(channel, customEvent);
}
catch (error) {
this.logger.error({
channel,
.on("CUSTOM_EVENT", (channel: string, customEvent: EventMessage<T>) => {
this.handleCustomEvents(channel, customEvent)
.catch((error) => this.logger.error({
channel: "dispatcher",
message: customEvent,
error: error.stack
});
}
}));
});

this.resolveTransactionsInterval = setInterval(() => {
Expand Down Expand Up @@ -568,7 +558,7 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit

if (recentPingTransactionKeys.length > 0) {
toResolve.push(Promise.all([
this.updateIncomerState(inactive.providedUUID),
this.incomerStore.updateIncomerState(inactive.providedUUID),
transactionStore.deleteTransactions(recentPingTransactionKeys)
]));

Expand All @@ -595,7 +585,7 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
const { providedUUID: uuid } = incomer;

if (incomer.baseUUID === this.selfProvidedUUID) {
await this.updateIncomerState(uuid);
await this.incomerStore.updateIncomerState(uuid);

continue;
}
Expand Down Expand Up @@ -656,14 +646,6 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
}
}

private async updateIncomerState(origin: string) {
try {
await this.incomerStore.updateIncomerState(origin);
}
catch (error) {
this.logger.error({ uuid: origin, error: error.stack }, "Failed to update incomer state");
}
}

private async setAsActiveDispatcher() {
const incomers = await this.incomerStore.getIncomers();
Expand Down Expand Up @@ -873,7 +855,7 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
}));
}

await this.updateIncomerState(redisMetadata.origin);
await this.incomerStore.updateIncomerState(redisMetadata.origin);
await Promise.all([
...toResolve,
senderTransactionStore.updateTransaction(transactionId, {
Expand Down
37 changes: 11 additions & 26 deletions src/class/eventManagement/incomer.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ function isIncomerChannelMessage<T extends GenericEvent = GenericEvent>(value:
return value.name !== "APPROVEMENT";
}

type Resolved = "RESOLVED";
type Unresolved = "UNRESOLVED";
export type Resolved = "RESOLVED";
export type Unresolved = "UNRESOLVED";

export type EventCallbackResponse<T extends Resolved | Unresolved = Resolved | Unresolved> = Result<
T extends Resolved ? {
Expand Down Expand Up @@ -222,7 +222,7 @@ export class Incomer <
if (
transaction.redisMetadata.mainTransaction &&
!transaction.redisMetadata.published &&
transaction.aliveSince + this.maxPingInterval < Date.now()
Number(transaction.aliveSince) + Number(this.maxPingInterval) < Date.now()
) {
return this.incomerChannel.publish({
...transaction,
Expand Down Expand Up @@ -324,10 +324,6 @@ export class Incomer <
return;
}

if (this.providedUUID) {
this.subscriber.unsubscribe(`${this.prefix ? `${this.prefix}-` : ""}${this.providedUUID}`);
}

const event = {
name: "REGISTER" as const,
data: {
Expand Down Expand Up @@ -588,24 +584,13 @@ export class Incomer <
): Promise<void> {
const { name } = message;

match<IncomerChannelEvents<T>>({ name, message } as IncomerChannelEvents<T>)
.with({
name: "PING"
},
async(res: { name: "PING", message: DispatcherPingMessage }) => this.handlePing(channel, res.message))
.with(P._,
async(res: { name: string, message: DistributedEventMessage<T> }) => this.customEvent({
...res, channel
})
)
.exhaustive()
.catch((error) => {
this.logger.error({
channel: "incomer",
error: error.stack,
message
});
});
if (name === "PING") {
await this.handlePing(channel, message as DispatcherPingMessage);

return;
}

await this.customEvent({ name, channel, message: message as DistributedEventMessage<T> });
}

private async handlePing(channel: string, message: DispatcherPingMessage) {
Expand Down Expand Up @@ -671,7 +656,7 @@ export class Incomer <

const callbackResult = await this.eventCallback({ ...event, eventTransactionId } as unknown as CallBackEventMessage<T>);

if (callbackResult.ok) {
if (callbackResult && callbackResult.ok) {
if (Symbol.for(callbackResult.val.status) === RESOLVED) {
await store.updateTransaction(formattedTransaction.redisMetadata.transactionId, {
...formattedTransaction,
Expand Down
5 changes: 4 additions & 1 deletion src/schema/events/document.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
},
"kind": {
"enum": ["AF", "PF", "DB", "ED"]
},
"name": {
"type": "string"
}
},
"required": ["id", "kind"],
"required": ["id", "kind", "name"],
"additionalProperties": false
},
"delete": {
Expand Down
2 changes: 1 addition & 1 deletion test/UT/class/eventManagement/dispatcher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ describe("Dispatcher", () => {
});

test("It should have update the update the incomer last activity", async () => {
await timers.setTimeout(10_000);
await timers.setTimeout(15_000);

const pongTransactionToRetrieve = await incomerTransactionStore.getTransactionById(pongTransaction.redisMetadata.transactionId!);
const pingTransaction = await dispatcherTransactionStore.getTransactionById(pingTransactionId);
Expand Down
3 changes: 2 additions & 1 deletion test/UT/class/eventManagement/events.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
getRedis
} from "@myunisoft/redis";
import * as Logger from "pino";
import { Ok } from "@openally/result";

// Import Internal Dependencies
import {
Expand All @@ -26,7 +27,7 @@ import { validate } from "../../../../src/index";
const incomerLogger = Logger.pino({
level: "debug"
});
const mockedEventComeBackHandler = jest.fn();
const mockedEventComeBackHandler = jest.fn().mockImplementation(() => Ok({ status: "RESOLVED" }));

async function updateRegisterTransactionState(
publisherOldTransacStore: TransactionStore<"incomer">,
Expand Down
3 changes: 2 additions & 1 deletion test/UT/class/eventManagement/externals.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
getRedis
} from "@myunisoft/redis";
import * as Logger from "pino";
import { Ok } from "@openally/result";

// Import Internal Dependencies
import { Incomer } from "../../../../src/index";
Expand Down Expand Up @@ -32,7 +33,7 @@ afterAll(async() => {
});

describe("Init Incomer without Dispatcher alive & prefix as \"test\"", () => {
const eventComeBackHandler = () => void 0;
const eventComeBackHandler = jest.fn().mockImplementation(() => Ok({ status: "RESOLVED" }));;

describe("With externalsInitialized at true", () => {
const incomer: Incomer = new Incomer({
Expand Down
10 changes: 8 additions & 2 deletions test/UT/class/eventManagement/handle-inactive-no-backup.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
getRedis
} from "@myunisoft/redis";
import * as Logger from "pino";
import { Ok } from "@openally/result";

// Import Internal Dependencies
import {
Expand All @@ -26,7 +27,7 @@ import { TransactionStore } from "../../../../src/class/store/transaction.class"
const dispatcherLogger = Logger.pino({
level: "debug"
});
const mockedEventComeBackHandler = jest.fn();
const mockedEventComeBackHandler = jest.fn().mockImplementation(() => Ok({ status: "RESOLVED" }));

describe("Publishing/exploiting a custom event & inactive incomer", () => {
let dispatcher: Dispatcher<EventOptions<keyof Events>>;
Expand Down Expand Up @@ -200,7 +201,12 @@ describe("Publishing/exploiting a custom event & inactive incomer", () => {
dispatcher["subscriber"]!.on("message", (channel, message) => dispatcher["handleMessages"](channel, message));
secondConcernedIncomer["subscriber"]!.on("message", (channel, message) => secondConcernedIncomer["handleMessages"](channel, message));

await timers.setTimeout(5_000);
await timers.setTimeout(1_000);

const incomer = [...(await dispatcher["incomerStore"].getIncomers()).values()].find((incomer) => incomer.baseUUID === concernedIncomer.baseUUID);
await dispatcher["removeNonActives"]([incomer!]);

await timers.setTimeout(1_000);

const mockCalls = mockedSetTransaction.mock.calls.flat();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
getRedis
} from "@myunisoft/redis";
import * as Logger from "pino";
import { Ok } from "@openally/result";

// Import Internal Dependencies
import {
Expand All @@ -26,7 +27,7 @@ import { TransactionStore } from "../../../../src/class/store/transaction.class"
const dispatcherLogger = Logger.pino({
level: "debug"
});
const mockedEventComeBackHandler = jest.fn();
const mockedEventComeBackHandler = jest.fn().mockImplementation(() => Ok({ status: "RESOLVED" }));

describe("Publishing/exploiting a custom event & inactive incomer", () => {
let dispatcher: Dispatcher<EventOptions<keyof Events>>;
Expand Down
15 changes: 11 additions & 4 deletions test/UT/class/eventManagement/incomer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
closeAllRedis,
getRedis
} from "@myunisoft/redis";
import { Ok } from "@openally/result";

// Import Internal Dependencies
import { Dispatcher, Incomer } from "../../../../src/index";
Expand All @@ -18,7 +19,7 @@ const mockedDispatcherRemoveNonActives = jest.spyOn(Dispatcher.prototype as any,
const kIdleTime = 4_000;

describe("Init Incomer without Dispatcher alive", () => {
const eventComeBackHandler = () => void 0;
const eventComeBackHandler = jest.fn().mockImplementation(() => Ok({ status: "RESOLVED" }));;

const pingInterval = 2_000;

Expand Down Expand Up @@ -93,9 +94,16 @@ describe("Init Incomer without Dispatcher alive", () => {
});

test("Incomer calling close, it should remove the given Incomer", async() => {
await incomer.close();
await incomer["incomerChannel"].publish({
name: "CLOSE",
redisMetadata: {
origin: incomer["providedUUID"],
incomerName: incomer["name"],
prefix: incomer["prefix"]
}
});

await timers.setTimeout(500);
await timers.setTimeout(1_000);

expect(mockedDispatcherRemoveNonActives).toHaveBeenCalled();

Expand All @@ -106,7 +114,6 @@ describe("Init Incomer without Dispatcher alive", () => {

afterAll(async() => {
await dispatcherIncomer.close();
await incomer.close();;
await closeAllRedis();
});
});
3 changes: 2 additions & 1 deletion test/UT/class/eventManagement/ping.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
getRedis
} from "@myunisoft/redis";
import * as Logger from "pino";
import { Ok } from "@openally/result";

// Import Internal Dependencies
import { Dispatcher, Incomer } from "../../../../src/index";
Expand All @@ -23,7 +24,7 @@ const incomerLogger = Logger.pino({
const mockedIncomerLoggerDebug = jest.spyOn(incomerLogger, "debug");

describe("Ping", () => {
const eventComeBackHandler = () => void 0;
const eventComeBackHandler = jest.fn().mockImplementation(() => Ok({ status: "RESOLVED" }));;

let dispatcher: Dispatcher;
let incomer: Incomer;
Expand Down
Loading
Loading