You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardexpand all lines: arrow-libs/fx/arrow-fx-coroutines/README.MD
+89-10
Original file line number
Diff line number
Diff line change
@@ -6,7 +6,7 @@ It's a functional implementation of Kotlin's coroutine system while providing su
6
6
Arrow Fx aims to be a battery included functional effects framework, below you can find a small overview of the library.
7
7
A full coverage of all data types and functions can be found [here](LINK DOCS).
8
8
9
-
###IO\<A\> vs suspend () -> A
9
+
## IO\<A\> vs suspend () -> A
10
10
11
11
Let's look at two simple concurrent examples that switch context and print the `Thread.currentThread().name`.
12
12
@@ -40,7 +40,7 @@ As you can see we can directly apply `evalOn` to our `suspend fun`, this elimina
40
40
Both programs are equivalent in semantics and guarantees, the later will however perform better since it eliminates the wrapping `IO` requires.
41
41
This is possible without losing cancellation support, as explained in the detail here [anchor cancellation].
42
42
43
-
###IO<Either<E, A>> vs suspend () -> Either<E, A>
43
+
## IO<Either<E, A>> vs suspend () -> Either<E, A>
44
44
45
45
When writing functional code style we often want to express our domain errors as clearly as possible, a popular pattern is to return `Either<DomainError, SuccessValue>`.
46
46
Let's assume following domain, and compare two snippets one using `IO<Either<E, A>>` and another `suspend () -> Either<E, A>`.
@@ -60,7 +60,7 @@ suspend fun User.process(): Either<PersistenceError, ProcessedUser> =
60
60
else Left(PersistenceError)
61
61
```
62
62
63
-
####IO<Either<E, A>>
63
+
### IO<Either<E, A>>
64
64
65
65
```kotlin:ank
66
66
fun ioProgram(): IO<Either<PersistenceError, ProcessedUser>> =
suspend fun either(): Either<PersistenceError, ProcessedUser> =
@@ -90,7 +90,7 @@ suspend fun either(): Either<PersistenceError, ProcessedUser> =
90
90
}
91
91
```
92
92
93
-
####suspend R.() -> A
93
+
### suspend R.() -> A
94
94
95
95
We can use extension functions to do functional dependency injection with similar semantics as `Reader` or `Kleisli`.
96
96
They allow us to elegantly define `syntax` for a certain type. Let's see a simple example.
@@ -147,7 +147,7 @@ tailrec suspend fun sleeper(): Unit {
147
147
}
148
148
```
149
149
150
-
####cancelBoundary()
150
+
### cancelBoundary()
151
151
152
152
Calling `suspend fun cancelBoundary()` will check for cancellation, and will gracefully exit in case the effect was cancelled. An example.
153
153
@@ -173,7 +173,7 @@ tailrec suspend fun repeat(n: Int): Unit {
173
173
}
174
174
```
175
175
176
-
####Parallel operations cancellation
176
+
### Parallel operations cancellation
177
177
178
178
All parallel `suspend` operators in Arrow Fx behave in the following way.
179
179
@@ -183,7 +183,7 @@ All parallel `suspend` operators in Arrow Fx behave in the following way.
183
183
184
184
For more documentation on parallel operations see below.
185
185
186
-
####Uncancellable
186
+
### Uncancellable
187
187
188
188
So how can you execute of `suspend fun` with guarantee that it cannot be cancelled. You simply `wrap` it in the `uncancelable` builder and the function will guarantee not to be cancelled. If the progam is already cancelled before, this block will not run and if it gets cancelled during the execution of this block it will exit immediately after.
189
189
@@ -315,13 +315,84 @@ suspend main(): Unit {
315
315
}
316
316
```
317
317
318
-
### Error Handling
318
+
## Concurrency Helpers
319
+
320
+
### Ref vs Atomic
321
+
322
+
`Ref` has been renamed `Atomic` in the new API; and, it provides the same level of service as `Ref`
323
+
(i.e. `Atomic` provides a safe concurrent API to access to a mutable reference).
324
+
325
+
For example
326
+
327
+
```kotlin
328
+
funfactorial(n:Int) =IO.fx {
329
+
val res =Ref(1L).bind()
330
+
331
+
(1 until n+1).parTraverse {
332
+
res.update(it::times)
333
+
}.bind()
334
+
res.get().bind()
335
+
}
336
+
```
337
+
338
+
becomes
339
+
340
+
```kotlin:ank
341
+
suspend fun factorial(n: Int) {
342
+
val res = Atomic(1L)
343
+
344
+
(1 until n+1).parTraverse {
345
+
res.update(it::times)
346
+
}
347
+
res.get()
348
+
}
349
+
```
350
+
351
+
This code snippet isn’t very useful, except to show how `Ref/Atomic` can be used with concurrent access.
352
+
353
+
### MVar vs ConcurrentVar
354
+
355
+
`MVar` is now called `ConcurrentVar`; and, it provides the same level of service as `MVar`
356
+
(i.e. a `ConcurrentVar` is a mutable concurrent safe variable which is either `empty` or contains a typed `single value`).
357
+
358
+
```kotlin:ank
359
+
tailrec
360
+
fun sum(state: MVar<ForIO, Int>, list: List<Int>) : IO<Int> =
suspend fun sum(state: ConcurrentVar<Int>, list: List<Int>) {
376
+
when {
377
+
list.isEmpty() -> state.take()
378
+
else -> {
379
+
val cur = state.take()
380
+
state.put(cur + list[0])
381
+
sum(state, list.tail())
382
+
}
383
+
}
384
+
}
385
+
```
386
+
387
+
This code snippet isn’t very useful, except to show how the atomic calls of `MVar/ConcurrentVar` can be used.
388
+
389
+
## Error Handling
319
390
320
391
In Kotlin with suspend `try/catch` can safely be used to recover from exceptions.
321
392
322
393
Simple constructs like `suspend fun Either.catch(f: () -> A): Either<Throwable, A>` are available for nicer syntax when and lifting errors in your domain.
323
394
324
-
###Retrying and repeating effects
395
+
## Retrying and repeating effects
325
396
326
397
`Schedule` allows you to define and compose powerful yet simple policies, which can be used to either repeat or retry computation.
327
398
@@ -444,6 +515,14 @@ suspend fun <A, B> raceN(ctx: CoroutineContext, fa: suspend () -> A, fb: suspend
444
515
)
445
516
```
446
517
518
+
## And more…
519
+
520
+
Arrow Fx Coroutines also provides, for complex/specific use cases, services such as :
521
+
- an immutable queue (`IQueue` datatype),
522
+
- a purely functional, effectful stream processing with the new `Stream`API,
523
+
- The new service `CircuitBreaker`aims to to make our distributed systems more reliable.
524
+
This list is not exhaustive.
525
+
447
526
## Arrow Fx Coroutines, KotlinX Coroutines & Kotlin Standard Library
0 commit comments