Skip to content

Commit 6f508f8

Browse files
nomisRevrachelcarmenaraulrajadanimontoyaaballano
authored
Add KotlinX interopt module for Arrow Fx Coroutines (#257)
Co-authored-by: Rachel M. Carmena <[email protected]> Co-authored-by: Raúl Raja Martínez <[email protected]> Co-authored-by: danieh <[email protected]> Co-authored-by: Alberto Ballano <[email protected]>
1 parent 00a886f commit 6f508f8

File tree

12 files changed

+593
-1
lines changed

12 files changed

+593
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
---
2+
layout: docs-fx
3+
title: kotlinx.coroutines
4+
permalink: /integrations/kotlinxcoroutines/
5+
---
6+
7+
# Kotlin Coroutines and runtime support
8+
9+
Kotlin offers a `suspend` system in the language, and it offers intrinsics in the standard library to build a library on top. These `intrinsic` functions allow you to `startCoroutine`s, `suspendCoroutine`s, build `CoroutineContext`s and so on.
10+
11+
Kotlin's language suspension support can be found in the [kotlin.coroutines](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines/index.html) package.
12+
13+
There are currently two libraries that provide a runtime for the language's suspension system.
14+
15+
- [Arrow Fx](https://arrow-kt.io/docs/fx/)
16+
- [KotlinX Coroutines](https://github.com/Kotlin/kotlinx.coroutines)
17+
18+
They can easily interop with each other, and Arrow Fx's integration module offers certain combinators to use Arrow Fx's with KotlinX structured concurrency in frameworks that have chosen to incorporate the KotlinX Coroutines library such as Android and Ktor.
19+
20+
## Integrating Arrow Fx Coroutine with KotlinX Coroutine
21+
22+
If you'd like to introduce Arrow Fx Coroutine in your project, you might want to keep using the KotlinX Coroutines style of cancellation with `CoroutineScope`. This is especially useful on *Android* projects where the Architecture Components [already provide handy scopes for you](https://developer.android.com/topic/libraries/architecture/coroutines#lifecycle-aware).
23+
24+
### unsafeRunScoped & unsafeRunIO
25+
26+
`scope.unsafeRunScoped(f, cb)` runs the specific Arrow Fx Coroutine program with a [CoroutineScope](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html), so it will be automatically cancelled when the scope does as well.
27+
28+
Similarly, there's `f.unsafeRunIO(scope, cb)`, which works in the same way with different syntax:
29+
30+
```kotlin:ank:playground
31+
import arrow.fx.coroutines.*
32+
import arrow.fx.coroutines.kotlinx.*
33+
import kotlinx.coroutines.CoroutineScope
34+
import kotlinx.coroutines.SupervisorJob
35+
36+
val scope = CoroutineScope(SupervisorJob())
37+
38+
//sampleStart
39+
suspend fun sayHello(): Unit =
40+
println("Hello World")
41+
42+
suspend fun sayGoodBye(): Unit =
43+
println("Good bye World!")
44+
45+
suspend fun greet(): Unit {
46+
cancelBoundary()
47+
sayHello()
48+
cancelBoundary()
49+
sayGoodBye()
50+
}
51+
52+
fun main() {
53+
// This Arrow Fx Coroutine program would stop as soon as the scope is cancelled
54+
scope.unsafeRunScoped({ greet() }) { }
55+
56+
// alternatively, you could also do
57+
suspend { greet() }.unsafeRunIO(scope) { }
58+
}
59+
//sampleEnd
60+
```
61+
62+
63+
## Alternatively, integrating Arrow Fx Coroutines with kotlinx.coroutines
64+
65+
Sometimes you might not want to switch the runtime of your project, and slowly integrate to Arrow Fx Coroutines instead. For this use case, we've added some extensions to work with the KotlinX Coroutines runtime.
66+
67+
*IMPORTANT NOTE*: The way kotlinx.coroutines handle errors is by throwing exceptions after you run your operations. Because of this, it's important to clarify that your operation might crash your app if you're not handling errors or try-catching the execution.
68+
69+
### suspendCancellable
70+
71+
The `suspendCancellable` function will turn an Arrow Fx Coroutine program into a KotlinX Coroutine, allowing you to cancel it within its scope like any other KotlinX Coroutine.
72+
73+
```kotlin:ank:playground
74+
import arrow.fx.coroutines.*
75+
import arrow.fx.coroutines.kotlinx.*
76+
import kotlinx.coroutines.CoroutineScope
77+
import kotlinx.coroutines.launch
78+
import kotlinx.coroutines.SupervisorJob
79+
80+
val scope = CoroutineScope(SupervisorJob())
81+
82+
//sampleStart
83+
suspend fun sayHello(): Unit =
84+
println("Hello World")
85+
86+
suspend fun sayGoodBye(): Unit =
87+
println("Good bye World!")
88+
89+
suspend fun greet(): Unit {
90+
cancelBoundary()
91+
sayHello()
92+
cancelBoundary()
93+
sayGoodBye()
94+
}
95+
96+
fun main() {
97+
// This Arrow Fx Coroutine program would stop as soon as the scope is cancelled
98+
scope.launch {
99+
suspendCancellable { greet() }
100+
}
101+
}
102+
//sampleEnd
103+
```
104+
105+
# Handling errors
106+
107+
Let's briefly expand our previous example by adding a function that theoretically fetches (from network/db) the name of a person by their id:
108+
109+
```kotlin:ank
110+
suspend fun fetchNameOrThrow(id: Int): String =
111+
"fetched name for $id"
112+
113+
suspend fun sayHello(): Unit =
114+
println("Hello ${fetchNameOrThrow(userId)}")
115+
116+
suspend fun sayGoodBye(): Unit =
117+
println("Good bye ${fetchNameOrThrow(userId)}!")
118+
```
119+
120+
Because we're using a suspend function, we know that this operation will either give us the name or throw an exception, which could cause our app to crash.
121+
122+
But luckily, we're able to solve this for both combinators presented above using `Either.catch`:
123+
124+
```kotlin:ank:playground
125+
import arrow.core.*
126+
import arrow.fx.coroutines.*
127+
import arrow.fx.coroutines.kotlinx.*
128+
import kotlinx.coroutines.CoroutineScope
129+
import kotlinx.coroutines.launch
130+
import kotlinx.coroutines.SupervisorJob
131+
132+
val scope = CoroutineScope(SupervisorJob())
133+
134+
class NameNotFoundException(val id: Int): Exception("Name not found for id $id")
135+
val userId = 1
136+
137+
//sampleStart
138+
suspend fun fetchNameOrThrow(id: Int): String =
139+
throw NameNotFoundException(id)
140+
141+
suspend fun sayHello(): Unit =
142+
println("Hello ${fetchNameOrThrow(userId)}")
143+
144+
suspend fun sayGoodBye(): Unit =
145+
println("Good bye ${fetchNameOrThrow(userId)}!")
146+
147+
fun greet(): Unit = Either.catch {
148+
cancelBoundary()
149+
sayHello() // This first call will throw and the exception be captured within this IO.
150+
cancelBoundary()
151+
sayGoodBye() // The second op will not be executed because of the above.
152+
}.getOrElse { println("Error printing greeting") }
153+
154+
fun main() {
155+
156+
// You can safely run greet() with unsafeRunScoped
157+
scope.unsafeRunScoped({ greet() }) { }
158+
159+
// or suspendCancellable + kotlinx.
160+
suspendCancellable { greet() }
161+
}
162+
//sampleEnd
163+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
---
2+
layout: docs-fx
3+
title: "Kotlin Standard Library and Arrow Fx Coroutines"
4+
---
5+
6+
# Kotlin Standard Library & Arrow Fx Coroutines
7+
8+
## Demystify Coroutine
9+
10+
Kotlin's standard library defines a `Coroutine` as an instance of a suspendable computation.
11+
12+
In other words, a `Coroutine` is a compiled `suspend () -> A` program wired to a `Continuation`.
13+
14+
Which can be created by using [`kotlin.coroutines.intrinsics.createCoroutineUnintercepted`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines.intrinsics/create-coroutine-unintercepted.html).
15+
16+
So let's take a quick look at an example.
17+
18+
```kotlin:ank
19+
import kotlin.coroutines.intrinsics.createCoroutineUnintercepted
20+
import kotlin.coroutines.Continuation
21+
import kotlin.coroutines.EmptyCoroutineContext
22+
import kotlin.coroutines.resume
23+
24+
suspend fun one(): Int = 1
25+
26+
val cont: Continuation<Unit> = ::one
27+
.createCoroutineUnintercepted(Continuation(EmptyCoroutineContext, ::println))
28+
29+
cont.resume(Unit)
30+
```
31+
32+
As you can see here above we create a `Coroutine` using `createCoroutineUnintercepted` which returns us `Continuation<Unit>`.
33+
Strange, you might've expected a `Coroutine` type but a `Coroutine` in the type system is represented by `Continuation<Unit>`.
34+
35+
This `typealias Coroutine = Contination<Unit>` will start running every time you call `resume(Unit)`, which allows you to run the suspend program as many times as you want.
36+
37+
## Kotlin Standard Library Coroutines
38+
39+
[Kotlin Std Coroutines](img/kotlin-stdlib.png)
40+
41+
The standard library offers a powerful set of primitives to build powerful applications on top of `Continuation`s,
42+
together with the compiler's ability to rewrite continuation based code to a beautiful `suspend` syntax.
43+
44+
They can be used to implement a very wide range use-cases, and or *not* bound to asynchronous -or concurrency use-cases.
45+
46+
- Arrow Core, offers computational DSLs build on top of Kotlin's Coroutines `either { }`, `validated { }`, etc
47+
- [`DeepRecursiveFunction`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin/-deep-recursive-function/) explained [here](https://medium.com/@elizarov/deep-recursion-with-coroutines-7c53e15993e3)
48+
- Another well-known async/concurrency implementation beside Arrow Fx Coroutines is [KotlinX Coroutines](https://github.com/Kotlin/kotlinx.coroutines).
49+
50+
The above image is not exhaustive list of the primitives you can find in the standard library.
51+
For an exhaustive list check the Kotlin Standard Library API docs:
52+
- [`kotlin.coroutines`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines/)
53+
- [`kotlin.coroutines.intrinsics`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines.intrinsics/)
Loading
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
plugins {
2+
id "org.jetbrains.kotlin.jvm"
3+
id "org.jlleitschuh.gradle.ktlint"
4+
}
5+
6+
apply from: "$SUB_PROJECT"
7+
apply from: "$DOC_CREATION"
8+
9+
dependencies {
10+
implementation project(':arrow-fx-coroutines')
11+
implementation "org.jetbrains.kotlin:kotlin-stdlib:$KOTLIN_VERSION"
12+
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$KOTLINX_COROUTINES_VERSION"
13+
14+
testImplementation "io.kotest:kotest-runner-junit5-jvm:$KOTEST_VERSION"
15+
testImplementation "io.kotest:kotest-assertions-core-jvm:$KOTEST_VERSION"
16+
testImplementation "io.kotest:kotest-property-jvm:$KOTEST_VERSION"
17+
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:$KOTLINX_COROUTINES_VERSION"
18+
}
19+
20+
21+
22+
compileTestKotlin {
23+
kotlinOptions {
24+
jvmTarget = "1.8"
25+
}
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Maven publishing configuration
2+
POM_NAME=Arrow-Fx-Coroutines-KotlinX
3+
POM_ARTIFACT_ID=arrow-fx-coroutines-kotlinx
4+
POM_PACKAGING=jar
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package arrow.fx.coroutines.kotlinx
2+
3+
import arrow.fx.coroutines.CancelToken
4+
import arrow.fx.coroutines.CancellableContinuation
5+
import arrow.fx.coroutines.Fiber
6+
import arrow.fx.coroutines.never
7+
import arrow.fx.coroutines.startCoroutineCancellable
8+
import kotlinx.coroutines.CancellationException
9+
import kotlinx.coroutines.CompletableDeferred
10+
import kotlinx.coroutines.CoroutineScope
11+
import kotlinx.coroutines.Job
12+
import kotlinx.coroutines.newCoroutineContext
13+
import kotlinx.coroutines.suspendCancellableCoroutine
14+
import kotlin.coroutines.EmptyCoroutineContext
15+
16+
/**
17+
* Launches the source `suspend () -> A` composing cancellation with the `Structured Concurrency` of KotlinX.
18+
*
19+
* This will make sure that the source [f] is cancelled whenever it's [CoroutineScope] is cancelled.
20+
*/
21+
suspend fun <A> suspendCancellable(f: suspend () -> A): A =
22+
suspendCancellableCoroutine { cont ->
23+
if (cont.isActive) {
24+
val disposable = f.startCoroutineCancellable(CancellableContinuation(cont.context, cont::resumeWith))
25+
cont.invokeOnCancellation { disposable() }
26+
}
27+
}
28+
29+
/**
30+
* Unsafely run [fa] and receive the values in a callback [cb] while participating in structured concurrency.
31+
* Equivalent of [startCoroutineCancellable] but with its cancellation token wired to [CoroutineScope].
32+
*
33+
* @see [startCoroutineCancellable] for a version that returns the cancellation token instead.
34+
*/
35+
fun <A> CoroutineScope.unsafeRunScoped(fa: suspend () -> A, cb: (Result<A>) -> Unit): Unit =
36+
fa.unsafeRunScoped(this, cb)
37+
38+
/**
39+
* Unsafely run `this` and receive the values in a callback [cb] while participating in structured concurrency.
40+
* Equivalent of [startCoroutineCancellable] but with its cancellation token wired to [CoroutineScope].
41+
*
42+
* @see [startCoroutineCancellable] for a version that returns the cancellation token instead.
43+
*/
44+
fun <A> (suspend () -> A).unsafeRunScoped(
45+
scope: CoroutineScope,
46+
cb: (Result<A>) -> Unit
47+
): Unit {
48+
val newContext = scope.newCoroutineContext(EmptyCoroutineContext)
49+
val job = newContext[Job]
50+
51+
if (job == null || job.isActive) {
52+
val disposable = startCoroutineCancellable(CancellableContinuation(newContext, cb))
53+
54+
job?.invokeOnCompletion { e ->
55+
if (e is CancellationException) disposable.invoke()
56+
else Unit
57+
}
58+
}
59+
}
60+
61+
/**
62+
* Launches [f] as a coroutine in a [Fiber] while participating in structured concurrency.
63+
* This guarantees resource safety upon cancellation according to [CoroutineScope]'s lifecycle.
64+
*
65+
* The returned [Fiber] is automatically cancelled when [CoroutineScope] gets cancelled, or
66+
* whenever it's [Fiber.cancel] token is invoked. Whichever comes first.
67+
*/
68+
suspend fun <A> ForkScoped(scope: CoroutineScope, f: suspend () -> A): Fiber<A> {
69+
val newContext = scope.newCoroutineContext(EmptyCoroutineContext)
70+
val job = newContext[Job]
71+
72+
val promise = CompletableDeferred<Result<A>>(job)
73+
74+
return if (job == null || job.isActive) {
75+
val disposable = f.startCoroutineCancellable(CancellableContinuation(newContext) {
76+
promise.complete(it)
77+
})
78+
79+
job?.invokeOnCompletion { e ->
80+
if (e is CancellationException) disposable.invoke()
81+
else Unit
82+
}
83+
84+
Fiber({ promise.await().fold({ it }) { e -> throw e } }, CancelToken { disposable.invoke() })
85+
} else Fiber({ never<A>() }, CancelToken.unit)
86+
}

0 commit comments

Comments
 (0)