Skip to content

Commit 18bd4b0

Browse files
Revert parTupledN and parMapN derivation (#249)
* Revert parTupledN and parMapN derivation * Trigger CI * Fix parMapN docs Co-authored-by: danieh <[email protected]>
1 parent e0ff421 commit 18bd4b0

File tree

4 files changed

+64
-76
lines changed

4 files changed

+64
-76
lines changed

arrow-libs/fx/arrow-docs/docs/fx/async/README.md

+1-3
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,7 @@ data class ThreadInfo(
6666
6767
suspend fun main(): Unit {
6868
val (threadA: String, threadB: String) =
69-
parMapN(::threadName, ::threadName) { (a, b) ->
70-
ThreadInfo(a, b)
71-
}
69+
parMapN(::threadName, ::threadName, ::ThreadInfo)
7270
7371
println(threadA)
7472
println(threadB)

arrow-libs/fx/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/ParTraverse.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,6 @@ suspend fun <A, B> Iterable<A>.parTraverse(ctx: CoroutineContext, f: suspend (A)
110110
if (ctx === EmptyCoroutineContext || ctx[ContinuationInterceptor] == null) map { a -> f(a) }
111111
else toList().foldRight(suspend { emptyList<B>() }) { a, acc ->
112112
suspend {
113-
parMapN(ctx, { f(a) }, { acc.invoke() }) { (a, b) -> listOf(a) + b }
113+
parMapN(ctx, { f(a) }, { acc.invoke() }) { a, b -> listOf(a) + b }
114114
}
115115
}.invoke()

arrow-libs/fx/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/ParTupledN.kt

+61-71
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,48 @@ import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
99
import kotlin.coroutines.intrinsics.intercepted
1010
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
1111

12+
/**
13+
* Tuples [fa], [fb] in parallel on [ComputationPool].
14+
* Cancelling this operation cancels both operations running in parallel.
15+
*
16+
* @see parTupledN for the same function that can race on any [CoroutineContext].
17+
*/
18+
suspend fun <A, B> parTupledN(fa: suspend () -> A, fb: suspend () -> B): Pair<A, B> =
19+
parTupledN(ComputationPool, fa, fb)
20+
21+
/**
22+
* Tuples [fa], [fb], [fc] in parallel on [ComputationPool].
23+
* Cancelling this operation cancels both tasks running in parallel.
24+
*
25+
* @see parTupledN for the same function that can race on any [CoroutineContext].
26+
*/
27+
suspend fun <A, B, C> parTupledN(fa: suspend () -> A, fb: suspend () -> B, fc: suspend () -> C): Triple<A, B, C> =
28+
parTupledN(ComputationPool, fa, fb, fc)
29+
30+
/**
31+
* Tuples [fa], [fb] on the provided [CoroutineContext].
32+
* Cancelling this operation cancels both tasks running in parallel.
33+
*
34+
* **WARNING** it runs in parallel depending on the capabilities of the provided [CoroutineContext].
35+
* We ensure they start in sequence so it's guaranteed to finish on a single threaded context.
36+
*
37+
* @see parTupledN for a function that ensures it runs in parallel on the [ComputationPool].
38+
*/
39+
suspend fun <A, B> parTupledN(ctx: CoroutineContext, fa: suspend () -> A, fb: suspend () -> B): Pair<A, B> =
40+
parMapN(ctx, fa, fb, ::Pair)
41+
42+
/**
43+
* Tuples [fa], [fb] & [fc] on the provided [CoroutineContext].
44+
* Cancelling this operation cancels both tasks running in parallel.
45+
*
46+
* **WARNING** it runs in parallel depending on the capabilities of the provided [CoroutineContext].
47+
* We ensure they start in sequence so it's guaranteed to finish on a single threaded context.
48+
*
49+
* @see parTupledN for a function that ensures it runs in parallel on the [ComputationPool].
50+
*/
51+
suspend fun <A, B, C> parTupledN(ctx: CoroutineContext, fa: suspend () -> A, fb: suspend () -> B, fc: suspend () -> C): Triple<A, B, C> =
52+
parMapN(ctx, fa, fb, fc, ::Triple)
53+
1254
/**
1355
* Parallel maps [fa], [fb] in parallel on [ComputationPool].
1456
* Cancelling this operation cancels both operations running in parallel.
@@ -21,7 +63,7 @@ import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
2163
* val result = parMapN(
2264
* { "First one is on ${Thread.currentThread().name}" },
2365
* { "Second one is on ${Thread.currentThread().name}" }
24-
* ) { (a, b) ->
66+
* ) { a, b ->
2567
* "$a\n$b"
2668
* }
2769
* //sampleEnd
@@ -36,8 +78,8 @@ import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
3678
*
3779
* @see parMapN for the same function that can race on any [CoroutineContext].
3880
*/
39-
suspend fun <A, B, C> parMapN(fa: suspend () -> A, fb: suspend () -> B, f: (Pair<A, B>) -> C): C =
40-
f(parTupledN(ComputationPool, fa, fb))
81+
suspend fun <A, B, C> parMapN(fa: suspend () -> A, fb: suspend () -> B, f: (A, B) -> C): C =
82+
parMapN(ComputationPool, fa, fb, f)
4183

4284
/**
4385
* Parallel maps [fa], [fb], [fc] in parallel on [ComputationPool].
@@ -49,9 +91,8 @@ suspend fun <A, B, C, D> parMapN(
4991
fa: suspend () -> A,
5092
fb: suspend () -> B,
5193
fc: suspend () -> C,
52-
f: (Triple<A, B, C>) -> D
53-
): D =
54-
f(parTupledN(ComputationPool, fa, fb, fc))
94+
f: (A, B, C) -> D
95+
): D = parMapN(ComputationPool, fa, fb, fc, f)
5596

5697
/**
5798
* Parallel maps [fa], [fb] on the provided [CoroutineContext].
@@ -63,66 +104,13 @@ suspend fun <A, B, C, D> parMapN(
63104
*
64105
* @see parMapN for a function that ensures it runs in parallel on the [ComputationPool].
65106
*/
107+
@Suppress("UNCHECKED_CAST")
66108
suspend fun <A, B, C> parMapN(
67109
ctx: CoroutineContext,
68110
fa: suspend () -> A,
69111
fb: suspend () -> B,
70-
f: (Pair<A, B>) -> C
112+
f: (A, B) -> C
71113
): C =
72-
f(parTupledN(ctx, fa, fb))
73-
74-
/**
75-
* Parallel maps [fa], [fb], [fc] on the provided [CoroutineContext].
76-
* Cancelling this operation cancels both tasks running in parallel.
77-
*
78-
* **WARNING** this function forks [fa], [fb] & [fc] but if it runs in parallel depends
79-
* on the capabilities of the provided [CoroutineContext].
80-
* We ensure they start in sequence so it's guaranteed to finish on a single threaded context.
81-
*
82-
* @see parMapN for a function that ensures it runs in parallel on the [ComputationPool].
83-
*/
84-
suspend fun <A, B, C, D> parMapN(
85-
ctx: CoroutineContext,
86-
fa: suspend () -> A,
87-
fb: suspend () -> B,
88-
fc: suspend () -> C,
89-
f: (Triple<A, B, C>) -> D
90-
): D =
91-
f(parTupledN(ctx, fa, fb, fc))
92-
93-
/**
94-
* Tuples [fa], [fb] in parallel on [ComputationPool].
95-
* Cancelling this operation cancels both operations running in parallel.
96-
*
97-
* @see parTupledN for the same function that can race on any [CoroutineContext].
98-
*/
99-
suspend fun <A, B> parTupledN(fa: suspend () -> A, fb: suspend () -> B): Pair<A, B> =
100-
parTupledN(ComputationPool, fa, fb)
101-
102-
/**
103-
* Tuples [fa], [fb], [fc] in parallel on [ComputationPool].
104-
* Cancelling this operation cancels both tasks running in parallel.
105-
*
106-
* @see parTupledN for the same function that can race on any [CoroutineContext].
107-
*/
108-
suspend fun <A, B, C> parTupledN(fa: suspend () -> A, fb: suspend () -> B, fc: suspend () -> C): Triple<A, B, C> =
109-
parTupledN(ComputationPool, fa, fb, fc)
110-
111-
/**
112-
* Tuples [fa], [fb] on the provided [CoroutineContext].
113-
* Cancelling this operation cancels both tasks running in parallel.
114-
*
115-
* **WARNING** it runs in parallel depending on the capabilities of the provided [CoroutineContext].
116-
* We ensure they start in sequence so it's guaranteed to finish on a single threaded context.
117-
*
118-
* @see parTupledN for a function that ensures it runs in parallel on the [ComputationPool].
119-
*/
120-
@Suppress("UNCHECKED_CAST")
121-
suspend fun <A, B> parTupledN(
122-
ctx: CoroutineContext,
123-
fa: suspend () -> A,
124-
fb: suspend () -> B
125-
): Pair<A, B> =
126114
suspendCoroutineUninterceptedOrReturn { cont ->
127115
val conn = cont.context.connection()
128116
val cont = cont.intercepted()
@@ -140,9 +128,9 @@ suspend fun <A, B> parTupledN(
140128
conn.pop()
141129
cb(
142130
try {
143-
Result.success(Pair(a, b))
131+
Result.success(f(a, b))
144132
} catch (e: Throwable) {
145-
Result.failure<Pair<A, B>>(e.nonFatalOrThrow())
133+
Result.failure<C>(e.nonFatalOrThrow())
146134
}
147135
)
148136
}
@@ -151,7 +139,7 @@ suspend fun <A, B> parTupledN(
151139
is Throwable -> Unit // Do nothing we already finished
152140
else -> other.cancelToken().cancel.startCoroutine(Continuation(EmptyCoroutineContext) { r ->
153141
conn.pop()
154-
cb(Result.failure<Pair<A, B>>(r.fold({ e }, { e2 -> Platform.composeErrors(e, e2) })))
142+
cb(Result.failure(r.fold({ e }, { e2 -> Platform.composeErrors(e, e2) })))
155143
})
156144
}
157145

@@ -185,20 +173,22 @@ suspend fun <A, B> parTupledN(
185173
}
186174

187175
/**
188-
* Tuples [fa], [fb] & [fc] on the provided [CoroutineContext].
176+
* Parallel maps [fa], [fb], [fc] on the provided [CoroutineContext].
189177
* Cancelling this operation cancels both tasks running in parallel.
190178
*
191-
* **WARNING** it runs in parallel depending on the capabilities of the provided [CoroutineContext].
179+
* **WARNING** this function forks [fa], [fb] & [fc] but if it runs in parallel depends
180+
* on the capabilities of the provided [CoroutineContext].
192181
* We ensure they start in sequence so it's guaranteed to finish on a single threaded context.
193182
*
194-
* @see parTupledN for a function that ensures it runs in parallel on the [ComputationPool].
183+
* @see parMapN for a function that ensures it runs in parallel on the [ComputationPool].
195184
*/
196-
suspend fun <A, B, C> parTupledN(
185+
suspend fun <A, B, C, D> parMapN(
197186
ctx: CoroutineContext,
198187
fa: suspend () -> A,
199188
fb: suspend () -> B,
200-
fc: suspend () -> C
201-
): Triple<A, B, C> =
189+
fc: suspend () -> C,
190+
f: (A, B, C) -> D
191+
): D =
202192
suspendCoroutineUninterceptedOrReturn { cont ->
203193
val conn = cont.context.connection()
204194
val cont = cont.intercepted()
@@ -217,7 +207,7 @@ suspend fun <A, B, C> parTupledN(
217207

218208
fun complete(a: A, b: B, c: C) {
219209
conn.pop()
220-
cb(Result.success(Triple(a, b, c)))
210+
cb(Result.success(f(a, b, c)))
221211
}
222212

223213
fun tryComplete(result: Triple<A?, B?, C?>?): Unit {

arrow-libs/fx/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/InterruptionTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class InterruptionTest : StreamSpec(spec = {
3838

3939
latch.get()
4040
f.cancel()
41-
timeOutOrNull(50.milliseconds) { exit.get() } shouldBe ExitCase.Cancelled
41+
exit.get() shouldBe ExitCase.Cancelled
4242
}
4343
}
4444

0 commit comments

Comments
 (0)