Skip to content

Commit 9ddcdcb

Browse files
authored
Suspend Either.fx implementation (#112)
* Suspend fx * Code review
1 parent a5e0e4d commit 9ddcdcb

File tree

4 files changed

+211
-8
lines changed

4 files changed

+211
-8
lines changed

arrow-libs/core/arrow-core-data/src/main/kotlin/arrow/core/Either.kt

+39-8
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import arrow.core.Either.Left
55
import arrow.core.Either.Right
66
import arrow.higherkind
77
import arrow.typeclasses.Show
8+
import kotlin.coroutines.Continuation
9+
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
10+
import kotlin.coroutines.intrinsics.startCoroutineUninterceptedOrReturn
11+
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
812

913
/**
1014
*
@@ -1048,17 +1052,17 @@ fun <A, B> EitherOf<A, B>.contains(elem: B): Boolean =
10481052
fix().fold({ false }, { it == elem })
10491053

10501054
fun <A, B, C> EitherOf<A, B>.ap(ff: EitherOf<A, (B) -> C>): Either<A, C> =
1051-
flatMap { a -> ff.fix().map { f -> f(a) } }.fix()
1055+
flatMap { a -> ff.fix().map { f -> f(a) } }
10521056

10531057
fun <A, B> EitherOf<A, B>.combineK(y: EitherOf<A, B>): Either<A, B> =
10541058
when (this) {
1055-
is Either.Left -> y.fix()
1059+
is Left -> y.fix()
10561060
else -> fix()
10571061
}
10581062

1059-
fun <A> A.left(): Either<A, Nothing> = Either.Left(this)
1063+
fun <A> A.left(): Either<A, Nothing> = Left(this)
10601064

1061-
fun <A> A.right(): Either<Nothing, A> = Either.Right(this)
1065+
fun <A> A.right(): Either<Nothing, A> = Right(this)
10621066

10631067
/**
10641068
* Returns [Either.Right] if the value of type B is not null, otherwise the specified A value wrapped into an
@@ -1071,8 +1075,8 @@ fun <A> A.right(): Either<Nothing, A> = Either.Right(this)
10711075
* ```
10721076
*/
10731077
fun <A, B> B?.rightIfNotNull(default: () -> A): Either<A, B> = when (this) {
1074-
null -> Either.Left(default())
1075-
else -> Either.Right(this)
1078+
null -> Left(default())
1079+
else -> Right(this)
10761080
}
10771081

10781082
/**
@@ -1093,7 +1097,34 @@ fun <A> Any?.rightIfNull(default: () -> A): Either<A, Nothing?> = when (this) {
10931097
fun <A, B> EitherOf<A, B>.handleErrorWith(f: (A) -> EitherOf<A, B>): Either<A, B> =
10941098
fix().let {
10951099
when (it) {
1096-
is Either.Left -> f(it.a).fix()
1097-
is Either.Right -> it
1100+
is Left -> f(it.a).fix()
1101+
is Right -> it
10981102
}
10991103
}
1104+
1105+
suspend fun <E, A> Either.Companion.fx(c: suspend EitherContinuation<E, A>.() -> A): Either<E, A> =
1106+
suspendCoroutineUninterceptedOrReturn sc@{ cont ->
1107+
val continuation = EitherContinuation(cont as Continuation<EitherOf<E, A>>)
1108+
val wrapReturn: suspend EitherContinuation<E, A>.() -> Either<E, A> = { c().right() }
1109+
1110+
// Returns either `Either<A, B>` or `COROUTINE_SUSPENDED`
1111+
val x: Any? = try {
1112+
wrapReturn.startCoroutineUninterceptedOrReturn(continuation, continuation)
1113+
} catch (e: Throwable) {
1114+
if (e is SuspendMonadContinuation.ShortCircuit) Left(e.e as E)
1115+
else throw e
1116+
}
1117+
1118+
return@sc if (x == COROUTINE_SUSPENDED) continuation.getResult()
1119+
else x as Either<E, A>
1120+
}
1121+
1122+
class EitherContinuation<E, A>(
1123+
parent: Continuation<EitherOf<E, A>>
1124+
) : SuspendMonadContinuation<EitherPartialOf<E>, A>(parent) {
1125+
override suspend fun <A> Kind<EitherPartialOf<E>, A>.bind(): A =
1126+
fix().fold({ e -> throw ShortCircuit(e) }, ::identity)
1127+
1128+
override fun ShortCircuit.recover(): Kind<EitherPartialOf<E>, A> =
1129+
Left(e as E)
1130+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package arrow.core
2+
3+
import arrow.Kind
4+
import arrow.typeclasses.suspended.BindSyntax
5+
import kotlinx.atomicfu.atomic
6+
import kotlinx.atomicfu.loop
7+
import kotlin.coroutines.Continuation
8+
import kotlin.coroutines.CoroutineContext
9+
import kotlin.coroutines.EmptyCoroutineContext
10+
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
11+
import kotlin.coroutines.resumeWithException
12+
13+
internal const val UNDECIDED = 0
14+
internal const val SUSPENDED = 1
15+
16+
@Suppress("UNCHECKED_CAST")
17+
abstract class SuspendMonadContinuation<F, A>(private val parent: Continuation<Kind<F, A>>) : Continuation<Kind<F, A>>, BindSyntax<F> {
18+
19+
class ShortCircuit(val e: Any?) : RuntimeException(null, null) {
20+
override fun fillInStackTrace(): Throwable = this
21+
}
22+
23+
abstract fun ShortCircuit.recover(): Kind<F, A>
24+
25+
/**
26+
* State is either
27+
* 0 - UNDECIDED
28+
* 1 - SUSPENDED
29+
* Any? (3) `resumeWith` always stores it upon UNDECIDED, and `getResult` can atomically get it.
30+
*/
31+
private val _decision = atomic<Any>(UNDECIDED)
32+
33+
override val context: CoroutineContext = EmptyCoroutineContext
34+
35+
override fun resumeWith(result: Result<Kind<F, A>>) {
36+
_decision.loop { decision ->
37+
when (decision) {
38+
UNDECIDED -> {
39+
val r: Kind<F, A>? = when {
40+
result.isFailure -> {
41+
val e = result.exceptionOrNull()
42+
if (e is ShortCircuit) e.recover() else null
43+
}
44+
result.isSuccess -> result.getOrNull()
45+
else -> throw ArrowCoreInternalException
46+
}
47+
48+
when {
49+
r == null -> {
50+
parent.resumeWithException(result.exceptionOrNull()!!)
51+
return
52+
}
53+
_decision.compareAndSet(UNDECIDED, r) -> return
54+
else -> Unit // loop again
55+
}
56+
}
57+
else -> { // If not `UNDECIDED` then we need to pass result to `parent`
58+
val res: Result<Kind<F, A>> = result.fold({ Result.success(it) }, { t ->
59+
if (t is ShortCircuit) Result.success(t.recover())
60+
else Result.failure(t)
61+
})
62+
parent.resumeWith(res)
63+
return
64+
}
65+
}
66+
}
67+
}
68+
69+
@PublishedApi // return the result
70+
internal fun getResult(): Any? =
71+
_decision.loop { decision ->
72+
when (decision) {
73+
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return COROUTINE_SUSPENDED
74+
else -> return decision
75+
}
76+
}
77+
}

