Skip to content

Commit 7cb613b

Browse files
1Jajen1raulrajanomisRevrachelcarmena
authored
Delimited continuations (#226)
Co-authored-by: Raul Raja <[email protected]> Co-authored-by: Simon Vergauwen <[email protected]> Co-authored-by: Rachel M. Carmena <[email protected]>
1 parent 3fac74e commit 7cb613b

File tree

9 files changed

+697
-2
lines changed

9 files changed

+697
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
plugins {
2+
id "org.jetbrains.kotlin.jvm"
3+
id "org.jetbrains.kotlin.kapt"
4+
id "org.jlleitschuh.gradle.ktlint"
5+
}
6+
7+
apply from: "$SUB_PROJECT"
8+
apply from: "$DOC_CREATION"
9+
apply plugin: 'kotlinx-atomicfu'
10+
11+
dependencies {
12+
compileOnly "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$KOTLIN_VERSION"
13+
testRuntimeOnly "org.junit.vintage:junit-vintage-engine:$JUNIT_VINTAGE_VERSION"
14+
testCompileOnly "io.kotlintest:kotlintest-runner-junit5:$KOTLIN_TEST_VERSION", excludeArrow
15+
testImplementation project(":arrow-core-test")
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Maven publishing configuration
2+
POM_NAME=Arrow Continuations
3+
POM_ARTIFACT_ID=arrow-continuations
4+
POM_PACKAGING=jar
5+
# Build configuration
6+
kapt.incremental.apt=false
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package arrow.continuations.generic
2+
3+
import kotlinx.atomicfu.atomic
4+
import kotlinx.atomicfu.loop
5+
import kotlin.coroutines.Continuation
6+
import kotlin.coroutines.EmptyCoroutineContext
7+
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
8+
import kotlin.coroutines.intrinsics.startCoroutineUninterceptedOrReturn
9+
import kotlin.coroutines.resume
10+
import kotlin.coroutines.suspendCoroutine
11+
12+
/**
13+
* Implements delimited continuations with with no multi shot support (apart from shiftCPS which trivially supports it).
14+
*
15+
* For a version that simulates multishot (albeit with drawbacks) see [MultiShotDelimContScope].
16+
* For a version that allows nesting [reset] and calling parent scopes inside inner scopes see [NestedDelimContScope].
17+
*
18+
* The basic concept here is appending callbacks and polling for a result.
19+
* Every shift is evaluated until it either finishes (short-circuit) or suspends (called continuation). When it suspends its
20+
* continuation is appended to a list waiting to be invoked with the final result of the block.
21+
* When running a function we jump back and forth between the main function and every function inside shift via their continuations.
22+
*/
23+
class DelimContScope<R>(val f: suspend DelimitedScope<R>.() -> R) : DelimitedScope<R> {
24+
25+
/**
26+
* Variable used for polling the result after suspension happened.
27+
*/
28+
private val resultVar = atomic<R?>(null)
29+
30+
/**
31+
* Variable for the next shift block to (partially) run, if it is empty that usually means we are done.
32+
*/
33+
private val nextShift = atomic<(suspend () -> R)?>(null)
34+
35+
/**
36+
* "Callbacks"/partially evaluated shift blocks which now wait for the final result
37+
*/
38+
// TODO This can be append only, but needs fast reversed access
39+
private val shiftFnContinuations = mutableListOf<Continuation<R>>()
40+
41+
/**
42+
* Small wrapper that handles invoking the correct continuations and appending continuations from shift blocks
43+
*/
44+
data class SingleShotCont<A, R>(
45+
private val continuation: Continuation<A>,
46+
private val shiftFnContinuations: MutableList<Continuation<R>>
47+
) : DelimitedContinuation<A, R> {
48+
override suspend fun invoke(a: A): R = suspendCoroutine { resumeShift ->
49+
shiftFnContinuations.add(resumeShift)
50+
continuation.resume(a)
51+
}
52+
}
53+
54+
/**
55+
* Wrapper that handles invoking manually cps transformed continuations
56+
*/
57+
data class CPSCont<A, R>(
58+
private val runFunc: suspend DelimitedScope<R>.(A) -> R
59+
) : DelimitedContinuation<A, R> {
60+
override suspend fun invoke(a: A): R = DelimContScope<R> { runFunc(a) }.invoke()
61+
}
62+
63+
/**
64+
* Captures the continuation and set [func] with the continuation to be executed next by the runloop.
65+
*/
66+
override suspend fun <A> shift(func: suspend DelimitedScope<R>.(DelimitedContinuation<A, R>) -> R): A =
67+
suspendCoroutine { continueMain ->
68+
val delCont = SingleShotCont(continueMain, shiftFnContinuations)
69+
assert(nextShift.compareAndSet(null, suspend { this.func(delCont) }))
70+
}
71+
72+
/**
73+
* Same as [shift] except we never resume execution because we only continue in [c].
74+
*/
75+
override suspend fun <A, B> shiftCPS(func: suspend (DelimitedContinuation<A, B>) -> R, c: suspend DelimitedScope<B>.(A) -> B): Nothing =
76+
suspendCoroutine {
77+
assert(nextShift.compareAndSet(null, suspend { func(CPSCont(c)) }))
78+
}
79+
80+
/**
81+
* Unsafe if [f] calls [shift] on this scope! Use [NestedDelimContScope] instead if this is a problem.
82+
*/
83+
override suspend fun <A> reset(f: suspend DelimitedScope<A>.() -> A): A =
84+
DelimContScope(f).invoke()
85+
86+
fun invoke(): R {
87+
f.startCoroutineUninterceptedOrReturn(this, Continuation(EmptyCoroutineContext) { result ->
88+
resultVar.value = result.getOrThrow()
89+
}).let {
90+
if (it == COROUTINE_SUSPENDED) {
91+
// we have a call to shift so we must start execution the blocks there
92+
resultVar.loop { mRes ->
93+
if (mRes == null) {
94+
val nextShiftFn = nextShift.getAndSet(null)
95+
?: throw IllegalStateException("No further work to do but also no result!")
96+
nextShiftFn.startCoroutineUninterceptedOrReturn(Continuation(EmptyCoroutineContext) { result ->
97+
resultVar.value = result.getOrThrow()
98+
}).let {
99+
// If we suspended here we can just continue to loop because we should now have a new function to run
100+
// If we did not suspend we short-circuited and are thus done with looping
101+
if (it != COROUTINE_SUSPENDED) resultVar.value = it as R
102+
}
103+
// Break out of the infinite loop if we have a result
104+
} else return@let
105+
}
106+
}
107+
// we can return directly if we never suspended/called shift
108+
else return@invoke it as R
109+
}
110+
assert(resultVar.value != null)
111+
// We need to finish the partially evaluated shift blocks by passing them our result.
112+
// This will update the result via the continuations that now finish up
113+
for (c in shiftFnContinuations.asReversed()) c.resume(resultVar.value!!)
114+
// Return the final result
115+
return resultVar.value!!
116+
}
117+
118+
companion object {
119+
fun <R> reset(f: suspend DelimitedScope<R>.() -> R): R = DelimContScope(f).invoke()
120+
}
121+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package arrow.continuations.generic
2+
3+
/**
4+
* Base interface for a continuation
5+
*/
6+
interface DelimitedContinuation<A, R> {
7+
suspend operator fun invoke(a: A): R
8+
}
9+
10+
/**
11+
* Base interface for our scope.
12+
*/
13+
// TODO This should be @RestrictSuspension but that breaks because a superclass is not considered to be correct scope
14+
// @RestrictsSuspension
15+
interface DelimitedScope<R> {
16+
/**
17+
* Capture the continuation and pass it to [f].
18+
*/
19+
suspend fun <A> shift(f: suspend DelimitedScope<R>.(DelimitedContinuation<A, R>) -> R): A
20+
21+
/**
22+
* Manually cps transformed shift. This can be used to gain multishot without hacks, but it's not the nicest for a few reasons:
23+
* - It does not inherit the scope, this means it will be hard to effects offering non-det to offer the same scope again...
24+
* - it is manually cps transformed which means every helper between this and invoking the continuation also needs to be transformed.
25+
*/
26+
suspend fun <A, B> shiftCPS(f: suspend (DelimitedContinuation<A, B>) -> R, c: suspend DelimitedScope<B>.(A) -> B): Nothing
27+
28+
/**
29+
* Nest another scope inside the current one.
30+
*
31+
* It is important to use this over creating an unrelated scope because
32+
*/
33+
suspend fun <A> reset(f: suspend DelimitedScope<A>.() -> A): A
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package arrow.continuations.generic
2+
3+
import kotlinx.atomicfu.atomic
4+
import kotlinx.atomicfu.loop
5+
import kotlin.coroutines.Continuation
6+
import kotlin.coroutines.EmptyCoroutineContext
7+
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
8+
import kotlin.coroutines.intrinsics.startCoroutineUninterceptedOrReturn
9+
import kotlin.coroutines.resume
10+
import kotlin.coroutines.suspendCoroutine
11+
12+
/**
13+
* (Simulated) Multishot capable delimited control scope
14+
*
15+
* This has several drawbacks:
16+
* - f will rerun completely on multishot and only the results of [shift] are cached so any sideeffects outside of
17+
* [shift] will rerun!
18+
* - This accumulates all results of [shift] (every argument passed when invoking the continuation) so on long running computations
19+
* this may keep quite a bit of memory
20+
* - If the pure part before a multishot is expensive the multishot itself will have to rerun that, which makes it somewhat slow
21+
* - This is terribly hard to implement properly with nested scopes (which this one does not support)
22+
*
23+
* As per usual understanding of [DelimContScope] is required as I will only be commenting differences for now.
24+
*/
25+
open class MultiShotDelimContScope<R>(val f: suspend DelimitedScope<R>.() -> R) : DelimitedScope<R> {
26+
27+
private val resultVar = atomic<R?>(null)
28+
private val nextShift = atomic<(suspend () -> R)?>(null)
29+
30+
// TODO This can be append only and needs fast reversed access
31+
private val shiftFnContinuations = mutableListOf<Continuation<R>>()
32+
33+
/**
34+
* Keep the arguments passed to [DelimitedContinuation.invoke] to be able to replay the scope if necessary
35+
*/
36+
// TODO This can be append only and needs fast random access and slicing
37+
internal open val stack = mutableListOf<Any?>()
38+
39+
/**
40+
* Our continuation now includes the function [f] to rerun on multishot, the current live (single-shot) continuation,
41+
* the current stack and the offset from that stack when this is created which is used to know when to resume normal
42+
* execution again on a replay.
43+
*/
44+
class MultiShotCont<A, R>(
45+
liveContinuation: Continuation<A>,
46+
private val f: suspend DelimitedScope<R>.() -> R,
47+
private val stack: MutableList<Any?>,
48+
private val shiftFnContinuations: MutableList<Continuation<R>>
49+
) : DelimitedContinuation<A, R> {
50+
// To make sure the continuation is only invoked once we put it in a nullable atomic and only access it through getAndSet
51+
private val liveContinuation = atomic<Continuation<A>?>(liveContinuation)
52+
private val stackOffset = stack.size
53+
54+
override suspend fun invoke(a: A): R =
55+
when (val cont = liveContinuation.getAndSet(null)) {
56+
// On multishot we replay with a prefilled stack from start to the point at which this object was created
57+
// (when the shift block this runs in was first called)
58+
null -> PrefilledDelimContScope((stack.subList(0, stackOffset).toList() + a).toMutableList(), f).invoke()
59+
// on the first pass we operate like a normal delimited scope but we also save the argument to the stack before resuming
60+
else -> suspendCoroutine { resumeShift ->
61+
shiftFnContinuations.add(resumeShift)
62+
stack.add(a)
63+
cont.resume(a)
64+
}
65+
}
66+
}
67+
68+
data class CPSCont<A, R>(
69+
private val runFunc: suspend DelimitedScope<R>.(A) -> R
70+
) : DelimitedContinuation<A, R> {
71+
override suspend fun invoke(a: A): R = DelimContScope<R> { runFunc(a) }.invoke()
72+
}
73+
74+
override suspend fun <A> shift(func: suspend DelimitedScope<R>.(DelimitedContinuation<A, R>) -> R): A =
75+
suspendCoroutine { continueMain ->
76+
val c = MultiShotCont(continueMain, f, stack, shiftFnContinuations)
77+
assert(nextShift.compareAndSet(null, suspend { this.func(c) }))
78+
}
79+
80+
override suspend fun <A, B> shiftCPS(func: suspend (DelimitedContinuation<A, B>) -> R, c: suspend DelimitedScope<B>.(A) -> B): Nothing =
81+
suspendCoroutine {
82+
assert(nextShift.compareAndSet(null, suspend { func(CPSCont(c)) }))
83+
}
84+
85+
// This assumes RestrictSuspension or at least assumes the user to never reference the parent scope in f.
86+
override suspend fun <A> reset(f: suspend DelimitedScope<A>.() -> A): A =
87+
MultiShotDelimContScope(f).invoke()
88+
89+
fun invoke(): R {
90+
f.startCoroutineUninterceptedOrReturn(this, Continuation(EmptyCoroutineContext) { result ->
91+
resultVar.value = result.getOrThrow()
92+
}).let {
93+
if (it == COROUTINE_SUSPENDED) {
94+
resultVar.loop { mRes ->
95+
if (mRes == null) {
96+
val nextShiftFn = nextShift.getAndSet(null)
97+
?: throw IllegalStateException("No further work to do but also no result!")
98+
nextShiftFn.startCoroutineUninterceptedOrReturn(Continuation(EmptyCoroutineContext) { result ->
99+
resultVar.value = result.getOrThrow()
100+
}).let {
101+
if (it != COROUTINE_SUSPENDED) resultVar.value = it as R
102+
}
103+
} else return@let
104+
}
105+
} else return@invoke it as R
106+
}
107+
assert(resultVar.value != null)
108+
for (c in shiftFnContinuations.asReversed()) c.resume(resultVar.value!!)
109+
return resultVar.value!!
110+
}
111+
112+
companion object {
113+
fun <R> reset(f: suspend DelimitedScope<R>.() -> R): R = MultiShotDelimContScope(f).invoke()
114+
}
115+
}
116+
117+
class PrefilledDelimContScope<R>(
118+
override val stack: MutableList<Any?>,
119+
f: suspend DelimitedScope<R>.() -> R
120+
) : MultiShotDelimContScope<R>(f) {
121+
var depth = 0
122+
123+
// Here we first check if we still have values in our local stack and if so we use those first
124+
// if not we delegate to the normal delimited control implementation
125+
override suspend fun <A> shift(func: suspend DelimitedScope<R>.(DelimitedContinuation<A, R>) -> R): A =
126+
if (stack.size > depth) stack[depth++] as A
127+
else super.shift(func).also { depth++ }
128+
}

0 commit comments

Comments
 (0)