diff --git a/docs/pubsub/Channel.md b/docs/pubsub/Channel.md index b8cdd7e..4efecc0 100644 --- a/docs/pubsub/Channel.md +++ b/docs/pubsub/Channel.md @@ -27,7 +27,7 @@ export type PublishOptions< ## 📚 Usage ```ts -import { Channel } from "@myunisoft/redis"; +import { Channel, RedisAdapter } from "@myunisoft/redis"; const name = "foo"; const redis = new RedisAdapter({ @@ -40,6 +40,7 @@ const subscriber = new RedisAdapter({ }); await redis.initialize(); +await subscriber.initialize(); await subscriber.subscribe(name); subscriber.on("message", (channel, message) => { diff --git a/docs/stream/Interpersonal.md b/docs/stream/Interpersonal.md index f3fbd5a..dba1e8a 100644 --- a/docs/stream/Interpersonal.md +++ b/docs/stream/Interpersonal.md @@ -16,7 +16,7 @@ interface ClaimOptions { idleTime: number; } -interface GroupConsumerOptions extends BasementOptions { +export interface InterpersonalOptions extends StreamOptions { groupName: string; consumerName: string; claimOptions?: ClaimOptions; @@ -26,10 +26,17 @@ interface GroupConsumerOptions extends BasementOptions { ## 📚 Usage ```ts -import { GroupConsumer } from "@myunisoft/redis"; +import { Interpersonal, RedisAdapter } from "@myunisoft/redis"; -const consumer = new GroupConsumer({ - connection, +const redis = new RedisAdapter({ + port: Number(process.env.REDIS_PORT), + host: process.env.REDIS_HOST +}); + +await redis.initialize(); + +const consumer = new Interpersonal({ + redis, streamName: "my-stream-name", groupName: "my-group-name", consumerName: "my-consumer-name", @@ -41,7 +48,6 @@ const consumer = new GroupConsumer({ } }); -await consumer.initialize(); await consumer.init(); const readable = Readable.from(firstConsumer[Symbol.asyncIterator]()); diff --git a/docs/stream/Intrapersonal.md b/docs/stream/Intrapersonal.md index c0fff50..dc6a924 100644 --- a/docs/stream/Intrapersonal.md +++ b/docs/stream/Intrapersonal.md @@ -10,17 +10,23 @@ ## 📚 Usage ```ts -import { Intrapersonal } from "@myunisoft/redis"; +import { Intrapersonal, RedisAdapter } from "@myunisoft/redis"; + +const redis = new RedisAdapter({ + port: Number(process.env.REDIS_PORT), + host: process.env.REDIS_HOST +}); + +await redis.initialize(); const consumer = new Intrapersonal({ - connection, + redis, streamName: "my-stream-name", frequency: 10000, lastId: "0-0", count: 10 }); -await consumer.initialize(); await consumer.init(); const readable = Readable.from(basicStream[Symbol.asyncIterator]()); diff --git a/docs/stream/Stream.md b/docs/stream/Stream.md index 469545e..2dd0bc1 100644 --- a/docs/stream/Stream.md +++ b/docs/stream/Stream.md @@ -65,17 +65,24 @@ interface ConsumeOptions { ## 📚 Usage ```ts -import { Stream } from "@myunisoft/redis"; +import { Stream, RedisAdapter } from "@myunisoft/redis"; -const redisStream = new Stream({ +const redis = new RedisAdapter({ + port: Number(process.env.REDIS_PORT), + host: process.env.REDIS_HOST +}); + +await redis.initialize(); + +const stream = new Stream({ + redis, streamName: "my-stream-name", frequency: 10000, lastId: "0-0", count: 10 }); -await redisStream.initialize(); -await redisStream.init(); +await stream.init(); ``` ## 📜 API diff --git a/src/class/stream/Interpersonal.class.ts b/src/class/stream/Interpersonal.class.ts index 9e71bff..e8e6927 100644 --- a/src/class/stream/Interpersonal.class.ts +++ b/src/class/stream/Interpersonal.class.ts @@ -102,7 +102,7 @@ export class Interpersonal extends Stream { ...args: RedisValue[] ]; - streamResults = await this.xreadgroup(...optionsWithCount); + streamResults = await this.redis.xreadgroup(...optionsWithCount); } else { const optionsWithoutCount = [...redisOptions] as [ @@ -113,7 +113,7 @@ export class Interpersonal extends Stream { ...args: RedisValue[] ]; - streamResults = await this.xreadgroup(...optionsWithoutCount); + streamResults = await this.redis.xreadgroup(...optionsWithoutCount); } if (!streamResults) { @@ -144,7 +144,7 @@ export class Interpersonal extends Stream { count: number | string ]; - streamResults = await this.xautoclaim(...optionsWithCount); + streamResults = await this.redis.xautoclaim(...optionsWithCount); } else { const optionsWithoutCount = [...redisOptions] as [ @@ -155,29 +155,25 @@ export class Interpersonal extends Stream { start: RedisKey | number ]; - streamResults = await this.xautoclaim(...optionsWithoutCount); + streamResults = await this.redis.xautoclaim(...optionsWithoutCount); } - const [cursor, entries] = streamResults; + const [__, entries] = streamResults; if (entries.length === 0) { return []; } - if (cursor !== "0-0") { - this.emit("rest"); - } - return await this.handleEntries(entries, ">"); } public async claimEntry(entryId: string): Promise { - await this.xack(this.streamName, this.groupName, entryId); + await this.redis.xack(this.streamName, this.groupName, entryId); await this.delEntry(entryId); } override async getConsumerData(): Promise { - const consumers = await this.xinfo("CONSUMERS", this.streamName, this.groupName); + const consumers = await this.redis.xinfo("CONSUMERS", this.streamName, this.groupName); const formattedConsumers = utils.parseXINFOConsumers(consumers as utils.XINFOConsumers); @@ -196,7 +192,7 @@ export class Interpersonal extends Stream { return; } - await this.xgroup("CREATE", this.streamName, this.groupName, "$", "MKSTREAM"); + await this.redis.xgroup("CREATE", this.streamName, this.groupName, "$", "MKSTREAM"); } override async deleteGroup() { @@ -205,7 +201,7 @@ export class Interpersonal extends Stream { return; } - await this.xgroup("DESTROY", this.streamName, this.groupName); + await this.redis.xgroup("DESTROY", this.streamName, this.groupName); } override async consumerExist(): Promise { @@ -220,7 +216,7 @@ export class Interpersonal extends Stream { return; } - await this.xgroup("CREATECONSUMER", this.streamName, this.groupName, this.consumerName); + await this.redis.xgroup("CREATECONSUMER", this.streamName, this.groupName, this.consumerName); } override async deleteConsumer(): Promise { @@ -229,6 +225,6 @@ export class Interpersonal extends Stream { return; } - await this.xgroup("DELCONSUMER", this.streamName, this.groupName, this.consumerName); + await this.redis.xgroup("DELCONSUMER", this.streamName, this.groupName, this.consumerName); } } diff --git a/src/class/stream/Intrapersonal.class.ts b/src/class/stream/Intrapersonal.class.ts index 2744fb2..9c4a654 100644 --- a/src/class/stream/Intrapersonal.class.ts +++ b/src/class/stream/Intrapersonal.class.ts @@ -38,7 +38,7 @@ export class Intrapersonal extends Stream { ...args: RedisValue[] ]; - streamResults = await this.xread(...optionsWithCount); + streamResults = await this.redis.xread(...optionsWithCount); } else if (block) { const optionsWithBlock = [...redisOptions] as [ @@ -48,7 +48,7 @@ export class Intrapersonal extends Stream { ...args: RedisValue[] ]; - streamResults = await this.xread(...optionsWithBlock); + streamResults = await this.redis.xread(...optionsWithBlock); } else { const optionsWithoutCount = [...redisOptions] as [ @@ -56,7 +56,7 @@ export class Intrapersonal extends Stream { ...args: RedisValue[] ]; - streamResults = await this.xread(...optionsWithoutCount); + streamResults = await this.redis.xread(...optionsWithoutCount); } if (!streamResults) { diff --git a/src/class/stream/Stream.class.ts b/src/class/stream/Stream.class.ts index 3b678a4..61ea20f 100644 --- a/src/class/stream/Stream.class.ts +++ b/src/class/stream/Stream.class.ts @@ -9,7 +9,7 @@ import type { Entry, Group } from "../../types/index.js"; -import { RedisAdapter, RedisAdapterOptions } from "../adapter/redis.adapter.js"; +import { RedisAdapter } from "../adapter/redis.adapter.js"; // CONSTANTS const kDefaultRangeOptions = { min: "-", max: "+" }; @@ -52,7 +52,8 @@ export interface PushOptions { export type DelEntryResponse = Result; -export type StreamOptions = RedisAdapterOptions & { +export interface StreamOptions { + redis: RedisAdapter; streamName: string; /** * Interval of time between two iteration on the stream @@ -66,31 +67,31 @@ export type StreamOptions = RedisAdapterOptions & { * Number of entries it must pull at each iteration */ count?: number; -}; +} /** * * @description Shared method used to work on a Redis Stream */ -export class Stream extends RedisAdapter { +export class Stream { public streamName: string; public lastId: string; protected frequency: number; protected count?: number; + protected redis: RedisAdapter; constructor(options: StreamOptions) { - super({ ...options }); + const { lastId } = options; + + Object.assign(this, options); - this.streamName = options.streamName; - this.frequency = options.frequency; - this.count = options.count; - this.lastId = options.lastId ?? kMinId; + this.lastId = lastId ?? kMinId; } public async streamExist(): Promise { try { - await this.xinfo("STREAM", this.streamName); + await this.redis.xinfo("STREAM", this.streamName); return true; } @@ -108,14 +109,14 @@ export class Stream extends RedisAdapter { return; } - await this.xadd(this.streamName, "0-1", "init", "stream"); - await this.xdel(this.streamName, "0-1"); + await this.redis.xadd(this.streamName, "0-1", "init", "stream"); + await this.redis.xdel(this.streamName, "0-1"); } public async getInfo(): Promise { const formattedStreamData = {}; - const streamData = await this.xinfo("STREAM", this.streamName, "FULL", "COUNT", 0) as utils.XRedisData; + const streamData = await this.redis.xinfo("STREAM", this.streamName, "FULL", "COUNT", 0) as utils.XRedisData; for (const [key, value] of utils.parseData(streamData)) { formattedStreamData[key as string] = value; @@ -125,11 +126,11 @@ export class Stream extends RedisAdapter { } public async getLength(): Promise { - return await this.xlen(this.streamName); + return await this.redis.xlen(this.streamName); } public async getGroupsData(): Promise { - const groups = await this.xinfo("GROUPS", this.streamName) as utils.XINFOGroups; + const groups = await this.redis.xinfo("GROUPS", this.streamName) as utils.XINFOGroups; return utils.parseXINFOGroups(groups); } @@ -168,11 +169,11 @@ export class Stream extends RedisAdapter { const formattedId = id ?? "*"; - return await this.xadd(this.streamName, formattedId, ...entries) as string; + return await this.redis.xadd(this.streamName, formattedId, ...entries) as string; } public async delEntry(entryId: string): Promise { - const res = await this.xdel(this.streamName, entryId); + const res = await this.redis.xdel(this.streamName, entryId); if (res !== 1) { return Err(`Failed entry deletion for ${entryId}`); @@ -219,7 +220,7 @@ export class Stream extends RedisAdapter { const { min, max, count } = options; const redisOptions = utils.createRedisOptions(this.streamName, min, max, { count }) as unknown; - return utils.parseEntries(await this.xrange(...redisOptions as utils.FormattedRedisOptions)); + return utils.parseEntries(await this.redis.xrange(...redisOptions as utils.FormattedRedisOptions)); } /** @@ -255,7 +256,7 @@ export class Stream extends RedisAdapter { const { min, max, count } = options; const redisOptions = utils.createRedisOptions(this.streamName, max, min, { count }) as unknown; - return utils.parseEntries(await this.xrevrange(...redisOptions as utils.FormattedRedisOptions)); + return utils.parseEntries(await this.redis.xrevrange(...redisOptions as utils.FormattedRedisOptions)); } /** @@ -274,12 +275,12 @@ export class Stream extends RedisAdapter { */ public async trim(threshold: number | string): Promise { return typeof threshold === "number" ? - await this.xtrim( + await this.redis.xtrim( this.streamName, "MAXLEN", threshold ) : - await this.xtrim( + await this.redis.xtrim( this.streamName, "MINID", threshold @@ -310,7 +311,7 @@ export class Stream extends RedisAdapter { return; } - await this.xgroup("CREATE", this.streamName, name, "$", "MKSTREAM"); + await this.redis.xgroup("CREATE", this.streamName, name, "$", "MKSTREAM"); } /** @@ -325,7 +326,7 @@ export class Stream extends RedisAdapter { return; } - await this.xgroup("DESTROY", this.streamName, name); + await this.redis.xgroup("DESTROY", this.streamName, name); } /** @@ -336,7 +337,7 @@ export class Stream extends RedisAdapter { * @returns {Promise} */ public async getConsumerData(groupName: string, consumerName: string): Promise { - const consumers = await this.xinfo("CONSUMERS", this.streamName, groupName); + const consumers = await this.redis.xinfo("CONSUMERS", this.streamName, groupName); const formattedConsumers = utils.parseXINFOConsumers(consumers as utils.XINFOConsumers); @@ -369,7 +370,7 @@ export class Stream extends RedisAdapter { return; } - await this.xgroup("CREATECONSUMER", this.streamName, groupName, consumerName); + await this.redis.xgroup("CREATECONSUMER", this.streamName, groupName, consumerName); } /** @@ -385,6 +386,6 @@ export class Stream extends RedisAdapter { return; } - await this.xgroup("DELCONSUMER", this.streamName, groupName, consumerName); + await this.redis.xgroup("DELCONSUMER", this.streamName, groupName, consumerName); } } diff --git a/test/class/TimedKVPeer.spec.ts b/test/class/TimedKVPeer.spec.ts index 58e2cfa..57186cf 100644 --- a/test/class/TimedKVPeer.spec.ts +++ b/test/class/TimedKVPeer.spec.ts @@ -46,7 +46,7 @@ describe("TimedKVPeer", () => { }); it("Given an expired key, it should return null", async() => { - await timers.setTimeout(3_600); + await timers.setTimeout(5_000); assert.equal(await timedKVPeer.getValue("foo"), null); }); @@ -64,7 +64,7 @@ describe("TimedKVPeer", () => { it("Given a expired key", async() => { const result = await timedKVPeer.setValue({ key: "foo", value: { mail: "bar" } }); - await timers.setTimeout(3_600); + await timers.setTimeout(5_000); const deletedValues = await timedKVPeer.deleteValue(result.val as KeyType); diff --git a/test/class/stream/Interpersonal.spec.ts b/test/class/stream/Interpersonal.spec.ts index 3348eeb..080459b 100644 --- a/test/class/stream/Interpersonal.spec.ts +++ b/test/class/stream/Interpersonal.spec.ts @@ -6,7 +6,7 @@ import { Readable } from "node:stream"; import { once } from "node:events"; // Import Internal Dependencies -import { Interpersonal } from "../../../src/index"; +import { Interpersonal, RedisAdapter } from "../../../src/index"; import { randomValue } from "../../fixtures/utils/randomValue"; // Import Types @@ -43,6 +43,11 @@ const kDiff = 1000; const kCount = 2; describe("Interpersonal", () => { + const redis = new RedisAdapter({ + port: Number(process.env.REDIS_PORT), + host: process.env.REDIS_HOST + }); + let firstConsumer: Interpersonal; let secondConsumer: Interpersonal; let thirdConsumer: Interpersonal; @@ -53,7 +58,10 @@ describe("Interpersonal", () => { let thirdConsumerReadable: Readable; before(async() => { + await redis.initialize(); + firstConsumer = new Interpersonal({ + redis, streamName: kStreamName, claimOptions: { idleTime: 1000 @@ -62,12 +70,11 @@ describe("Interpersonal", () => { consumerName: kFirstConsumerName, lastId: kLastId, count: kCount, - frequency: kTimer, - port: Number(process.env.REDIS_PORT), - host: process.env.REDIS_HOST + frequency: kTimer }); secondConsumer = new Interpersonal({ + redis, streamName: kStreamName, claimOptions: { idleTime: 1000 @@ -76,26 +83,20 @@ describe("Interpersonal", () => { consumerName: kSecondConsumerName, lastId: kLastId, count: kCount, - frequency: kTimer, - port: Number(process.env.REDIS_PORT), - host: process.env.REDIS_HOST + frequency: kTimer }); thirdConsumer = new Interpersonal({ + redis, streamName: kStreamName, groupName: kGroupName, consumerName: kThirdConsumerName, lastId: kLastId, - frequency: kTimer + kDiff, - port: Number(process.env.REDIS_PORT), - host: process.env.REDIS_HOST + frequency: kTimer + kDiff }); - await firstConsumer.initialize(); await firstConsumer.init(); - await secondConsumer.initialize(); await secondConsumer.init(); - await thirdConsumer.initialize(); await thirdConsumer.init(); for (let index = 0; index < (kLength / 3); index++) { @@ -145,9 +146,7 @@ describe("Interpersonal", () => { const promises = [once(secondConsumerReadable, "close"), once(thirdConsumerReadable, "close")]; await Promise.all(promises); - await firstConsumer.close(true); - await secondConsumer.close(true); - await thirdConsumer.close(true); + await redis.close(true); }); test(`WHEN calling init() diff --git a/test/class/stream/Intrapersonal.spec.ts b/test/class/stream/Intrapersonal.spec.ts index 52a358b..3f955a7 100644 --- a/test/class/stream/Intrapersonal.spec.ts +++ b/test/class/stream/Intrapersonal.spec.ts @@ -6,7 +6,7 @@ import { once } from "node:events"; import { Readable } from "node:stream"; // Import Internal Dependencies -import { Intrapersonal } from "../../../src"; +import { Intrapersonal, RedisAdapter } from "../../../src"; import { randomValue } from "../../fixtures/utils/randomValue"; // Import Types @@ -30,20 +30,25 @@ const kCount = 3; const kFrequency = 300; describe("Intrapersonal", () => { + const redis = new RedisAdapter({ + port: Number(process.env.REDIS_PORT), + host: process.env.REDIS_HOST + }); + let intrapersonalStream: Intrapersonal; let readable: Readable; before(async() => { + await redis.initialize(); + intrapersonalStream = new Intrapersonal({ + redis, streamName: kStreamName, lastId: kLastId, count: kCount, - frequency: kFrequency, - port: Number(process.env.REDIS_PORT), - host: process.env.REDIS_HOST + frequency: kFrequency }); - await intrapersonalStream.initialize(); await intrapersonalStream.init(); assert.ok(await intrapersonalStream.streamExist()); @@ -67,7 +72,7 @@ describe("Intrapersonal", () => { readable.destroy(); await once(readable, "close"); - await intrapersonalStream.close(true); + await redis.close(true); }); test("reading data", async() => { diff --git a/test/class/stream/Stream.spec.ts b/test/class/stream/Stream.spec.ts index 3e09344..829a82b 100644 --- a/test/class/stream/Stream.spec.ts +++ b/test/class/stream/Stream.spec.ts @@ -3,7 +3,7 @@ import assert from "node:assert"; import { describe, before, after, test } from "node:test"; // Import Internal Dependencies -import { Stream } from "../../../src"; +import { RedisAdapter, Stream } from "../../../src"; import { randomValue } from "../../fixtures/utils/randomValue"; // CONSTANTS @@ -14,19 +14,23 @@ const kParseRegex = new RegExp("-([0-9])"); const kFrequency = 3000; describe("RedisStream instance", () => { + const redis = new RedisAdapter({ + port: Number(process.env.REDIS_PORT), + host: process.env.REDIS_HOST + }); + let stream: Stream; before(async() => { + await redis.initialize(); + stream = new Stream({ + redis, streamName: kStreamName, lastId: "0-0", - frequency: kFrequency, - port: Number(process.env.REDIS_PORT), - host: process.env.REDIS_HOST + frequency: kFrequency }); - await stream.initialize(); - const streamExist = await stream.streamExist(); if (!streamExist) { @@ -49,7 +53,7 @@ describe("RedisStream instance", () => { await stream.delEntry(entryId); } - await stream.close(true); + await redis.close(true); }); test("should instantiate with differents options in constructor", () => {