Skip to content

Commit

Permalink
refactor(./class/stream/~): use existing instance (#170)
Browse files Browse the repository at this point in the history
* refactor(./class/stream/~): use existing instance

* doc(./stream/~): updated doc

* test(./test/class/stream/~): updated UT
  • Loading branch information
Rossb0b authored Jan 31, 2025
1 parent 40014f3 commit 9e55b28
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 88 deletions.
3 changes: 2 additions & 1 deletion docs/pubsub/Channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export type PublishOptions<
## 📚 Usage

```ts
import { Channel } from "@myunisoft/redis";
import { Channel, RedisAdapter } from "@myunisoft/redis";

const name = "foo";
const redis = new RedisAdapter({
Expand All @@ -40,6 +40,7 @@ const subscriber = new RedisAdapter({
});

await redis.initialize();
await subscriber.initialize();

await subscriber.subscribe(name);
subscriber.on("message", (channel, message) => {
Expand Down
16 changes: 11 additions & 5 deletions docs/stream/Interpersonal.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ interface ClaimOptions {
idleTime: number;
}

interface GroupConsumerOptions extends BasementOptions {
export interface InterpersonalOptions extends StreamOptions {
groupName: string;
consumerName: string;
claimOptions?: ClaimOptions;
Expand All @@ -26,10 +26,17 @@ interface GroupConsumerOptions extends BasementOptions {
## 📚 Usage

```ts
import { GroupConsumer } from "@myunisoft/redis";
import { Interpersonal, RedisAdapter } from "@myunisoft/redis";

const consumer = new GroupConsumer({
connection,
const redis = new RedisAdapter({
port: Number(process.env.REDIS_PORT),
host: process.env.REDIS_HOST
});

await redis.initialize();

const consumer = new Interpersonal({
redis,
streamName: "my-stream-name",
groupName: "my-group-name",
consumerName: "my-consumer-name",
Expand All @@ -41,7 +48,6 @@ const consumer = new GroupConsumer({
}
});

await consumer.initialize();
await consumer.init();

const readable = Readable.from(firstConsumer[Symbol.asyncIterator]());
Expand Down
12 changes: 9 additions & 3 deletions docs/stream/Intrapersonal.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,23 @@
## 📚 Usage

```ts
import { Intrapersonal } from "@myunisoft/redis";
import { Intrapersonal, RedisAdapter } from "@myunisoft/redis";

const redis = new RedisAdapter({
port: Number(process.env.REDIS_PORT),
host: process.env.REDIS_HOST
});

await redis.initialize();

const consumer = new Intrapersonal({
connection,
redis,
streamName: "my-stream-name",
frequency: 10000,
lastId: "0-0",
count: 10
});

await consumer.initialize();
await consumer.init();

const readable = Readable.from(basicStream[Symbol.asyncIterator]());
Expand Down
15 changes: 11 additions & 4 deletions docs/stream/Stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,24 @@ interface ConsumeOptions {
## 📚 Usage

```ts
import { Stream } from "@myunisoft/redis";
import { Stream, RedisAdapter } from "@myunisoft/redis";

const redisStream = new Stream({
const redis = new RedisAdapter({
port: Number(process.env.REDIS_PORT),
host: process.env.REDIS_HOST
});

await redis.initialize();

const stream = new Stream({
redis,
streamName: "my-stream-name",
frequency: 10000,
lastId: "0-0",
count: 10
});

await redisStream.initialize();
await redisStream.init();
await stream.init();
```

## 📜 API
Expand Down
26 changes: 11 additions & 15 deletions src/class/stream/Interpersonal.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ export class Interpersonal extends Stream {
...args: RedisValue[]
];

streamResults = await this.xreadgroup(...optionsWithCount);
streamResults = await this.redis.xreadgroup(...optionsWithCount);
}
else {
const optionsWithoutCount = [...redisOptions] as [
Expand All @@ -113,7 +113,7 @@ export class Interpersonal extends Stream {
...args: RedisValue[]
];

streamResults = await this.xreadgroup(...optionsWithoutCount);
streamResults = await this.redis.xreadgroup(...optionsWithoutCount);
}

if (!streamResults) {
Expand Down Expand Up @@ -144,7 +144,7 @@ export class Interpersonal extends Stream {
count: number | string
];

streamResults = await this.xautoclaim(...optionsWithCount);
streamResults = await this.redis.xautoclaim(...optionsWithCount);
}
else {
const optionsWithoutCount = [...redisOptions] as [
Expand All @@ -155,29 +155,25 @@ export class Interpersonal extends Stream {
start: RedisKey | number
];

streamResults = await this.xautoclaim(...optionsWithoutCount);
streamResults = await this.redis.xautoclaim(...optionsWithoutCount);
}

const [cursor, entries] = streamResults;
const [__, entries] = streamResults;

if (entries.length === 0) {
return [];
}

if (cursor !== "0-0") {
this.emit("rest");
}

return await this.handleEntries(entries, ">");
}

public async claimEntry(entryId: string): Promise<void> {
await this.xack(this.streamName, this.groupName, entryId);
await this.redis.xack(this.streamName, this.groupName, entryId);
await this.delEntry(entryId);
}

override async getConsumerData(): Promise<utils.XINFOConsumerData | undefined> {
const consumers = await this.xinfo("CONSUMERS", this.streamName, this.groupName);
const consumers = await this.redis.xinfo("CONSUMERS", this.streamName, this.groupName);

const formattedConsumers = utils.parseXINFOConsumers(consumers as utils.XINFOConsumers);

Expand All @@ -196,7 +192,7 @@ export class Interpersonal extends Stream {
return;
}

await this.xgroup("CREATE", this.streamName, this.groupName, "$", "MKSTREAM");
await this.redis.xgroup("CREATE", this.streamName, this.groupName, "$", "MKSTREAM");
}

override async deleteGroup() {
Expand All @@ -205,7 +201,7 @@ export class Interpersonal extends Stream {
return;
}

await this.xgroup("DESTROY", this.streamName, this.groupName);
await this.redis.xgroup("DESTROY", this.streamName, this.groupName);
}

override async consumerExist(): Promise<boolean> {
Expand All @@ -220,7 +216,7 @@ export class Interpersonal extends Stream {
return;
}

await this.xgroup("CREATECONSUMER", this.streamName, this.groupName, this.consumerName);
await this.redis.xgroup("CREATECONSUMER", this.streamName, this.groupName, this.consumerName);
}

override async deleteConsumer(): Promise<void> {
Expand All @@ -229,6 +225,6 @@ export class Interpersonal extends Stream {
return;
}

await this.xgroup("DELCONSUMER", this.streamName, this.groupName, this.consumerName);
await this.redis.xgroup("DELCONSUMER", this.streamName, this.groupName, this.consumerName);
}
}
6 changes: 3 additions & 3 deletions src/class/stream/Intrapersonal.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class Intrapersonal extends Stream {
...args: RedisValue[]
];

streamResults = await this.xread(...optionsWithCount);
streamResults = await this.redis.xread(...optionsWithCount);
}
else if (block) {
const optionsWithBlock = [...redisOptions] as [
Expand All @@ -48,15 +48,15 @@ export class Intrapersonal extends Stream {
...args: RedisValue[]
];

streamResults = await this.xread(...optionsWithBlock);
streamResults = await this.redis.xread(...optionsWithBlock);
}
else {
const optionsWithoutCount = [...redisOptions] as [
streamsToken: "STREAMS",
...args: RedisValue[]
];

streamResults = await this.xread(...optionsWithoutCount);
streamResults = await this.redis.xread(...optionsWithoutCount);
}

if (!streamResults) {
Expand Down
Loading

0 comments on commit 9e55b28

Please sign in to comment.