Skip to content

Commit

Permalink
feat(./class/eventManagement/incomer): log on retry publish (#288)
Browse files Browse the repository at this point in the history
* feat(./class/eventManagement/incomer): log on retry publish

* refactor(./class/eventManagement/dispatcher): renamed fn

* feat(./class/eventManagement/incomer): log dispatcherConnectionState

* test(/UT/dispatcher/transaction-handler): add ut for unresolved event & available backup

* refactor(./class/eventManagement/dispatcher/transaction-handler): log info

* refactor(): clean import

* test(./UT/eventManagement/dispatcher/transaction-handler): ut for main transactions
  • Loading branch information
Rossb0b authored Aug 27, 2024
1 parent 7012a9a commit a5dbdc4
Show file tree
Hide file tree
Showing 7 changed files with 636 additions and 286 deletions.
8 changes: 4 additions & 4 deletions src/class/eventManagement/dispatcher.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -758,19 +758,19 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit

const filteredConcernedIncomers = await this.getFilteredConcernedIncomers(eventRest.name);

if (filteredConcernedIncomers.length === 0) {
if (filteredConcernedIncomers.length === 0 && relatedTransaction !== null) {
if (eventRest.name !== "PING") {
this.logger.warn(this.standardLogFn(logData)(`No concerned incomers found for event: ${eventRest.name}`));

await this.backupNotdistributableEvents(senderTransactionStore, relatedTransaction, eventRest);
await this.backupUndeliverableEvents(senderTransactionStore, relatedTransaction, eventRest);

this.logger.warn(this.standardLogFn(logData)("Backed-up event"));
}

return;
}

if (!relatedTransaction) {
if (!relatedTransaction || relatedTransaction === null) {
this.logger.warn(this.standardLogFn(logData)(`Couldn't find the related main transaction for: ${transactionId}`));

return;
Expand Down Expand Up @@ -867,7 +867,7 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
return filteredConcernedIncomers;
}

private async backupNotdistributableEvents(
private async backupUndeliverableEvents(
senderTransactionStore: TransactionStore<"incomer">,
relatedTransaction: Transaction<"incomer">,
eventRest: Omit<EventMessage, "redisMetadata">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
for (const incomer of incomers.values()) {
if (incomer.providedUUID === inactiveIncomer.providedUUID) {
incomers.delete(incomer);

break;
}
}
Expand Down Expand Up @@ -401,7 +401,7 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
inactiveIncomerTransactionStore.deleteTransaction(relatedHandlerTransaction.redisMetadata.transactionId)
]));

this.logger.debug(this.standardLogFn({
this.logger.info(this.standardLogFn({
...dispatcherTransaction,
redisMetadata: {
...dispatcherTransaction.redisMetadata,
Expand Down
105 changes: 71 additions & 34 deletions src/class/eventManagement/incomer.class.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable max-lines */
// Import Node.js Dependencies
import { once, EventEmitter } from "node:events";
import { randomUUID } from "node:crypto";
Expand Down Expand Up @@ -195,6 +196,14 @@ export class Incomer <
}
}

get redis() {
return getRedis();
}

get subscriber() {
return getRedis("subscriber");
}

private async checkDispatcherState() {
const date = Date.now();

Expand All @@ -213,36 +222,41 @@ export class Incomer <
(transactionKey) => store.getValue(transactionKey)
));

await Promise.race([
Promise.all(transactions.map((transaction) => {
if (
transaction.redisMetadata.mainTransaction &&
!transaction.redisMetadata.published &&
Number(transaction.aliveSince) + Number(this.maxPingInterval) < Date.now()
) {
return this.incomerChannel.publish({
...transaction,
redisMetadata: {
transactionId: transaction.redisMetadata.transactionId,
origin: transaction.redisMetadata.origin,
prefix: transaction.redisMetadata.prefix
}
} as unknown as IncomerChannelMessages<T>["IncomerMessages"]);
}
const eventToPublish = transactions.map((transaction) => {
if (
transaction.redisMetadata.mainTransaction &&
!transaction.redisMetadata.published &&
Number(transaction.aliveSince) + Number(this.maxPingInterval) < Date.now()
) {
return this.retryPublish(transaction);
}

return void 0;
})),
new Promise((_, reject) => timers.setTimeout(this.maxPingInterval).then(() => reject(new Error())))
return void 0;
});

await Promise.race([
Promise.all(eventToPublish),
timers.setTimeout(this.maxPingInterval)
]);
}
}

get redis() {
return getRedis();
}

get subscriber() {
return getRedis("subscriber");
private async retryPublish(transaction: any) {
await this.incomerChannel.publish({
...transaction,
redisMetadata: {
transactionId: transaction.redisMetadata.transactionId,
origin: transaction.redisMetadata.origin,
prefix: transaction.redisMetadata.prefix
}
} as unknown as IncomerChannelMessages<T>["IncomerMessages"]);

this.logger.info(
this.standardLogFn({
...transaction,
dispatcherConnectionState: this.dispatcherConnectionState
})("Retried event publish")
);
}

private async registrationAttempt() {
Expand Down Expand Up @@ -515,7 +529,8 @@ export class Incomer <
...finalEvent, redisMetadata: {
...finalEvent.redisMetadata,
eventTransactionId: finalEvent.redisMetadata.transactionId
}
},
dispatcherConnectionState: this.dispatcherConnectionState
})("Event Stored but not published"));

return;
Expand All @@ -527,7 +542,8 @@ export class Incomer <
...finalEvent, redisMetadata: {
...finalEvent.redisMetadata,
eventTransactionId: finalEvent.redisMetadata.transactionId
}
},
dispatcherConnectionState: this.dispatcherConnectionState
})("Published event"));
}

