Skip to content

Commit a6a05c8

Browse files
nomisRevraulraja
andauthored
Improve Arrow Fx Coroutines docs for release (#248)
Co-authored-by: Raúl Raja Martínez <[email protected]>
1 parent 761ce00 commit a6a05c8

29 files changed

+802
-263
lines changed

arrow-libs/fx/arrow-docs/docs/fx/async/README.md

+231-147
Large diffs are not rendered by default.

arrow-libs/fx/arrow-fx-coroutines/README.MD

+9-7
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,11 @@ tailrec suspend fun sleeper(): Unit {
208208
This also means that our new sleep can back-pressure `timeOutOrNull`.
209209

210210
```kotlin:ank
211-
suspend main(): Unit {
211+
import arrow.fx.coroutines.*
212+
213+
suspend fun main(): Unit {
212214
val r = timeOutOrNull(1.seconds) {
213-
uncancelable { sleep(2.seconds) }
215+
uncancellable { sleep(2.seconds) }
214216
} // r is null, but took 2 seconds.
215217
}
216218
```
@@ -306,7 +308,7 @@ val resources: List<Resource<File>> =
306308
val resource: Resource<List<File>> =
307309
resources.sequence(Resource.applicative())
308310
309-
suspend main(): Unit {
311+
suspend fun main(): Unit {
310312
resource.use { files ->
311313
files.parTraverse(IODispatchers.IOPool) { file ->
312314
file.toString()
@@ -399,7 +401,7 @@ Simple constructs like `suspend fun Either.catch(f: () -> A): Either<Throwable,
399401
A simple example might be to repeat an action `n` times, similar to the `repeat` function in the standard library.
400402

401403
```kotlin:ank
402-
suspend main(): Unit {
404+
suspend fun main(): Unit {
403405
repeat(Schedule.recurs<A>(n)) {
404406
println("Hello")
405407
}
@@ -409,7 +411,7 @@ suspend main(): Unit {
409411
Alternatively we can re-use this `Schedule` to `retry` a `suspend fun` `n` times when it fails.
410412

411413
```kotlin:ank
412-
suspend main(): Unit {
414+
suspend fun main(): Unit {
413415
retry(Schedule.recurs<A>(n)) {
414416
println("I am going to do nothing but throw a tantrum!")
415417
throw RuntimeException("Boom!")
@@ -424,8 +426,8 @@ fun <A> schedule(): Schedule<A, List<A>> = Schedule {
424426
(recurs<A>(10) and spaced(10.seconds)) zipRight collect()
425427
}
426428
427-
suspend main(): Unit {
428-
var count = Atomic(0)
429+
suspend fun main(): Unit {
430+
val count = Atomic(0)
429431
430432
val history: List<Int> = repeat(schedule<Int>()) {
431433
println("Incrementing the ref")

arrow-libs/fx/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Bracket.kt

+143-4
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ sealed class ExitCase {
2121

2222
/**
2323
* Runs [f] in an uncancellable manner.
24+
* If [f] gets cancelled, it will back-pressure the cancelling operation until finished.
2425
*
2526
* ```kotlin:ank:playground
2627
* import arrow.fx.coroutines.*
@@ -56,6 +57,17 @@ suspend fun <A> uncancellable(f: suspend () -> A): A =
5657
}
5758
}
5859

60+
/**
61+
* Registers an [onCancel] handler after [fa].
62+
* [onCancel] is guaranteed to be called in case of cancellation, otherwise it's ignored.
63+
*
64+
* Useful for wiring cancellation tokens between fibers, building inter-op with other effect systems or testing.
65+
*
66+
* @param fa program that you want to register handler on
67+
* @param onCancel handler to run when [fa] gets cancelled.
68+
* @see guarantee for registering a handler that is guaranteed to always run.
69+
* @see guaranteeCase for registering a handler that executes for any [ExitCase].
70+
*/
5971
suspend fun <A> onCancel(
6072
fa: suspend () -> A,
6173
onCancel: suspend () -> Unit
@@ -66,22 +78,149 @@ suspend fun <A> onCancel(
6678
}
6779
}
6880

81+
/**
82+
* Guarantees execution of a given [finalizer] after [fa] regardless of success, error or cancellation.
83+
*
84+
* As best practice, it's not a good idea to release resources via [guarantee].
85+
* since [guarantee] doesn't properly model acquiring, using and releasing resources.
86+
* It only models scheduling of a finalizer after a given suspending program,
87+
* so you should prefer [Resource] or [bracket] which captures acquiring,
88+
* using and releasing into 3 separate steps to ensure resource safety.
89+
*
90+
* @param fa program that you want to register handler on
91+
* @param finalizer handler to run after [fa].
92+
* @see guaranteeCase for registering a handler that tracks the [ExitCase] of [fa].
93+
*/
6994
suspend fun <A> guarantee(
7095
fa: suspend () -> A,
71-
release: suspend () -> Unit
72-
): A = guaranteeCase(fa) { release.invoke() }
96+
finalizer: suspend () -> Unit
97+
): A = guaranteeCase(fa) { finalizer.invoke() }
7398

99+
/**
100+
* Guarantees execution of a given [finalizer] after [fa] regardless of success, error or cancellation., allowing
101+
* for differentiating between exit conditions with to the [ExitCase] argument of the finalizer.
102+
*
103+
* As best practice, it's not a good idea to release resources via [guaranteeCase].
104+
* since [guaranteeCase] doesn't properly model acquiring, using and releasing resources.
105+
* It only models scheduling of a finalizer after a given suspending program,
106+
* so you should prefer [Resource] or [bracketCase] which captures acquiring,
107+
* using and releasing into 3 separate steps to ensure resource safety.
108+
*
109+
* @param fa program that you want to register handler on
110+
* @param finalizer handler to run after [fa].
111+
* @see guarantee for registering a handler that ignores the [ExitCase] of [fa].
112+
*/
74113
suspend fun <A> guaranteeCase(
75114
fa: suspend () -> A,
76-
release: suspend (ExitCase) -> Unit
77-
): A = bracketCase({ Unit }, { fa.invoke() }, { _, ex -> release(ex) })
115+
finalizer: suspend (ExitCase) -> Unit
116+
): A = bracketCase({ Unit }, { fa.invoke() }, { _, ex -> finalizer(ex) })
78117

118+
/**
119+
* Meant for specifying tasks with safe resource acquisition and release in the face of errors and interruption.
120+
* It would be the equivalent of an async capable `try/catch/finally` statements in mainstream imperative languages for resource
121+
* acquisition and release.
122+
*
123+
* @param acquire the action to acquire the resource
124+
*
125+
* @param use is the action to consume the resource and produce a result.
126+
* Once the resulting suspend program terminates, either successfully, error or disposed,
127+
* the [release] function will run to clean up the resources.
128+
*
129+
* @param release is the action that's supposed to release the allocated resource after `use` is done, irregardless
130+
* of its exit condition.
131+
*
132+
* ```kotlin:ank:playground
133+
* import arrow.fx.coroutines.*
134+
*
135+
* class File(url: String) {
136+
* fun open(): File = this
137+
* fun close(): Unit {}
138+
* override fun toString(): String = "This file contains some interesting content!"
139+
* }
140+
*
141+
* suspend fun openFile(uri: String): File = File(uri).open()
142+
* suspend fun closeFile(file: File): Unit = file.close()
143+
* suspend fun fileToString(file: File): String = file.toString()
144+
*
145+
* suspend fun main(): Unit {
146+
* //sampleStart
147+
* val res = bracket(
148+
* acquire = { openFile("data.json") },
149+
* use = { file -> fileToString(file) },
150+
* release = { file: File -> closeFile(file) }
151+
* )
152+
* //sampleEnd
153+
* println(res)
154+
* }
155+
* ```
156+
*/
79157
suspend fun <A, B> bracket(
80158
acquire: suspend () -> A,
81159
use: suspend (A) -> B,
82160
release: suspend (A) -> Unit
83161
): B = bracketCase(acquire, use, { a, _ -> release(a) })
84162

163+
/**
164+
* A way to safely acquire a resource and release in the face of errors and cancellation.
165+
* It uses [ExitCase] to distinguish between different exit cases when releasing the acquired resource.
166+
*
167+
* [bracketCase] exists out of a three stages:
168+
* 1. acquisition
169+
* 2. consumption
170+
* 3. releasing
171+
*
172+
* 1. Resource acquisition is **NON CANCELLABLE**.
173+
* If resource acquisition fails, meaning no resource was actually successfully acquired then we short-circuit the effect.
174+
* Reason being, we cannot [release] what we did not `acquire` first. Same reason we cannot call [use].
175+
* If it is successful we pass the result to stage 2 [use].
176+
*
177+
* 2. Resource consumption is like any other `suspend` effect. The key difference here is that it's wired in such a way that
178+
* [release] **will always** be called either on [ExitCase.Cancelled], [ExitCase.Failure] or [ExitCase.Completed].
179+
* If it failed than the resulting [suspend] from [bracketCase] will be the error, otherwise the result of [use].
180+
*
181+
* 3. Resource releasing is **NON CANCELLABLE**, otherwise it could result in leaks.
182+
* In the case it throws the resulting [suspend] will be either the error or a composed error if one occurred in the [use] stage.
183+
*
184+
* @param acquire the action to acquire the resource
185+
*
186+
* @param use is the action to consume the resource and produce a result.
187+
* Once the resulting suspend program terminates, either successfully, error or disposed,
188+
* the [release] function will run to clean up the resources.
189+
*
190+
* @param release the allocated resource after [use] terminates.
191+
*
192+
* ```kotlin:ank:playground
193+
* import arrow.fx.coroutines.*
194+
*
195+
* class File(url: String) {
196+
* fun open(): File = this
197+
* fun close(): Unit {}
198+
* }
199+
*
200+
* suspend fun File.content(): String =
201+
* "This file contains some interesting content!"
202+
* suspend fun openFile(uri: String): File = File(uri).open()
203+
* suspend fun closeFile(file: File): Unit = file.close()
204+
*
205+
* suspend fun main(): Unit {
206+
* //sampleStart
207+
* val res = bracketCase(
208+
* acquire = { openFile("data.json") },
209+
* use = { file -> file.content() },
210+
* release = { file, exitCase ->
211+
* when (exitCase) {
212+
* is ExitCase.Completed -> println("File closed with $exitCase")
213+
* is ExitCase.Cancelled -> println("Program cancelled with $exitCase")
214+
* is ExitCase.Failure -> println("Program failed with $exitCase")
215+
* }
216+
* closeFile(file)
217+
* }
218+
* )
219+
* //sampleEnd
220+
* println(res)
221+
* }
222+
* ```
223+
*/
85224
suspend fun <A, B> bracketCase(
86225
acquire: suspend () -> A,
87226
use: suspend (A) -> B,

arrow-libs/fx/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/ConcurrentVar.kt

+3-10
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import kotlinx.atomicfu.atomic
66

77
/**
88
* [ConcurrentVar] is a mutable concurrent safe variable which is either `empty` or contains a `single value` of type [A].
9-
*
9+
* It behaves the same as a single element [arrow.fx.coroutines.stream.concurrent.Queue].
1010
* When trying to [put] or [take], it'll suspend when it's respectively [isEmpty] or [isNotEmpty].
1111
*
1212
* There are also operators that return immediately, [tryTake] & [tryPut],
@@ -18,19 +18,12 @@ import kotlinx.atomicfu.atomic
1818
* ```kotlin:ank:playground
1919
* import arrow.fx.coroutines.*
2020
*
21-
* suspend fun fibonacci(n: Int, prev: Int = 0, next: Int = 1): Int =
22-
* when (n) {
23-
* 0 -> prev
24-
* 1 -> next
25-
* else -> fibonacci(n - 1, next, prev + next)
26-
* }
27-
*
2821
* suspend fun main(): Unit {
2922
* val mvar = ConcurrentVar.empty<Int>()
3023
*
3124
* ForkConnected {
32-
* val asyncFib = fibonacci(50)
33-
* mvar.put(asyncFib)
25+
* sleep(3.seconds)
26+
* mvar.put(5)
3427
* }
3528
*
3629
* val r = mvar.take() // suspend until Fork puts result in MVar

arrow-libs/fx/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Fiber.kt

+25-6
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,24 @@ internal fun <A> Fiber(promise: UnsafePromise<A>, conn: SuspendConnection): Fibe
3535

3636
/**
3737
* Launches a new suspendable cancellable coroutine within a [Fiber].
38-
* It does so by connecting the created [Fiber]'s cancellation to the parent.
39-
* If the parent gets cancelled, then this [Fiber] will also get cancelled.
38+
* It does so by connecting the created [Fiber]'s cancellation to the callers `suspend` scope.
39+
* If the caller of `ForkConnected` gets cancelled, then this [Fiber] will also get cancelled.
40+
*
41+
* ```kotlin:ank:playground
42+
* import arrow.fx.coroutines.*
43+
*
44+
* suspend fun main(): Unit {
45+
* val parent = ForkConnected {
46+
* ForkConnected { // cancellation connected to parent
47+
* onCancel({ never<Unit>() }) {
48+
* println("I got cancelled by my parent")
49+
* }
50+
* }
51+
* }
52+
* sleep(1.seconds)
53+
* parent.cancel()
54+
* }
55+
* ```
4056
*
4157
* You can [Fiber.join] or [Fiber.cancel] the computation.
4258
* Cancelling this [Fiber] **will not** cancel its parent.
@@ -53,6 +69,7 @@ suspend fun <A> ForkConnected(ctx: CoroutineContext = ComputationPool, f: suspen
5369
Fiber(promise, conn2)
5470
}
5571

72+
/** @see ForkConnected **/
5673
suspend fun <A> (suspend () -> A).forkConnected(ctx: CoroutineContext = ComputationPool): Fiber<A> =
5774
ForkConnected(ctx, this)
5875

@@ -107,6 +124,7 @@ suspend fun <A> ForkScoped(
107124
Fiber(promise, conn2)
108125
}
109126

127+
/** @see ForkScoped */
110128
suspend fun <A> (suspend () -> A).forkScoped(
111129
ctx: CoroutineContext = ComputationPool,
112130
interruptWhen: suspend () -> Unit
@@ -117,17 +135,18 @@ suspend fun <A> (suspend () -> A).forkScoped(
117135
* You can [Fiber.join] or [Fiber.cancel] the computation.
118136
*
119137
* **BEWARE** you immediately leak the [Fiber] when launching without connection control.
120-
* Use [ForkConnected] or safely launch the fiber as a [Resource] or using [Fiber].
138+
* Use [ForkConnected] or safely launch the fiber as a [Resource] or using [bracketCase].
121139
*
122140
* @see ForkConnected for a fork operation that wires cancellation to its parent in a safe way.
123141
*/
142+
suspend fun <A> ForkAndForget(ctx: CoroutineContext = ComputationPool, f: suspend () -> A): Fiber<A> =
143+
f.forkAndForget(ctx)
144+
145+
/** @see ForkAndForget */
124146
suspend fun <A> (suspend () -> A).forkAndForget(ctx: CoroutineContext = ComputationPool): Fiber<A> {
125147
val promise = UnsafePromise<A>()
126148
// A new SuspendConnection, because its cancellation is now decoupled from our current one.
127149
val conn = SuspendConnection()
128150
startCoroutineCancellable(CancellableContinuation(ctx, conn, promise::complete))
129151
return Fiber(promise, conn)
130152
}
131-
132-
suspend fun <A> ForkAndForget(ctx: CoroutineContext = ComputationPool, f: suspend () -> A): Fiber<A> =
133-
f.forkAndForget(ctx)

arrow-libs/fx/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/ParTraverse.kt

+17
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,23 @@ suspend fun <A, B> Iterable<A>.parTraverse(f: suspend (A) -> B): List<B> =
8484
* Traverse this [Iterable] and and run all mappers [f] on [CoroutineContext].
8585
* Cancelling this operation cancels all running tasks.
8686
*
87+
* ```kotlin:ank:playground
88+
* import arrow.fx.coroutines.*
89+
*
90+
* data class User(val id: Int)
91+
*
92+
* suspend fun main(): Unit {
93+
* //sampleStart
94+
* suspend fun getUserById(id: Int): User =
95+
* User(id)
96+
*
97+
* val res = listOf(1, 2, 3)
98+
* .parTraverse(ComputationPool, ::getUserById)
99+
* //sampleEnd
100+
* println(res)
101+
* }
102+
* ```
103+
*
87104
* **WARNING** it runs in parallel depending on the capabilities of the provided [CoroutineContext].
88105
* We ensure they start in sequence so it's guaranteed to finish on a single threaded context.
89106
*

arrow-libs/fx/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/ParTupledN.kt

+21
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,27 @@ import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
1313
* Parallel maps [fa], [fb] in parallel on [ComputationPool].
1414
* Cancelling this operation cancels both operations running in parallel.
1515
*
16+
* ```kotlin:ank:playground
17+
* import arrow.fx.coroutines.*
18+
*
19+
* suspend fun main(): Unit {
20+
* //sampleStart
21+
* val result = parMapN(
22+
* { "First one is on ${Thread.currentThread().name}" },
23+
* { "Second one is on ${Thread.currentThread().name}" }
24+
* ) { (a, b) ->
25+
* "$a\n$b"
26+
* }
27+
* //sampleEnd
28+
* println(result)
29+
* }
30+
* ```
31+
*
32+
* @param fa value to parallel map
33+
* @param fb value to parallel map
34+
* @param f function to map/combine value [A] and [B]
35+
* ```
36+
*
1637
* @see parMapN for the same function that can race on any [CoroutineContext].
1738
*/
1839
suspend fun <A, B, C> parMapN(fa: suspend () -> A, fb: suspend () -> B, f: (Pair<A, B>) -> C): C =

0 commit comments

Comments
 (0)