arrow-libs/core/arrow-core-data/src/main/kotlin/arrow/core/predef.kt

+7
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,10 @@ fun <A, B, Z> ((A, B) -> Z).curry(): (A) -> (B) -> Z = { p1: A -> { p2: B -> thi
77
infix fun <A, B, C> ((B) -> C).compose(f: (A) -> B): (A) -> C = { a: A -> this(f(a)) }
88

99
infix fun <A, B, C> ((A) -> B).andThen(g: (B) -> C): (A) -> C = { a: A -> g(this(a)) }
10+
11+
internal object ArrowCoreInternalException : RuntimeException(
12+
"Arrow-Core internal error. Please let us know and create a ticket at https://github.com/arrow-kt/arrow-core/issues/new/choose",
13+
null
14+
) {
15+
override fun fillInStackTrace(): Throwable = this
16+
}

arrow-libs/core/arrow-core-data/src/test/kotlin/arrow/core/EitherTest.kt

+88
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,16 @@ import arrow.core.test.laws.SemigroupKLaws
4141
import arrow.core.test.laws.ShowLaws
4242
import arrow.core.test.laws.TraverseLaws
4343
import arrow.typeclasses.Eq
44+
import io.kotlintest.fail
4445
import io.kotlintest.properties.Gen
4546
import io.kotlintest.properties.forAll
47+
import io.kotlintest.shouldBe
48+
import io.kotlintest.shouldThrow
49+
import java.util.concurrent.Executors
50+
import java.util.concurrent.ScheduledExecutorService
51+
import java.util.concurrent.TimeUnit
52+
import kotlin.coroutines.resume
53+
import kotlin.coroutines.suspendCoroutine
4654

4755
class EitherTest : UnitSpec() {
4856

@@ -203,5 +211,85 @@ class EitherTest : UnitSpec() {
203211
Right(a).handleErrorWith { Right(b) } == Right(a)
204212
}
205213
}
214+
215+
"suspended Either.fx can bind immediate values" {
216+
Gen.either(Gen.string(), Gen.int())
217+
.random()
218+
.take(1001)
219+
.forEach { either ->
220+
Either.fx<String, Int> {
221+
val res = !either
222+
res
223+
} shouldBe either
224+
}
225+
}
226+
227+
"suspended Either.fx can bind suspended values" {
228+
Gen.either(Gen.string(), Gen.int())
229+
.random()
230+
.take(10)
231+
.forEach { either ->
232+
Either.fx<String, Int> {
233+
val res = !(suspend {
234+
sleep(100)
235+
either
236+
}).invoke()
237+
238+
res
239+
} shouldBe either
240+
}
241+
}
242+
243+
"suspended Either.fx can safely handle immediate exceptions" {
244+
Gen.bind(Gen.int(), Gen.throwable(), ::Pair)
245+
.random()
246+
.take(1001)
247+
.forEach { (i, exception) ->
248+
shouldThrow<Throwable> {
249+
Either.fx<String, Int> {
250+
val res = !Either.Right(i)
251+
throw exception
252+
res
253+
}
254+
fail("It should never reach here. Either.fx should've thrown $exception")
255+
} shouldBe exception
256+
}
257+
}
258+
259+
"suspended Either.fx can bind suspended exceptions" {
260+
Gen.bind(Gen.int(), Gen.throwable(), ::Pair)
261+
.random()
262+
.take(10)
263+
.forEach { (i, exception) ->
264+
shouldThrow<Throwable> {
265+
Either.fx<String, Int> {
266+
val res = !Either.Right(i)
267+
sleep(100)
268+
throw exception
269+
res
270+
}
271+
fail("It should never reach here. Either.fx should've thrown $exception")
272+
} shouldBe exception
273+
}
274+
}
275+
}
276+
}
277+
278+
internal val scheduler: ScheduledExecutorService by lazy {
279+
Executors.newScheduledThreadPool(2) { r ->
280+
Thread(r).apply {
281+
name = "arrow-effect-scheduler-$id"
282+
isDaemon = true
283+
}
206284
}
207285
}
286+
287+
suspend fun sleep(duration: Long): Unit =
288+
if (duration <= 0) Unit
289+
else suspendCoroutine { cont ->
290+
scheduler.schedule(
291+
{ cont.resume(Unit) },
292+
duration,
293+
TimeUnit.MILLISECONDS
294+
)
295+
}

0 commit comments

Comments
 (0)