Expand Down Expand Up @@ -580,7 +596,12 @@ export class Incomer <
try {
match<DispatcherChannelEvents>({ name })
.with({ name: "APPROVEMENT" }, async() => {
this.logger.info(logData, "New approvement message on Dispatcher Channel");
this.logger.info(
this.standardLogFn({
...logData,
dispatcherConnectionState: this.dispatcherConnectionState
} as any)("New approvement message on Dispatcher Channel")
);

await this.handleApprovement(message as DispatcherApprovementMessage);
})
Expand Down Expand Up @@ -635,7 +656,9 @@ export class Incomer <
}
});

this.logger.debug(this.standardLogFn(logData as any)("Resolved Ping event"));
this.logger.debug(
this.standardLogFn({ ...logData, dispatcherConnectionState: this.dispatcherConnectionState } as any)("Resolved Ping event")
);
}

private async customEvent(
Expand All @@ -647,7 +670,8 @@ export class Incomer <

const logData = {
channel,
...message
...message,
dispatcherConnectionState: this.dispatcherConnectionState
};

if (this.eventsValidationFn) {
Expand Down Expand Up @@ -688,7 +712,11 @@ export class Incomer <
}
} as Transaction<"incomer">);

this.logger.info(this.standardLogFn(logData)("Resolved Custom event"));
this.logger.info(this.standardLogFn({
...logData,
dispatcherConnectionState: this.dispatcherConnectionState
})("Resolved Custom event")
);

return;
}
Expand All @@ -713,7 +741,10 @@ export class Incomer <
});

this.logger.info(
this.standardLogFn(logData)(`Callback Resolved but retry for the given reason: ${unresolvedCallbackResult.reason}`)
this.standardLogFn({
...logData,
dispatcherConnectionState: this.dispatcherConnectionState
} as any)(`Callback Resolved but retry for the given reason: ${unresolvedCallbackResult.reason}`)
);

return;
Expand All @@ -728,7 +759,10 @@ export class Incomer <
} as Transaction<"incomer">);

this.logger.info(
this.standardLogFn(logData)(`Callback Resolved but failed for the given reason: ${unresolvedCallbackResult.reason}`)
this.standardLogFn({
...logData,
dispatcherConnectionState: this.dispatcherConnectionState
})(`Callback Resolved but failed for the given reason: ${unresolvedCallbackResult.reason}`)
);

return;
Expand All @@ -744,7 +778,10 @@ export class Incomer <
}
} as Transaction<"incomer">);

this.logger.info(this.standardLogFn(logData)(`Callback error reason: ${String(callbackResult.val)}`));
this.logger.info(this.standardLogFn({
...logData,
dispatcherConnectionState: this.dispatcherConnectionState
})(`Callback error reason: ${String(callbackResult.val)}`));
}

private async handleApprovement(
Expand Down
12 changes: 8 additions & 4 deletions src/utils/defaultStandardLog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@ export type StandardLogOpts<T extends GenericEvent = GenericEvent> = T & {
origin?: string;
to?: string;
eventTransactionId?: string;
}
},
dispatcherConnectionState?: boolean;
}

export function defaultStandardLog<
T extends GenericEvent = EventOptions<keyof Events>
>(event: T & { redisMetadata: { transactionId: string; origin?: string; to?: string, eventTransactionId?: string } }) {
>(event: StandardLogOpts<T>) {
const logs = Array.from(mapped<T>(event)).join("|");


const state = typeof event.dispatcherConnectionState === "undefined" ? "none" : String(event.dispatcherConnectionState);

// eslint-disable-next-line max-len
const eventMeta = `name:${logValueFallback(event.name)}|ope:${logValueFallback(event.operation)}|from:${logValueFallback(event.redisMetadata.origin)}|to:${logValueFallback(event.redisMetadata.to)}`;
const eventMeta = `name:${logValueFallback(event.name)}|ope:${logValueFallback(event.operation)}|from:${logValueFallback(event.redisMetadata.origin)}|to:${logValueFallback(event.redisMetadata.to)}|state:${state}`;

function log(message: string) {
return `(${logs})(${eventMeta}) ${message}`;
Expand All @@ -46,7 +50,7 @@ function logValueFallback(value: string): string {

function* mapped<
T extends GenericEvent = EventOptions<keyof Events>
>(event: T & { redisMetadata: { transactionId: string } }) {
>(event: StandardLogOpts<T>) {
for (const [key, formattedKey] of Object.entries(kScopeKeys)) {
if (key === "transactionId") {
yield `${formattedKey}:${logValueFallback(event.redisMetadata[key])}`;
Expand Down
Loading

0 comments on commit a5dbdc4

Please sign in to comment.