Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor on custom event #245

Merged
merged 11 commits into from
Jun 24, 2024
9 changes: 9 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"dependencies": {
"@myunisoft/redis": "github:MyUnisoft/redis#v3.4.2",
"@openally/mutex": "^1.0.0",
"@openally/result": "^1.2.1",
"ajv": "^8.12.0",
"pino-pretty": "^10.3.1",
"ts-pattern": "^4.3.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,6 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
inactiveIncomerTransactionStore.deleteTransaction(relatedHandlerTransaction.redisMetadata.transactionId)
]));

// To publish later when concerned Incomer

this.logger.debug(this.standardLogFn(
dispatcherTransaction as unknown as StandardLogOpts<T>
)("Spread transaction has been backup"));
Expand Down
24 changes: 16 additions & 8 deletions src/class/eventManagement/incomer.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
import { pino } from "pino";
import { P, match } from "ts-pattern";
import { ValidateFunction } from "ajv";
import { Result } from "@openally/result";

// Import Internal Dependencies
import {
Expand Down Expand Up @@ -74,6 +75,8 @@ function isIncomerChannelMessage<T extends GenericEvent = GenericEvent>(value:
return value.name !== "APPROVEMENT";
}

export type EventCallbackResponse = Result<"Resolved" | "NotResolved", string>;

export type IncomerOptions<T extends GenericEvent = GenericEvent> = {
/* Service name */
name: string;
Expand All @@ -86,7 +89,7 @@ export type IncomerOptions<T extends GenericEvent = GenericEvent> = {
eventsValidationFn?: eventsValidationFn<T>;
customValidationCbFn?: customValidationCbFn<T>;
};
eventCallback: (message: CallBackEventMessage<T>) => void;
eventCallback: (message: CallBackEventMessage<T>) => Promise<EventCallbackResponse>;
dispatcherInactivityOptions?: {
/* max interval between received ping before considering dispatcher off */
maxPingInterval?: number;
Expand All @@ -102,7 +105,7 @@ export class Incomer <
> extends EventEmitter {
readonly name: string;
readonly prefix: Prefix | undefined;
readonly eventCallback: (message: CallBackEventMessage<T>) => void;
readonly eventCallback: (message: CallBackEventMessage<T>) => Promise<EventCallbackResponse>;

public dispatcherConnectionState = false;
public baseUUID = randomUUID();
Expand Down Expand Up @@ -642,18 +645,23 @@ export class Incomer <

const formattedTransaction = await store.setTransaction(transaction);

await Promise.all([
this.eventCallback({ ...event, eventTransactionId } as unknown as CallBackEventMessage<T>),
store.updateTransaction(formattedTransaction.redisMetadata.transactionId, {
const callbackResult = await this.eventCallback({ ...event, eventTransactionId } as unknown as CallBackEventMessage<T>);

if (callbackResult.ok) {
await store.updateTransaction(formattedTransaction.redisMetadata.transactionId, {
...formattedTransaction,
redisMetadata: {
...formattedTransaction.redisMetadata,
resolved: true
}
} as Transaction<"incomer">)
]);
} as Transaction<"incomer">);

this.logger.info(this.standardLogFn(logData)("Resolved Custom event"));

return;
}

this.logger.info(this.standardLogFn(logData)("Resolved Custom event"));
this.logger.info(this.standardLogFn(logData)(`Callback error reason: ${callbackResult.val}`));
}

private async handleApprovement(message: DispatcherApprovementMessage) {
Expand Down
2 changes: 1 addition & 1 deletion src/class/store/incomer.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ export class IncomerStore extends KVPeer<RegisteredIncomer> {
const incomer = await this.getValue(incomerKey);

if (!incomer) {
throw new Error("Cannot find the Incomer");
throw new Error(`Cannot find the Incomer ${incomerKey}`);
}

this.setValue({ key: incomerKey, value: { ...incomer, lastActivity: Date.now() } });
Expand Down
2 changes: 1 addition & 1 deletion test/UT/class/eventManagement/dispatcher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const mockedLoggerInfo = jest.spyOn(logger, "info");
const mockedHandleDispatcherMessages = jest.spyOn(Dispatcher.prototype as any, "approveIncomer");
const mockedHandleIncomerMessages = jest.spyOn(Dispatcher.prototype as any, "handleCustomEvents");
const mockedPing = jest.spyOn(Dispatcher.prototype as any, "ping");
const mockedCheckLastActivity = jest.spyOn(Dispatcher.prototype as any, "checkLastActivityIntervalFn");
const mockedCheckLastActivity = jest.spyOn(Dispatcher.prototype as any, "checkLastActivity");
const mockedHandleInactiveIncomer = jest.spyOn(TransactionHandler.prototype, "resolveInactiveIncomerTransactions");

const mockedSetTransaction = jest.spyOn(TransactionStore.prototype, "setTransaction");
Expand Down
15 changes: 8 additions & 7 deletions test/UT/class/eventManagement/handle-inactive-no-backup.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,24 +182,25 @@ describe("Publishing/exploiting a custom event & inactive incomer", () => {
});

await concernedIncomer.initialize();
});

test("expect the second incomer to have handle the event by retaking the main Transaction", async() => {
await concernedIncomer.publish(event);

await timers.setTimeout(1_000);

expect(handlerTransaction).toBeDefined();

await secondConcernedIncomer.initialize();
await concernedIncomer.close();

jest.spyOn(dispatcher as any, "checkLastActivity").mockImplementation(async(opts: any) => {
// do nothing
});
dispatcher["subscriber"]!.on("message", (channel, message) => dispatcher["handleMessages"](channel, message));
secondConcernedIncomer["subscriber"]!.on("message", (channel, message) => secondConcernedIncomer["handleMessages"](channel, message));

await secondConcernedIncomer.initialize();

await timers.setTimeout(10_000);
});

test("expect the second incomer to have handle the event by retaking the main Transaction", async() => {
await timers.setTimeout(18_000);
await timers.setTimeout(5_000);

const mockCalls = mockedSetTransaction.mock.calls.flat();

Expand Down
Loading