Skip to content

Commit da8623e

Browse files
b1ronvladvildanov
authored andcommitted
Add support for XREAD last entry (#3005)
* add support for XREAD last entry * handle reading from multiple streams * add test to ensure we block for empty stream * small tweak * add an option to XReadArgs instead * modify test comment * small preallocation optimization * Changed argument to generic ID, skip tests on Enterprise * Fix test case * Updated expiration command --------- Co-authored-by: Vladyslav Vildanov <[email protected]> Co-authored-by: vladvildanov <[email protected]>
1 parent 0b95fd7 commit da8623e

File tree

2 files changed

+82
-3
lines changed

2 files changed

+82
-3
lines changed

commands_test.go

+75-2
Original file line numberDiff line numberDiff line change
@@ -2588,13 +2588,14 @@ var _ = Describe("Commands", func() {
25882588
Expect(sadd.Err()).NotTo(HaveOccurred())
25892589
}
25902590

2591-
res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result()
2591+
expireAt := time.Now().Add(10 * time.Second)
2592+
res, err := client.HPExpireAt(ctx, "myhash", expireAt, "key1", "key200").Result()
25922593
Expect(err).NotTo(HaveOccurred())
25932594
Expect(res).To(Equal([]int64{1, -2}))
25942595

25952596
res, err = client.HPExpireTime(ctx, "myhash", "key1", "key2", "key200").Result()
25962597
Expect(err).NotTo(HaveOccurred())
2597-
Expect(res).To(BeEquivalentTo([]int64{time.Now().Add(10 * time.Second).UnixMilli(), -1, -2}))
2598+
Expect(res).To(BeEquivalentTo([]int64{expireAt.UnixMilli(), -1, -2}))
25982599
})
25992600

26002601
It("should HTTL", Label("hash-expiration", "NonRedisEnterprise"), func() {
@@ -5888,6 +5889,78 @@ var _ = Describe("Commands", func() {
58885889
Expect(err).To(Equal(redis.Nil))
58895890
})
58905891

5892+
It("should XRead LastEntry", Label("NonRedisEnterprise"), func() {
5893+
res, err := client.XRead(ctx, &redis.XReadArgs{
5894+
Streams: []string{"stream"},
5895+
Count: 2, // we expect 1 message
5896+
ID: "+",
5897+
}).Result()
5898+
Expect(err).NotTo(HaveOccurred())
5899+
Expect(res).To(Equal([]redis.XStream{
5900+
{
5901+
Stream: "stream",
5902+
Messages: []redis.XMessage{
5903+
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
5904+
},
5905+
},
5906+
}))
5907+
})
5908+
5909+
It("should XRead LastEntry from two streams", Label("NonRedisEnterprise"), func() {
5910+
res, err := client.XRead(ctx, &redis.XReadArgs{
5911+
Streams: []string{"stream", "stream"},
5912+
ID: "+",
5913+
}).Result()
5914+
Expect(err).NotTo(HaveOccurred())
5915+
Expect(res).To(Equal([]redis.XStream{
5916+
{
5917+
Stream: "stream",
5918+
Messages: []redis.XMessage{
5919+
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
5920+
},
5921+
},
5922+
{
5923+
Stream: "stream",
5924+
Messages: []redis.XMessage{
5925+
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
5926+
},
5927+
},
5928+
}))
5929+
})
5930+
5931+
It("should XRead LastEntry blocks", Label("NonRedisEnterprise"), func() {
5932+
start := time.Now()
5933+
go func() {
5934+
defer GinkgoRecover()
5935+
5936+
time.Sleep(100 * time.Millisecond)
5937+
id, err := client.XAdd(ctx, &redis.XAddArgs{
5938+
Stream: "empty",
5939+
ID: "4-0",
5940+
Values: map[string]interface{}{"quatro": "quatre"},
5941+
}).Result()
5942+
Expect(err).NotTo(HaveOccurred())
5943+
Expect(id).To(Equal("4-0"))
5944+
}()
5945+
5946+
res, err := client.XRead(ctx, &redis.XReadArgs{
5947+
Streams: []string{"empty"},
5948+
Block: 500 * time.Millisecond,
5949+
ID: "+",
5950+
}).Result()
5951+
Expect(err).NotTo(HaveOccurred())
5952+
// Ensure that the XRead call with LastEntry option blocked for at least 100ms.
5953+
Expect(time.Since(start)).To(BeNumerically(">=", 100*time.Millisecond))
5954+
Expect(res).To(Equal([]redis.XStream{
5955+
{
5956+
Stream: "empty",
5957+
Messages: []redis.XMessage{
5958+
{ID: "4-0", Values: map[string]interface{}{"quatro": "quatre"}},
5959+
},
5960+
},
5961+
}))
5962+
})
5963+
58915964
Describe("group", func() {
58925965
BeforeEach(func() {
58935966
err := client.XGroupCreate(ctx, "stream", "group", "0").Err()

stream_commands.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,11 @@ type XReadArgs struct {
137137
Streams []string // list of streams and ids, e.g. stream1 stream2 id1 id2
138138
Count int64
139139
Block time.Duration
140+
ID string
140141
}
141142

142143
func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd {
143-
args := make([]interface{}, 0, 6+len(a.Streams))
144+
args := make([]interface{}, 0, 2*len(a.Streams)+6)
144145
args = append(args, "xread")
145146

146147
keyPos := int8(1)
@@ -159,6 +160,11 @@ func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd {
159160
for _, s := range a.Streams {
160161
args = append(args, s)
161162
}
163+
if a.ID != "" {
164+
for range a.Streams {
165+
args = append(args, a.ID)
166+
}
167+
}
162168

163169
cmd := NewXStreamSliceCmd(ctx, args...)
164170
if a.Block >= 0 {

0 commit comments

Comments
 (0)