Skip to content

Commit e0ff421

Browse files
authored
Remove TerminalOps from Stream (#260)
1 parent 70e064f commit e0ff421

File tree

17 files changed

+249
-434
lines changed

17 files changed

+249
-434
lines changed

arrow-libs/fx/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ParJoin.kt

-4
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ internal suspend fun <O> runInner(
7474
outputQ.enqueue1(Some(s))
7575
}
7676
.interruptWhen(done.map { it.isDefined() }) // must be AFTER enqueue to the sync queue, otherwise the process may hang to enq last item while being interrupted
77-
.compile()
7877
.drain()
7978
}.swap().orNull()
8079
val e2 = lease.cancel().swap().orNull()
@@ -104,7 +103,6 @@ internal suspend fun <O> Stream<Stream<O>>.runOuter(
104103
runInner(ctx, inner, done, outputQ, running, available, outerScope)
105104
}
106105
}.interruptWhen(done.map { it.isDefined() })
107-
.compile()
108106
.drain()
109107
}
110108

@@ -165,7 +163,6 @@ internal suspend fun signalResult(done: SignallingAtomic<Option<Option<Throwable
165163
* }
166164
* }
167165
* .parJoin(10, IOPool)
168-
* .compile()
169166
* .drain()
170167
* //sampleEnd
171168
* ```
@@ -196,7 +193,6 @@ fun <O> Stream<Stream<O>>.parJoin(
196193
running.discrete() // Await everyone stop running
197194
.dropWhile { it > 0 }
198195
.take(1)
199-
.compile()
200196
.drain()
201197

202198
signalResult(done)

arrow-libs/fx/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Pull.kt

-13
Original file line numberDiff line numberDiff line change
@@ -590,19 +590,6 @@ fun <O> Pull<O, Unit>.firstOrNull(f: (O) -> Boolean): Pull<Nothing, PullUncons1<
590590
}
591591
}
592592

593-
/** Returns the last element of the input, if non-empty. */
594-
fun <O> Pull<O, Unit>.lastOrNull(): Pull<Nothing, O?> {
595-
fun go(prev: O?, s: Pull<O, Unit>): Pull<Nothing, O?> =
596-
s.unconsOrNull().flatMap { uncons ->
597-
when (uncons) {
598-
null -> Pull.just(prev)
599-
else -> go(uncons.head.lastOrNull() ?: prev, uncons.tail)
600-
}
601-
}
602-
603-
return go(null, this)
604-
}
605-
606593
/** Writes a single `true` value if all input matches the predicate, `false` otherwise. */
607594
fun <O> Pull<O, Unit>.forall(p: (O) -> Boolean): Pull<Nothing, Boolean> =
608595
unconsOrNull().flatMap { uncons ->

arrow-libs/fx/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ResourceTerminalOps.kt arrow-libs/fx/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ResourceOps.kt

+47-4
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,55 @@ package arrow.fx.coroutines.stream
22

33
import arrow.fx.coroutines.Resource
44

5+
/** Opens DSL to consume [Stream] as a [Resource]. */
6+
fun <O> Stream<O>.asResource(): ResourceOps<O> =
7+
ResourceOps(this)
8+
59
/**
6-
* DSL boundary to access terminal operators as resources
10+
* DSL boundary to access terminal operators as a [Resource]
11+
* Allows for consume a [Stream] as a [Resource],
12+
* meaning the root scope of the [Stream] remains open until [Resource.use] returns.
13+
*
14+
* This allows for safe consumption of streaming resources in terminal operators,
15+
* and inside the [Resource.use] combinator.
16+
*
17+
* ```kotlin:ank:playground
18+
* import arrow.fx.coroutines.stream.*
19+
* import arrow.fx.coroutines.Atomic
20+
*
21+
* class Logger {
22+
* private val state = Atomic.unsafe(emptyList<String>())
23+
*
24+
* suspend fun log(msg: String): Unit =
25+
* state.update { it + msg }
26+
*
27+
* suspend fun dumpLog(): Unit =
28+
* println(state.get())
29+
* }
30+
*
31+
* fun openFileWithName(name: String): String =
32+
* "File($name)"
33+
*
34+
* //sampleStart
35+
* suspend fun main(): Unit {
36+
* val logger = Logger()
37+
*
38+
* Stream.bracket({ openFileWithName("a") }, { name -> logger.log("finalizing: $name") })
39+
* .append { Stream.bracket({ openFileWithName("b") }, { name -> logger.log("finalizing: $name") }) }
40+
* .asResource()
41+
* .lastOrError()
42+
* .use { last -> logger.log("Using $last") }
43+
*
44+
* logger.dumpLog() // [finalizing: File(a), Using File(b), finalizing: File(b)]
45+
* }
46+
* //sampleEnd
47+
* ```
748
*
8-
* Terminal operators consume the stream
9-
*/ // TODO report inline results in Exception in thread "main" java.lang.VerifyError: Bad type on operand stack
10-
/* inline */ class ResourceTerminalOps<O>(private val s: Stream<O>) {
49+
* As you can see here, we can `use` the `last` streamed `File` before it gets closed by the Stream.
50+
* Since we consumed the Stream as `asResource().lastOrError()`, this extends the last scope to the returned `Resource`,
51+
* so we can safely `use` it and the `Stream` still properly closes all resources opened with `bracket`.
52+
*/
53+
class ResourceOps<O>(private val s: Stream<O>) {
1154

1255
suspend fun toList(): Resource<List<O>> =
1356
compiler(mutableListOf()) { acc, ch -> acc.apply { addAll(ch.toList()) } }

0 commit comments

Comments
 (0)