Skip to content

Commit be94fc0

Browse files
authored
Queue: support same behavior as Arrow Fx's IO Queue (#250)
* Rename circularBuffer to sliding, and adjust to Kotlin familiar semantics * Add Queue.dropping * Remove unused import QueueTest * Queue.dropping kdoc
1 parent f063409 commit be94fc0

File tree

2 files changed

+60
-12
lines changed
  • arrow-libs/fx/arrow-fx-coroutines/src
    • main/kotlin/arrow/fx/coroutines/stream/concurrent
    • test/kotlin/arrow/fx/coroutines/stream/concurrent

2 files changed

+60
-12
lines changed

arrow-libs/fx/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/Queue.kt

+21-8
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,18 @@ interface Queue<A> : Enqueue<A>, Dequeue1<A>, Dequeue<A> {
144144
fromStrategy(Strategy.boundedFifo(maxSize))
145145

146146
/** Creates a queue which stores the last `maxSize` enqueued elements and which never blocks on enqueue. */
147-
suspend fun <A> circularBuffer(maxSize: Int): Queue<A> =
148-
fromStrategy(Strategy.circularBuffer(maxSize))
147+
suspend fun <A> sliding(maxSize: Int): Queue<A> =
148+
fromStrategy(Strategy.sliding(maxSize))
149149

150-
fun <A> unsafeCircularBuffer(maxSize: Int): Queue<A> =
151-
fromStrategy(Strategy.circularBuffer(maxSize))
150+
fun <A> unsafeSliding(maxSize: Int): Queue<A> =
151+
fromStrategy(Strategy.sliding(maxSize))
152+
153+
/** Creates a queue which stores the first `maxSize` enqueued elements and which never blocks on enqueue. */
154+
suspend fun <A> dropping(maxSize: Int): Queue<A> =
155+
fromStrategy(Strategy.dropping(maxSize))
156+
157+
fun <A> unsafeDropping(maxSize: Int): Queue<A> =
158+
fromStrategy(Strategy.dropping(maxSize))
152159

153160
/** Created a bounded queue that distributed always at max `fairSize` elements to any subscriber. */
154161
suspend fun <A> fairBounded(maxSize: Int, fairSize: Int): Queue<A> =
@@ -205,11 +212,17 @@ internal object Strategy {
205212
fun <A> boundedLifo(maxSize: Int): PubSub.Strategy<A, Chunk<A>, IQueue<A>, Int> =
206213
PubSub.Strategy.bounded(maxSize, lifo()) { it.size }
207214

208-
/** Strategy for circular buffer, which stores the last `maxSize` enqueued elements and never blocks on enqueue. */
209-
fun <A> circularBuffer(maxSize: Int): PubSub.Strategy<A, Chunk<A>, IQueue<A>, Int> =
215+
/** Strategy for sliding, which stores the last `maxSize` enqueued elements and never blocks on enqueue. */
216+
fun <A> sliding(maxSize: Int): PubSub.Strategy<A, Chunk<A>, IQueue<A>, Int> =
217+
unbounded { q: IQueue<A>, a ->
218+
if (q.size <= maxSize) q.enqueue(a)
219+
else q.drop(1).enqueue(a)
220+
}
221+
222+
/** Strategy for dropping, which stores the first `maxSize` enqueued elements and never blocks on enqueue. */
223+
fun <A> dropping(maxSize: Int): PubSub.Strategy<A, Chunk<A>, IQueue<A>, Int> =
210224
unbounded { q: IQueue<A>, a ->
211-
if (q.size < maxSize) q.enqueue(a)
212-
else q.tail().enqueue(a)
225+
if (q.size <= maxSize) q.enqueue(a) else q
213226
}
214227

215228
/** Unbounded lifo strategy. */

arrow-libs/fx/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/concurrent/QueueTest.kt

+39-4
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,12 @@ class QueueTest : StreamSpec(spec = {
106106
}
107107
}
108108

109-
"circularBuffer" {
109+
"Queue.sliding - accepts maxSize elements while sliding over capacity" {
110110
checkAll(Arb.stream(Arb.int()), Arb.positiveInts()) { s, maxSize0 ->
111111
val maxSize = maxSize0 % 20 + 1
112112
val expected = s.compile().toList().takeLast(maxSize)
113113

114-
val q = Queue.circularBuffer<Option<Int>>(maxSize + 1)
114+
val q = Queue.sliding<Option<Int>>(maxSize)
115115

116116
s.noneTerminate()
117117
.effectMap { q.enqueue1(it) }
@@ -122,12 +122,12 @@ class QueueTest : StreamSpec(spec = {
122122
}
123123
}
124124

125-
"dequeueBatch circularBuffer" {
125+
"Queue.sliding - dequeueBatch" {
126126
checkAll(Arb.stream(Arb.int()), Arb.positiveInts(), Arb.positiveInts()) { s, maxSize0, batchSize0 ->
127127
val maxSize = maxSize0 % 20 + 1
128128
val batchSize = batchSize0 % 20 + 1
129129
val expected = s.compile().toList().takeLast(maxSize)
130-
val q = Queue.circularBuffer<Option<Int>>(maxSize + 1)
130+
val q = Queue.sliding<Option<Int>>(maxSize)
131131

132132
s.noneTerminate()
133133
.effectMap { q.enqueue1(it) }
@@ -141,6 +141,41 @@ class QueueTest : StreamSpec(spec = {
141141
}
142142
}
143143

144+
"Queue.dropping - accepts maxSize elements while dropping over capacity" {
145+
checkAll(Arb.stream(Arb.int()), Arb.positiveInts()) { s, maxSize0 ->
146+
val maxSize = maxSize0 % 20 + 1
147+
val expected = s.compile().toList().take(maxSize)
148+
149+
val q = Queue.dropping<Int>(maxSize)
150+
151+
s.effectMap { q.enqueue1(it) }
152+
.drain()
153+
.append {
154+
q.dequeue().take(expected.size)
155+
}
156+
.compile()
157+
.toList() shouldBe expected
158+
}
159+
}
160+
161+
"Queue.dropping - dequeueBatch" {
162+
checkAll(Arb.stream(Arb.int()), Arb.positiveInts(), Arb.positiveInts()) { s, maxSize0, batchSize0 ->
163+
val maxSize = maxSize0 % 20 + 1
164+
val batchSize = batchSize0 % 20 + 1
165+
val expected = s.compile().toList().take(maxSize)
166+
val q = Queue.dropping<Int>(maxSize)
167+
168+
s.effectMap { q.enqueue1(it) }
169+
.drain().append {
170+
Stream.constant(batchSize)
171+
.through(q.dequeueBatch())
172+
.take(expected.size)
173+
}
174+
.compile()
175+
.toList() shouldBe expected
176+
}
177+
}
178+
144179
"dequeue releases subscriber on " - {
145180
"interrupt" {
146181
val q = Queue.unbounded<Int>()

0 commit comments

Comments
 (0)