Skip to content

Commit eeb195e

Browse files
raulrajanomisRevrachelcarmena
authored
Restricted & Suspended delimited scopes (#259)
* Makes DelimContScope#reset suspend * Add suspending tests to TestSuite * Move result to promise as well. Fixes suspension in shift before immediate return * Update MultiShotCont with new impl * Entry point api functions * ktLintFormat * Fix `suspended` in `FxLaws` doesn't actually suspend * Re-add eager versions, fix `RestrictSuspension` in `nullable` and add suspend version * Test that shows flow comprehension hanging * Adds Stream test that also hangs * Remove multi-shot test demos * Effect interface and computation builder * Rename Effect to computations * Disable ktlint in effect file * Correct suspension requires ControlThrowable * Upgrade ktlint to allow fun interfaces * Fix support for nested scopes, add tests for not leaking Arrow Fx & KotlinX Coroutines * Removes NestedDelimContScope, makes Multishot private until we have a compiler plugin solution for ordered binds. Prepares DelimitedScope to be implemented by all and SuspendingComputation to replace DelimContScope if it's possible to implement its shift on it * unfinished progress to share with Simon * Add docs, revert some changes for RestrictSuspension implementation. Keep shiftCPS internally * progress toward encoding effects with suspend an restricted delimited scopes * tests passing * remove runRestrictedSuspension * Fix ktlintMainSourceSetCheck issues * Fix ktlintTestSourceSetCheck issues * Fix ktlint issues for arrow-core-data * Adapted Fx laws, test failing with suspension * ktlint * fix for Fx laws * Remove BindSyntax leftover for nullable * Remove old destructuring syntax for bind from Either doc * Raquel suggestion Co-authored-by: Rachel M. Carmena <[email protected]> * Deprecate `validated` block in favor of `either` block. * Deprecate Validated fx * Either.toValidated/Nel + fix to Validated docs * ktlint fixes Co-authored-by: Simon Vergauwen <[email protected]> Co-authored-by: Rachel M. Carmena <[email protected]> Co-authored-by: Simon Vergauwen <[email protected]>
1 parent 81c3dc3 commit eeb195e

File tree

31 files changed

+688
-558
lines changed

31 files changed

+688
-558
lines changed

arrow-libs/core/arrow-continuations/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ dependencies {
1212
testRuntimeOnly "org.junit.vintage:junit-vintage-engine:$JUNIT_VINTAGE_VERSION"
1313
testCompileOnly "io.kotlintest:kotlintest-runner-junit5:$KOTLIN_TEST_VERSION", excludeArrow
1414
testImplementation project(":arrow-core-test")
15+
testImplementation "io.arrow-kt:arrow-fx-coroutines:$VERSION_NAME"
1516
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package arrow.continuations
2+
3+
import arrow.continuations.generic.DelimitedScope
4+
5+
fun interface Effect<F> {
6+
fun control(): DelimitedScope<F>
7+
8+
companion object {
9+
suspend inline fun <Eff : Effect<*>, F, A> suspended(
10+
crossinline eff: (DelimitedScope<F>) -> Eff,
11+
crossinline just: (A) -> F,
12+
crossinline f: suspend Eff.() -> A,
13+
): F =
14+
Reset.suspended { just(f(eff(this))) }
15+
16+
inline fun <Eff : Effect<*>, F, A> restricted(
17+
crossinline eff: (DelimitedScope<F>) -> Eff,
18+
crossinline just: (A) -> F,
19+
crossinline f: suspend Eff.() -> A,
20+
): F =
21+
Reset.restricted { just(f(eff(this))) }
22+
}
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package arrow.continuations
2+
3+
import arrow.continuations.generic.ShortCircuit
4+
import arrow.continuations.generic.ControlThrowable
5+
import arrow.continuations.generic.DelimContScope
6+
import arrow.continuations.generic.RestrictedScope
7+
import arrow.continuations.generic.SuspendMonadContinuation
8+
import arrow.continuations.generic.SuspendedScope
9+
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
10+
11+
@PublishedApi
12+
internal object Reset {
13+
/**
14+
* Allows for building suspending single-shot computation blocks.
15+
* For short-circuiting, or shifting, a [ShortCircuit] [ControlThrowable] is used.
16+
* This ensures that any concurrent nested scopes are correctly closed.
17+
*
18+
* The usage of `try { ... } catch(e: Throwable) { ... }` will catch the [ShortCircuit] error,
19+
* and will lead to recover of short-circuiting.
20+
* You should always prefer to catch the most specific exception class, or
21+
* use `Either.catch`, `Validated.catch` etc or `e.nonFatalOrThrow()`
22+
* to ensure you're not catching `ShortCircuit`.
23+
*/
24+
suspend fun <A> suspended(block: suspend SuspendedScope<A>.() -> A): A =
25+
suspendCoroutineUninterceptedOrReturn { cont ->
26+
SuspendMonadContinuation(cont, block)
27+
.startCoroutineUninterceptedOrReturn()
28+
}
29+
30+
/**
31+
* Allows for building eager single-shot computation blocks.
32+
* For short-circuiting, or shifting, `@RestrictSuspension` state machine is used.
33+
* This doesn't allow nesting of computation blocks, or foreign suspension.
34+
*/
35+
// TODO This should be @RestrictSuspension but that breaks because a superclass is not considered to be correct scope
36+
fun <A> restricted(block: suspend RestrictedScope<A>.() -> A): A =
37+
DelimContScope(block).invoke()
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package arrow.continuations.generic
2+
3+
/**
4+
* A [Throwable] class intended for control flow.
5+
* Instance of [ControlThrowable] should **not** be caught,
6+
* and `arrow.core.NonFatal` does not catch this [Throwable].
7+
* Thus by extension `Either.catch` and `Validated.catch` also don't catch [ControlThrowable].
8+
*/
9+
open class ControlThrowable : Throwable() {
10+
override fun fillInStackTrace(): Throwable = this
11+
}

arrow-libs/core/arrow-continuations/src/main/kotlin/arrow/continuations/generic/DelimContScope.kt

+27-26
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
package arrow.continuations.generic
22

3-
import kotlinx.atomicfu.atomic
4-
import kotlinx.atomicfu.loop
53
import kotlin.coroutines.Continuation
64
import kotlin.coroutines.EmptyCoroutineContext
75
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
86
import kotlin.coroutines.intrinsics.startCoroutineUninterceptedOrReturn
7+
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
98
import kotlin.coroutines.resume
10-
import kotlin.coroutines.suspendCoroutine
119

1210
/**
1311
* Implements delimited continuations with with no multi shot support (apart from shiftCPS which trivially supports it).
@@ -20,17 +18,17 @@ import kotlin.coroutines.suspendCoroutine
2018
* continuation is appended to a list waiting to be invoked with the final result of the block.
2119
* When running a function we jump back and forth between the main function and every function inside shift via their continuations.
2220
*/
23-
class DelimContScope<R>(val f: suspend DelimitedScope<R>.() -> R) : DelimitedScope<R> {
21+
internal open class DelimContScope<R>(private val f: suspend RestrictedScope<R>.() -> R) : RestrictedScope<R> {
2422

2523
/**
2624
* Variable used for polling the result after suspension happened.
2725
*/
28-
private val resultVar = atomic<Any?>(EMPTY_VALUE)
26+
private var resultVar: Any? = EMPTY_VALUE
2927

3028
/**
3129
* Variable for the next shift block to (partially) run, if it is empty that usually means we are done.
3230
*/
33-
private val nextShift = atomic<(suspend () -> R)?>(null)
31+
private var nextShift: (suspend () -> R)? = null
3432

3533
/**
3634
* "Callbacks"/partially evaluated shift blocks which now wait for the final result
@@ -45,9 +43,10 @@ class DelimContScope<R>(val f: suspend DelimitedScope<R>.() -> R) : DelimitedSco
4543
private val continuation: Continuation<A>,
4644
private val shiftFnContinuations: MutableList<Continuation<R>>
4745
) : DelimitedContinuation<A, R> {
48-
override suspend fun invoke(a: A): R = suspendCoroutine { resumeShift ->
46+
override suspend fun invoke(a: A): R = suspendCoroutineUninterceptedOrReturn { resumeShift ->
4947
shiftFnContinuations.add(resumeShift)
5048
continuation.resume(a)
49+
COROUTINE_SUSPENDED
5150
}
5251
}
5352

@@ -63,43 +62,47 @@ class DelimContScope<R>(val f: suspend DelimitedScope<R>.() -> R) : DelimitedSco
6362
/**
6463
* Captures the continuation and set [f] with the continuation to be executed next by the runloop.
6564
*/
66-
override suspend fun <A> shift(f: suspend DelimitedScope<R>.(DelimitedContinuation<A, R>) -> R): A =
67-
suspendCoroutine { continueMain ->
65+
override suspend fun <A> shift(f: suspend RestrictedScope<R>.(DelimitedContinuation<A, R>) -> R): A =
66+
suspendCoroutineUninterceptedOrReturn { continueMain ->
6867
val delCont = SingleShotCont(continueMain, shiftFnContinuations)
69-
assert(nextShift.compareAndSet(null, suspend { this.f(delCont) }))
68+
assert(nextShift == null)
69+
nextShift = suspend { this.f(delCont) }
70+
COROUTINE_SUSPENDED
7071
}
7172

7273
/**
7374
* Same as [shift] except we never resume execution because we only continue in [c].
7475
*/
75-
override suspend fun <A, B> shiftCPS(f: suspend (DelimitedContinuation<A, B>) -> R, c: suspend DelimitedScope<B>.(A) -> B): Nothing =
76-
suspendCoroutine {
77-
assert(nextShift.compareAndSet(null, suspend { f(CPSCont(c)) }))
76+
suspend fun <A, B> shiftCPS(f: suspend (DelimitedContinuation<A, B>) -> R, c: suspend DelimitedScope<B>.(A) -> B): Nothing =
77+
suspendCoroutineUninterceptedOrReturn {
78+
assert(nextShift == null)
79+
nextShift = suspend { f(CPSCont(c)) }
80+
COROUTINE_SUSPENDED
7881
}
7982

8083
/**
8184
* Unsafe if [f] calls [shift] on this scope! Use [NestedDelimContScope] instead if this is a problem.
8285
*/
83-
override suspend fun <A> reset(f: suspend DelimitedScope<A>.() -> A): A =
86+
fun <A> reset(f: suspend DelimitedScope<A>.() -> A): A =
8487
DelimContScope(f).invoke()
8588

8689
@Suppress("UNCHECKED_CAST")
8790
fun invoke(): R {
8891
f.startCoroutineUninterceptedOrReturn(this, Continuation(EmptyCoroutineContext) { result ->
89-
resultVar.value = result.getOrThrow()
92+
resultVar = result.getOrThrow()
9093
}).let {
9194
if (it == COROUTINE_SUSPENDED) {
9295
// we have a call to shift so we must start execution the blocks there
93-
resultVar.loop { mRes ->
94-
if (mRes === EMPTY_VALUE) {
95-
val nextShiftFn = nextShift.getAndSet(null)
96-
?: throw IllegalStateException("No further work to do but also no result!")
96+
while (true) {
97+
if (resultVar === EMPTY_VALUE) {
98+
val nextShiftFn = requireNotNull(nextShift) { "No further work to do but also no result!" }
99+
nextShift = null
97100
nextShiftFn.startCoroutineUninterceptedOrReturn(Continuation(EmptyCoroutineContext) { result ->
98-
resultVar.value = result.getOrThrow()
101+
resultVar = result.getOrThrow()
99102
}).let { nextRes ->
100103
// If we suspended here we can just continue to loop because we should now have a new function to run
101104
// If we did not suspend we short-circuited and are thus done with looping
102-
if (nextRes != COROUTINE_SUSPENDED) resultVar.value = nextRes as R
105+
if (nextRes != COROUTINE_SUSPENDED) resultVar = nextRes as R
103106
}
104107
// Break out of the infinite loop if we have a result
105108
} else return@let
@@ -108,17 +111,15 @@ class DelimContScope<R>(val f: suspend DelimitedScope<R>.() -> R) : DelimitedSco
108111
// we can return directly if we never suspended/called shift
109112
else return@invoke it as R
110113
}
111-
assert(resultVar.value !== EMPTY_VALUE)
114+
assert(resultVar !== EMPTY_VALUE)
112115
// We need to finish the partially evaluated shift blocks by passing them our result.
113116
// This will update the result via the continuations that now finish up
114-
for (c in shiftFnContinuations.asReversed()) c.resume(resultVar.value as R)
117+
for (c in shiftFnContinuations.asReversed()) c.resume(resultVar as R)
115118
// Return the final result
116-
return resultVar.value as R
119+
return resultVar as R
117120
}
118121

119122
companion object {
120-
fun <R> reset(f: suspend DelimitedScope<R>.() -> R): R = DelimContScope(f).invoke()
121-
122123
@Suppress("ClassName")
123124
private object EMPTY_VALUE
124125
}

arrow-libs/core/arrow-continuations/src/main/kotlin/arrow/continuations/generic/DelimitedCont.kt

+10-14
Original file line numberDiff line numberDiff line change
@@ -10,25 +10,21 @@ interface DelimitedContinuation<A, R> {
1010
/**
1111
* Base interface for our scope.
1212
*/
13-
// TODO This should be @RestrictSuspension but that breaks because a superclass is not considered to be correct scope
14-
// @RestrictsSuspension
1513
interface DelimitedScope<R> {
16-
/**
17-
* Capture the continuation and pass it to [f].
18-
*/
19-
suspend fun <A> shift(f: suspend DelimitedScope<R>.(DelimitedContinuation<A, R>) -> R): A
2014

2115
/**
22-
* Manually cps transformed shift. This can be used to gain multishot without hacks, but it's not the nicest for a few reasons:
23-
* - It does not inherit the scope, this means it will be hard to effects offering non-det to offer the same scope again...
24-
* - it is manually cps transformed which means every helper between this and invoking the continuation also needs to be transformed.
16+
* Exit the [DelimitedScope] with [R]
2517
*/
26-
suspend fun <A, B> shiftCPS(f: suspend (DelimitedContinuation<A, B>) -> R, c: suspend DelimitedScope<B>.(A) -> B): Nothing
18+
suspend fun <A> shift(r: R): A
19+
}
2720

21+
interface RestrictedScope<R> : DelimitedScope<R> {
2822
/**
29-
* Nest another scope inside the current one.
30-
*
31-
* It is important to use this over creating an unrelated scope because
23+
* Capture the continuation and pass it to [f].
3224
*/
33-
suspend fun <A> reset(f: suspend DelimitedScope<A>.() -> A): A
25+
suspend fun <A> shift(f: suspend RestrictedScope<R>.(DelimitedContinuation<A, R>) -> R): A
26+
27+
override suspend fun <A> shift(r: R): A = shift { r }
3428
}
29+
30+
interface SuspendedScope<R> : DelimitedScope<R>

arrow-libs/core/arrow-continuations/src/main/kotlin/arrow/continuations/generic/MultiShotDelimCont.kt

+22-15
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import kotlin.coroutines.Continuation
66
import kotlin.coroutines.EmptyCoroutineContext
77
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
88
import kotlin.coroutines.intrinsics.startCoroutineUninterceptedOrReturn
9+
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
910
import kotlin.coroutines.resume
1011
import kotlin.coroutines.suspendCoroutine
1112

@@ -22,10 +23,11 @@ import kotlin.coroutines.suspendCoroutine
2223
*
2324
* As per usual understanding of [DelimContScope] is required as I will only be commenting differences for now.
2425
*/
25-
open class MultiShotDelimContScope<R>(val f: suspend DelimitedScope<R>.() -> R) : DelimitedScope<R> {
26+
internal open class MultiShotDelimContScope<R>(val f: suspend RestrictedScope<R>.() -> R) : RestrictedScope<R> {
2627

28+
// TODO Since runs blocking these don't need to be atomic
2729
private val resultVar = atomic<R?>(null)
28-
private val nextShift = atomic<(suspend () -> R)?>(null)
30+
private val nextShift = atomic<(suspend RestrictedScope<R>.() -> R)?>(null)
2931

3032
// TODO This can be append only and needs fast reversed access
3133
private val shiftFnContinuations = mutableListOf<Continuation<R>>()
@@ -43,7 +45,7 @@ open class MultiShotDelimContScope<R>(val f: suspend DelimitedScope<R>.() -> R)
4345
*/
4446
class MultiShotCont<A, R>(
4547
liveContinuation: Continuation<A>,
46-
private val f: suspend DelimitedScope<R>.() -> R,
48+
private val f: suspend RestrictedScope<R>.() -> R,
4749
private val stack: MutableList<Any?>,
4850
private val shiftFnContinuations: MutableList<Continuation<R>>
4951
) : DelimitedContinuation<A, R> {
@@ -71,19 +73,22 @@ open class MultiShotDelimContScope<R>(val f: suspend DelimitedScope<R>.() -> R)
7173
override suspend fun invoke(a: A): R = DelimContScope<R> { runFunc(a) }.invoke()
7274
}
7375

74-
override suspend fun <A> shift(func: suspend DelimitedScope<R>.(DelimitedContinuation<A, R>) -> R): A =
75-
suspendCoroutine { continueMain ->
76+
override suspend fun <A> shift(func: suspend RestrictedScope<R>.(DelimitedContinuation<A, R>) -> R): A =
77+
suspendCoroutineUninterceptedOrReturn { continueMain ->
7678
val c = MultiShotCont(continueMain, f, stack, shiftFnContinuations)
77-
assert(nextShift.compareAndSet(null, suspend { this.func(c) }))
79+
val s: suspend RestrictedScope<R>.() -> R = { this.func(c) }
80+
assert(nextShift.compareAndSet(null, s))
81+
COROUTINE_SUSPENDED
7882
}
7983

80-
override suspend fun <A, B> shiftCPS(func: suspend (DelimitedContinuation<A, B>) -> R, c: suspend DelimitedScope<B>.(A) -> B): Nothing =
84+
suspend fun <A, B> shiftCPS(func: suspend DelimitedScope<R>.(DelimitedContinuation<A, B>) -> R, c: suspend DelimitedScope<B>.(A) -> B): Nothing =
8185
suspendCoroutine {
82-
assert(nextShift.compareAndSet(null, suspend { func(CPSCont(c)) }))
86+
val s: suspend DelimitedScope<R>.() -> R = { func(CPSCont(c)) }
87+
assert(nextShift.compareAndSet(null, s))
8388
}
8489

8590
// This assumes RestrictSuspension or at least assumes the user to never reference the parent scope in f.
86-
override suspend fun <A> reset(f: suspend DelimitedScope<A>.() -> A): A =
91+
suspend fun <A> reset(f: suspend DelimitedScope<A>.() -> A): A =
8792
MultiShotDelimContScope(f).invoke()
8893

8994
fun invoke(): R {
@@ -95,7 +100,7 @@ open class MultiShotDelimContScope<R>(val f: suspend DelimitedScope<R>.() -> R)
95100
if (mRes == null) {
96101
val nextShiftFn = nextShift.getAndSet(null)
97102
?: throw IllegalStateException("No further work to do but also no result!")
98-
nextShiftFn.startCoroutineUninterceptedOrReturn(Continuation(EmptyCoroutineContext) { result ->
103+
nextShiftFn.startCoroutineUninterceptedOrReturn(this, Continuation(EmptyCoroutineContext) { result ->
99104
resultVar.value = result.getOrThrow()
100105
}).let {
101106
if (it != COROUTINE_SUSPENDED) resultVar.value = it as R
@@ -110,19 +115,21 @@ open class MultiShotDelimContScope<R>(val f: suspend DelimitedScope<R>.() -> R)
110115
}
111116

112117
companion object {
113-
fun <R> reset(f: suspend DelimitedScope<R>.() -> R): R = MultiShotDelimContScope(f).invoke()
118+
internal fun <R> reset(f: suspend RestrictedScope<R>.() -> R): R = MultiShotDelimContScope(f).invoke()
114119
}
115120
}
116121

117-
class PrefilledDelimContScope<R>(
122+
private class PrefilledDelimContScope<R>(
118123
override val stack: MutableList<Any?>,
119-
f: suspend DelimitedScope<R>.() -> R
124+
f: suspend RestrictedScope<R>.() -> R
120125
) : MultiShotDelimContScope<R>(f) {
121126
var depth = 0
122127

123128
// Here we first check if we still have values in our local stack and if so we use those first
124129
// if not we delegate to the normal delimited control implementation
125-
override suspend fun <A> shift(func: suspend DelimitedScope<R>.(DelimitedContinuation<A, R>) -> R): A =
130+
override suspend fun <A> shift(func: suspend RestrictedScope<R>.(DelimitedContinuation<A, R>) -> R): A =
126131
if (stack.size > depth) stack[depth++] as A
127-
else super.shift(func).also { depth++ }
132+
else
133+
@Suppress("ILLEGAL_RESTRICTED_SUSPENDING_FUNCTION_CALL")
134+
super.shift(func).also { depth++ }
128135
}

0 commit comments

Comments
 (0)