Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(class/eventManagement/dispatcher): avoid erroring on hot-reload #239

Merged
merged 1 commit into from
Jun 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 39 additions & 23 deletions src/class/eventManagement/dispatcher.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit

private logger: PartialLogger;
private incomerChannelHandler: IncomerChannelHandler<T>;
private channelsToUnsubscribe = new Set<string>();
private activeChannels = new Set<string>();

private pingInterval: number;
private pingIntervalTimer: NodeJS.Timer;
Expand Down Expand Up @@ -347,7 +347,7 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
await this.subscriber.unsubscribe(
this.dispatcherChannelName,
...this.incomerChannelHandler.channels.keys(),
...this.channelsToUnsubscribe.values()
...this.activeChannels.values()
);

this.updateState(false);
Expand Down Expand Up @@ -649,7 +649,7 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
toHandle.push(Promise.all([
this.incomerStore.deleteIncomer(inactive.providedUUID),
this.transactionHandler.resolveInactiveIncomerTransactions(inactive),
this.channelsToUnsubscribe.add(`${inactive.prefix ? `${inactive.prefix}-` : ""}${inactive.providedUUID}`)
this.activeChannels.add(`${inactive.prefix ? `${inactive.prefix}-` : ""}${inactive.providedUUID}`)
]));
}

Expand Down Expand Up @@ -853,36 +853,53 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
throw new Error("No related transaction found next to register event");
}

// Get Incomers Tree
const incomers = await this.incomerStore.getIncomers();
const now = Date.now();

// Avoid multiple init from a same instance of a incomer
for (const incomer of incomers) {
if (incomer.baseUUID === origin) {
await this.dispatcherTransactionStore.deleteTransaction(transactionId);
let providedUUID;
if (data.providedUUID) {
providedUUID = data.providedUUID;

const relatedIncomer = [...incomers.values()]
.find((incomer) => incomer.baseUUID === origin && incomer.providedUUID === providedUUID);

if (!relatedIncomer) {
const incomer = Object.assign({}, {
...data,
isDispatcherActiveInstance: origin === this.selfProvidedUUID,
baseUUID: origin,
lastActivity: now,
aliveSince: now,
prefix
});

throw new Error("Forbidden multiple registration for a same instance");
await this.incomerStore.setIncomer(incomer, data.providedUUID);
}
}
else {
for (const incomer of incomers) {
if (incomer.baseUUID === origin) {
await this.dispatcherTransactionStore.deleteTransaction(transactionId);

// Update the tree
const now = Date.now();
throw new Error("Forbidden multiple registration for a same instance");
}
}

const incomer = Object.assign({}, {
...data,
isDispatcherActiveInstance: origin === this.selfProvidedUUID,
baseUUID: origin,
lastActivity: now,
aliveSince: now,
prefix
});
const incomer = Object.assign({}, {
...data,
isDispatcherActiveInstance: origin === this.selfProvidedUUID,
baseUUID: origin,
lastActivity: now,
aliveSince: now,
prefix
});

const providedUUID = await this.incomerStore.setIncomer(incomer, data.providedUUID);
providedUUID = await this.incomerStore.setIncomer(incomer);
}

// Subscribe to the exclusive service channel
this.incomerChannelHandler.set({ uuid: providedUUID, prefix });

if (!this.channelsToUnsubscribe.has(`${prefix ? `${prefix}-` : ""}${providedUUID}`)) {
if (!this.activeChannels.has(`${prefix ? `${prefix}-` : ""}${providedUUID}`)) {
await this.subscriber.subscribe(`${prefix ? `${prefix}-` : ""}${providedUUID}`);
}

Expand All @@ -900,7 +917,6 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
}
};

// Approve the service & send him info so he can use the dedicated channel
await this.eventsHandler.dispatch({
channel: this.dispatcherChannel as Channel<DispatcherChannelMessages["DispatcherMessages"]>,
store: this.dispatcherTransactionStore,
Expand Down
Loading