Skip to content

Commit 622114f

Browse files
authored
Complete test suite to check interception (#207)
* Test RaceTriple returning on correct context, and replace `suspendCoroutine` with `suspendCoroutineUninterceptedOrReturn`. * Test RacePair returning on correct context, and replace `suspendCoroutine` with `suspendCoroutineUninterceptedOrReturn`. * Test RaceN returning on correct context, and replace `suspendCoroutine` with `suspendCoroutineUninterceptedOrReturn`. * Test ParTupledN returning on correct context, and replace `suspendCoroutine` with `suspendCoroutineUninterceptedOrReturn`. * Test Fiber returning on correct context. * Add checkAll with Arb * Complete test suite timer.kt * Add missing tests failures * ktlintFormat
1 parent d452456 commit 622114f

File tree

14 files changed

+603
-145
lines changed

14 files changed

+603
-145
lines changed

arrow-libs/fx/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/ParTupledN.kt

+11-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import kotlin.coroutines.Continuation
55
import kotlin.coroutines.CoroutineContext
66
import kotlin.coroutines.EmptyCoroutineContext
77
import kotlin.coroutines.startCoroutine
8-
import kotlin.coroutines.suspendCoroutine
8+
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
9+
import kotlin.coroutines.intrinsics.intercepted
10+
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
911

1012
/**
1113
* Parallel maps [fa], [fb] in parallel on [ComputationPool].
@@ -100,8 +102,9 @@ suspend fun <A, B> parTupledN(
100102
fa: suspend () -> A,
101103
fb: suspend () -> B
102104
): Pair<A, B> =
103-
suspendCoroutine { cont ->
105+
suspendCoroutineUninterceptedOrReturn { cont ->
104106
val conn = cont.context.connection()
107+
val cont = cont.intercepted()
105108
val cb = cont::resumeWith
106109

107110
// Used to store Throwable, Either<A, B> or empty (null). (No sealed class used for a slightly better performing ParMap2)
@@ -156,6 +159,8 @@ suspend fun <A, B> parTupledN(
156159
sendException(connA, e)
157160
})
158161
})
162+
163+
COROUTINE_SUSPENDED
159164
}
160165

161166
/**
@@ -173,8 +178,9 @@ suspend fun <A, B, C> parTupledN(
173178
fb: suspend () -> B,
174179
fc: suspend () -> C
175180
): Triple<A, B, C> =
176-
suspendCoroutine { cont ->
181+
suspendCoroutineUninterceptedOrReturn { cont ->
177182
val conn = cont.context.connection()
183+
val cont = cont.intercepted()
178184
val cb = cont::resumeWith
179185

180186
val state: AtomicRefW<Triple<A?, B?, C?>?> = AtomicRefW(null)
@@ -252,4 +258,6 @@ suspend fun <A, B, C> parTupledN(
252258
sendException(connA, connC, e)
253259
})
254260
})
261+
262+
COROUTINE_SUSPENDED
255263
}

arrow-libs/fx/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Race2.kt

+8-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import kotlin.coroutines.Continuation
55
import kotlin.coroutines.CoroutineContext
66
import kotlin.coroutines.EmptyCoroutineContext
77
import kotlin.coroutines.startCoroutine
8-
import kotlin.coroutines.suspendCoroutine
8+
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
9+
import kotlin.coroutines.intrinsics.intercepted
10+
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
911

1012
/**
1113
* Races the participants [fa], [fb] in parallel on the [ComputationPool].
@@ -60,8 +62,10 @@ suspend fun <A, B> raceN(ctx: CoroutineContext, fa: suspend () -> A, fb: suspend
6062
})
6163
} else Unit
6264

63-
return suspendCoroutine { cont ->
65+
return suspendCoroutineUninterceptedOrReturn { cont ->
6466
val conn = cont.context.connection()
67+
val cont = cont.intercepted()
68+
6569
val active = AtomicBooleanW(true)
6670
val connA = SuspendConnection()
6771
val connB = SuspendConnection()
@@ -82,5 +86,7 @@ suspend fun <A, B> raceN(ctx: CoroutineContext, fa: suspend () -> A, fb: suspend
8286
onError(active, cont::resumeWith, conn, connA, it)
8387
})
8488
})
89+
90+
COROUTINE_SUSPENDED
8591
}
8692
}

arrow-libs/fx/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/Race3.kt

+7-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import kotlin.coroutines.Continuation
44
import kotlin.coroutines.CoroutineContext
55
import kotlin.coroutines.EmptyCoroutineContext
66
import kotlin.coroutines.startCoroutine
7-
import kotlin.coroutines.suspendCoroutine
7+
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
8+
import kotlin.coroutines.intrinsics.intercepted
9+
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
810

911
sealed class Race3<out A, out B, out C> {
1012
data class First<A>(val winner: A) : Race3<A, Nothing, Nothing>()
@@ -103,8 +105,9 @@ suspend fun <A, B, C> raceN(
103105
})
104106
} else Unit
105107

106-
return suspendCoroutine { cont ->
108+
return suspendCoroutineUninterceptedOrReturn { cont ->
107109
val conn = cont.context.connection()
110+
val cont = cont.intercepted()
108111

109112
val active = AtomicBooleanW(true)
110113
val connA = SuspendConnection()
@@ -136,5 +139,7 @@ suspend fun <A, B, C> raceN(
136139
onError(active, cont::resumeWith, conn, connA, connB, it)
137140
})
138141
})
142+
143+
COROUTINE_SUSPENDED
139144
}
140145
}

arrow-libs/fx/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/RacePair.kt

+55-46
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,15 @@ import kotlin.coroutines.Continuation
55
import kotlin.coroutines.CoroutineContext
66
import kotlin.coroutines.EmptyCoroutineContext
77
import kotlin.coroutines.startCoroutine
8-
import kotlin.coroutines.suspendCoroutine
8+
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
9+
import kotlin.coroutines.intrinsics.intercepted
10+
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
911

1012
typealias RacePair<A, B> = Either<Pair<A, Fiber<B>>, Pair<Fiber<A>, B>>
1113

14+
fun <A, B, C> Either<Pair<A, Fiber<B>>, Pair<Fiber<A>, B>>.fold(ifLeft: (A, Fiber<B>) -> C, ifRight: (Fiber<A>, B) -> C): C =
15+
fold({ (a, b) -> ifLeft(a, b) }, { (a, b) -> ifRight(a, b) })
16+
1217
suspend fun <A, B> racePair(fa: suspend () -> A, fb: suspend () -> B): RacePair<A, B> =
1318
racePair(ComputationPool, fa, fb)
1419

@@ -44,57 +49,61 @@ suspend fun <A, B> racePair(
4449
ctx: CoroutineContext,
4550
fa: suspend () -> A,
4651
fb: suspend () -> B
47-
): RacePair<A, B> = suspendCoroutine { cont ->
48-
val conn = cont.context.connection()
49-
val active = AtomicBooleanW(true)
52+
): RacePair<A, B> =
53+
suspendCoroutineUninterceptedOrReturn { cont ->
54+
val conn = cont.context.connection()
55+
val cont = cont.intercepted()
56+
val active = AtomicBooleanW(true)
5057

51-
// Cancellable connection for the left value
52-
val connA = SuspendConnection()
53-
val promiseA = UnsafePromise<A>()
58+
// Cancellable connection for the left value
59+
val connA = SuspendConnection()
60+
val promiseA = UnsafePromise<A>()
5461

55-
// Cancellable connection for the right value
56-
val connB = SuspendConnection()
57-
val promiseB = UnsafePromise<B>()
62+
// Cancellable connection for the right value
63+
val connB = SuspendConnection()
64+
val promiseB = UnsafePromise<B>()
5865

59-
conn.pushPair(connA, connB)
66+
conn.pushPair(connA, connB)
6067

61-
fa.startCoroutineCancellable(CancellableContinuation(ctx, connA) { result ->
62-
result.fold({ a ->
63-
if (active.getAndSet(false)) {
64-
conn.pop()
65-
cont.resumeWith(Result.success(Either.Left(Pair(a, Fiber(promiseB, connB)))))
66-
} else {
67-
promiseA.complete(Result.success(a))
68-
}
69-
}, { error ->
70-
if (active.getAndSet(false)) { // if an error finishes first, stop the race.
71-
connB.cancelToken().cancel.startCoroutine(Continuation(EmptyCoroutineContext) { r2 ->
68+
fa.startCoroutineCancellable(CancellableContinuation(ctx, connA) { result ->
69+
result.fold({ a ->
70+
if (active.getAndSet(false)) {
7271
conn.pop()
73-
cont.resumeWith(Result.failure(r2.fold({ error }, { Platform.composeErrors(error, it) })))
74-
})
75-
} else {
76-
promiseA.complete(Result.failure(error))
77-
}
72+
cont.resumeWith(Result.success(Either.Left(Pair(a, Fiber(promiseB, connB)))))
73+
} else {
74+
promiseA.complete(Result.success(a))
75+
}
76+
}, { error ->
77+
if (active.getAndSet(false)) { // if an error finishes first, stop the race.
78+
connB.cancelToken().cancel.startCoroutine(Continuation(EmptyCoroutineContext) { r2 ->
79+
conn.pop()
80+
cont.resumeWith(Result.failure(r2.fold({ error }, { Platform.composeErrors(error, it) })))
81+
})
82+
} else {
83+
promiseA.complete(Result.failure(error))
84+
}
85+
})
7886
})
79-
})
8087

81-
fb.startCoroutineCancellable(CancellableContinuation(ctx, connB) { result ->
82-
result.fold({ b ->
83-
if (active.getAndSet(false)) {
84-
conn.pop()
85-
cont.resumeWith(Result.success(Either.Right(Pair(Fiber(promiseA, connA), b))))
86-
} else {
87-
promiseB.complete(Result.success(b))
88-
}
89-
}, { error ->
90-
if (active.getAndSet(false)) { // if an error finishes first, stop the race.
91-
connA.cancelToken().cancel.startCoroutine(Continuation(EmptyCoroutineContext) { r2 ->
88+
fb.startCoroutineCancellable(CancellableContinuation(ctx, connB) { result ->
89+
result.fold({ b ->
90+
if (active.getAndSet(false)) {
9291
conn.pop()
93-
cont.resumeWith(Result.failure(r2.fold({ error }, { Platform.composeErrors(error, it) })))
94-
})
95-
} else {
96-
promiseB.complete(Result.failure(error))
97-
}
92+
cont.resumeWith(Result.success(Either.Right(Pair(Fiber(promiseA, connA), b))))
93+
} else {
94+
promiseB.complete(Result.success(b))
95+
}
96+
}, { error ->
97+
if (active.getAndSet(false)) { // if an error finishes first, stop the race.
98+
connA.cancelToken().cancel.startCoroutine(Continuation(EmptyCoroutineContext) { r2 ->
99+
conn.pop()
100+
cont.resumeWith(Result.failure(r2.fold({ error }, { Platform.composeErrors(error, it) })))
101+
})
102+
} else {
103+
promiseB.complete(Result.failure(error))
104+
}
105+
})
98106
})
99-
})
100-
}
107+
108+
COROUTINE_SUSPENDED
109+
}

arrow-libs/fx/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/RaceTriple.kt

+78-72
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import kotlin.coroutines.Continuation
44
import kotlin.coroutines.CoroutineContext
55
import kotlin.coroutines.EmptyCoroutineContext
66
import kotlin.coroutines.startCoroutine
7-
import kotlin.coroutines.suspendCoroutine
7+
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
8+
import kotlin.coroutines.intrinsics.intercepted
9+
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
810

911
sealed class RaceTriple<A, B, C> {
1012
data class First<A, B, C>(val winner: A, val fiberB: Fiber<B>, val fiberC: Fiber<C>) : RaceTriple<A, B, C>()
@@ -60,79 +62,83 @@ suspend fun <A, B, C> raceTriple(
6062
fa: suspend () -> A,
6163
fb: suspend () -> B,
6264
fc: suspend () -> C
63-
): RaceTriple<A, B, C> = suspendCoroutine { cont ->
64-
val conn = cont.context.connection()
65-
val active = AtomicBooleanW(true)
66-
67-
// Cancellable connection for the left value
68-
val connA = SuspendConnection()
69-
val promiseA = UnsafePromise<A>()
70-
71-
// Cancellable connection for the right value
72-
val connB = SuspendConnection()
73-
val promiseB = UnsafePromise<B>()
74-
75-
// Cancellable connection for the right value
76-
val connC = SuspendConnection()
77-
val promiseC = UnsafePromise<C>()
78-
79-
conn.push(listOf(connA.cancelToken(), connB.cancelToken(), connC.cancelToken()))
80-
81-
fun <A> onError(
82-
error: Throwable,
83-
connB: SuspendConnection,
84-
connC: SuspendConnection,
85-
promise: UnsafePromise<A>
86-
): Unit {
87-
if (active.getAndSet(false)) { // if an error finishes first, stop the race.
88-
connB.cancelToken().cancel.startCoroutine(Continuation(EmptyCoroutineContext) { r2 ->
89-
connC.cancelToken().cancel.startCoroutine(Continuation(EmptyCoroutineContext) { r3 ->
90-
conn.pop()
65+
): RaceTriple<A, B, C> =
66+
suspendCoroutineUninterceptedOrReturn { cont ->
67+
val conn = cont.context.connection()
68+
val cont = cont.intercepted()
69+
val active = AtomicBooleanW(true)
9170

92-
val errorResult = r2.fold({
93-
r3.fold({ error }, { e3 -> Platform.composeErrors(error, e3) })
94-
}, { e2 ->
95-
r3.fold({ Platform.composeErrors(error, e2) }, { e3 -> Platform.composeErrors(error, e2, e3) })
96-
})
71+
// Cancellable connection for the left value
72+
val connA = SuspendConnection()
73+
val promiseA = UnsafePromise<A>()
9774

98-
cont.resumeWith(Result.failure(errorResult))
99-
})
100-
})
101-
} else {
102-
promise.complete(Result.failure(error))
103-
}
104-
}
75+
// Cancellable connection for the right value
76+
val connB = SuspendConnection()
77+
val promiseB = UnsafePromise<B>()
10578

106-
fa.startCoroutineCancellable(CancellableContinuation(ctx, connA) { result ->
107-
result.fold({ a ->
108-
if (active.getAndSet(false)) {
109-
conn.pop()
110-
cont.resumeWith(Result.success(RaceTriple.First(a, Fiber(promiseB, connB), Fiber(promiseC, connC))))
111-
} else {
112-
promiseA.complete(Result.success(a))
113-
}
114-
}, { error -> onError(error, connB, connC, promiseA) })
115-
})
116-
117-
fb.startCoroutineCancellable(CancellableContinuation(ctx, connB) { result ->
118-
result.fold({ b ->
119-
if (active.getAndSet(false)) {
120-
conn.pop()
121-
cont.resumeWith(Result.success(RaceTriple.Second(Fiber(promiseA, connA), b, Fiber(promiseC, connC))))
122-
} else {
123-
promiseB.complete(Result.success(b))
124-
}
125-
}, { error -> onError(error, connA, connC, promiseB) })
126-
})
127-
128-
fc.startCoroutineCancellable(CancellableContinuation(ctx, connC) { result ->
129-
result.fold({ c ->
130-
if (active.getAndSet(false)) {
131-
conn.pop()
132-
cont.resumeWith(Result.success(RaceTriple.Third(Fiber(promiseA, connA), Fiber(promiseB, connB), c)))
79+
// Cancellable connection for the right value
80+
val connC = SuspendConnection()
81+
val promiseC = UnsafePromise<C>()
82+
83+
conn.push(listOf(connA.cancelToken(), connB.cancelToken(), connC.cancelToken()))
84+
85+
fun <A> onError(
86+
error: Throwable,
87+
connB: SuspendConnection,
88+
connC: SuspendConnection,
89+
promise: UnsafePromise<A>
90+
): Unit {
91+
if (active.getAndSet(false)) { // if an error finishes first, stop the race.
92+
connB.cancelToken().cancel.startCoroutine(Continuation(EmptyCoroutineContext) { r2 ->
93+
connC.cancelToken().cancel.startCoroutine(Continuation(EmptyCoroutineContext) { r3 ->
94+
conn.pop()
95+
96+
val errorResult = r2.fold({
97+
r3.fold({ error }, { e3 -> Platform.composeErrors(error, e3) })
98+
}, { e2 ->
99+
r3.fold({ Platform.composeErrors(error, e2) }, { e3 -> Platform.composeErrors(error, e2, e3) })
100+
})
101+
102+
cont.resumeWith(Result.failure(errorResult))
103+
})
104+
})
133105
} else {
134-
promiseC.complete(Result.success(c))
106+
promise.complete(Result.failure(error))
135107
}
136-
}, { error -> onError(error, connA, connB, promiseC) })
137-
})
138-
}
108+
}
109+
110+
fa.startCoroutineCancellable(CancellableContinuation(ctx, connA) { result ->
111+
result.fold({ a ->
112+
if (active.getAndSet(false)) {
113+
conn.pop()
114+
cont.resumeWith(Result.success(RaceTriple.First(a, Fiber(promiseB, connB), Fiber(promiseC, connC))))
115+
} else {
116+
promiseA.complete(Result.success(a))
117+
}
118+
}, { error -> onError(error, connB, connC, promiseA) })
119+
})
120+
121+
fb.startCoroutineCancellable(CancellableContinuation(ctx, connB) { result ->
122+
result.fold({ b ->
123+
if (active.getAndSet(false)) {
124+
conn.pop()
125+
cont.resumeWith(Result.success(RaceTriple.Second(Fiber(promiseA, connA), b, Fiber(promiseC, connC))))
126+
} else {
127+
promiseB.complete(Result.success(b))
128+
}
129+
}, { error -> onError(error, connA, connC, promiseB) })
130+
})
131+
132+
fc.startCoroutineCancellable(CancellableContinuation(ctx, connC) { result ->
133+
result.fold({ c ->
134+
if (active.getAndSet(false)) {
135+
conn.pop()
136+
cont.resumeWith(Result.success(RaceTriple.Third(Fiber(promiseA, connA), Fiber(promiseB, connB), c)))
137+
} else {
138+
promiseC.complete(Result.success(c))
139+
}
140+
}, { error -> onError(error, connA, connB, promiseC) })
141+
})
142+
143+
COROUTINE_SUSPENDED
144+
}

0 commit comments

Comments
 (0)