Skip to content

Commit 8e35d6d

Browse files
authored
Merge pull request #234 from kategory/paco-betterjobs
Improve Jobs
2 parents f2fe59d + a399f56 commit 8e35d6d

File tree

11 files changed

+217
-196
lines changed

11 files changed

+217
-196
lines changed

kategory-effects-test/src/main/kotlin/kategory/laws/AsyncLaws.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ import io.kotlintest.properties.Gen
44
import io.kotlintest.properties.forAll
55

66
object AsyncLaws {
7-
inline fun <reified F> laws(AC: AsyncContext<F> = asyncContext(), M: MonadError<F, Throwable> = monadError<F, Throwable>(), EQ: Eq<HK<F, Int>>, EQER: Eq<HK<F, Int>> = EQ): List<Law> =
8-
MonadErrorLaws.laws(M, EQ) + listOf(
7+
inline fun <reified F> laws(AC: AsyncContext<F> = asyncContext(), M: MonadError<F, Throwable> = monadError<F, Throwable>(), EQ: Eq<HK<F, Int>>, EQERR: Eq<HK<F, Int>> = EQ): List<Law> =
8+
MonadErrorLaws.laws(M, EQERR, EQ) + listOf(
99
Law("Async Laws: success equivalence", { asyncSuccess(AC, M, EQ) }),
10-
Law("Async Laws: error equivalence", { asyncError(AC, M, EQER) }),
10+
Law("Async Laws: error equivalence", { asyncError(AC, M, EQERR) }),
1111
Law("Async bind: binding blocks", { asyncBind(AC, M, EQ) }),
12-
Law("Async bind: binding failure", { asyncBindError(AC, M, EQER) }),
12+
Law("Async bind: binding failure", { asyncBindError(AC, M, EQERR) }),
1313
Law("Async bind: unsafe binding", { asyncBindUnsafe(AC, M, EQ) }),
14-
Law("Async bind: unsafe binding failure", { asyncBindUnsafeError(AC, M, EQER) }),
14+
Law("Async bind: unsafe binding failure", { asyncBindUnsafeError(AC, M, EQERR) }),
1515
Law("Async bind: binding in parallel", { asyncParallelBind(AC, M, EQ) })
1616
)
1717

kategory-effects/src/main/kotlin/kategory/effects/data/IO.kt

+7-7
Original file line numberDiff line numberDiff line change
@@ -58,24 +58,24 @@ import kategory.effects.data.internal.error
5858

5959
internal fun <A> attemptValue(): AndThen<A, IO<Either<Throwable, A>>> = AndThen({ a: A -> Pure(Either.Right(a)) }, { e -> Pure(Either.Left(e)) })
6060

61-
operator fun <A> invoke(f: (Unit) -> A): IO<A> = suspend { Pure(f(Unit)) }
61+
operator fun <A> invoke(f: () -> A): IO<A> = suspend { Pure(f()) }
6262

63-
fun <A> suspend(f: (Unit) -> IO<A>): IO<A> =
63+
fun <A> suspend(f: () -> IO<A>): IO<A> =
6464
Suspend(AndThen { _ ->
6565
try {
66-
f(Unit)
66+
f()
6767
} catch (throwable: Throwable) {
6868
raiseError(throwable)
6969
}
7070
})
7171

7272
fun <A> async(k: ((Either<Throwable, A>) -> Unit) -> Unit): IO<A> =
73-
Async { callBack ->
74-
onceOnly(callBack).let { ff: (Either<Throwable, A>) -> Unit ->
73+
Async { ff: (Either<Throwable, A>) -> Unit ->
74+
onceOnly(ff).let { callback: (Either<Throwable, A>) -> Unit ->
7575
try {
76-
k(ff)
76+
k(callback)
7777
} catch (throwable: Throwable) {
78-
ff(Either.Left(throwable))
78+
callback(Either.Left(throwable))
7979
}
8080
}
8181
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package kategory
2+
3+
import kategory.effects.data.internal.Platform.onceOnly
4+
import kotlinx.coroutines.experimental.CoroutineScope
5+
import kotlinx.coroutines.experimental.CoroutineStart
6+
import kotlinx.coroutines.experimental.Job
7+
import kotlinx.coroutines.experimental.launch
8+
import java.util.concurrent.atomic.AtomicReference
9+
import kotlin.coroutines.experimental.CoroutineContext
10+
11+
@higherkind data class JobKW<out A>(val thunk: ((Either<Throwable, A>) -> Unit) -> Job) : JobKWKind<A> {
12+
13+
fun <B> map(f: (A) -> B): JobKW<B> =
14+
JobKW { ff: (Either<Throwable, B>) -> Unit ->
15+
thunk { either: Either<Throwable, A> ->
16+
ff(either.map(f))
17+
}
18+
}
19+
20+
fun <B> flatMap(coroutineContext: CoroutineContext, f: (A) -> JobKW<B>): JobKW<B> =
21+
JobKW { ff: (Either<Throwable, B>) -> Unit ->
22+
val result = AtomicReference<Either<Throwable, A>>()
23+
thunk(result::set).apply {
24+
invokeOnCompletion { t: Throwable? ->
25+
val state: Either<Throwable, A>? = result.get()
26+
when {
27+
t == null && state != null -> state.fold({ ff(it.left()) }, { f(it).thunk(ff) })
28+
t != null -> JobKW.raiseError<B>(coroutineContext, t)
29+
else -> throw IllegalStateException("JobKW flatMap completed without success or error")
30+
}
31+
}
32+
}
33+
}
34+
35+
companion object {
36+
inline operator fun <A> invoke(coroutineContext: CoroutineContext, noinline f: suspend CoroutineScope.() -> A): JobKW<A> =
37+
JobKW {
38+
onceOnly(it).let { callback: (Either<Throwable, A>) -> Unit ->
39+
launch(coroutineContext, CoroutineStart.DEFAULT) {
40+
callback(
41+
try {
42+
f().right()
43+
} catch (err: Throwable) {
44+
err.left()
45+
}
46+
)
47+
}
48+
}
49+
}
50+
51+
inline fun <A> unsafe(coroutineContext: CoroutineContext, noinline f: suspend CoroutineScope.() -> Either<Throwable, A>): JobKW<A> =
52+
JobKW {
53+
onceOnly(it).let { callback: (Either<Throwable, A>) -> Unit ->
54+
launch(coroutineContext, CoroutineStart.DEFAULT) {
55+
callback(f())
56+
}
57+
}
58+
}
59+
60+
inline fun <A> async(coroutineContext: CoroutineContext, crossinline fa: Proc<A>): JobKW<A> =
61+
JobKW {
62+
onceOnly(it).let { callback: (Either<Throwable, A>) -> Unit ->
63+
launch(coroutineContext, CoroutineStart.DEFAULT) {
64+
fa(callback)
65+
}
66+
}
67+
}
68+
69+
inline fun <A> pure(coroutineContext: CoroutineContext, a: A): JobKW<A> =
70+
JobKW(coroutineContext) { a }
71+
72+
inline fun <A> raiseError(coroutineContext: CoroutineContext, t: Throwable): JobKW<A> =
73+
JobKW.unsafe(coroutineContext) { t.left() }
74+
75+
fun <A, B> tailRecM(coroutineContext: CoroutineContext, a: A, f: (A) -> JobKW<Either<A, B>>): JobKW<B> =
76+
JobKW.async(coroutineContext) { ff: (Either<Throwable, B>) -> Unit ->
77+
f(a).runJob { either: Either<Throwable, Either<A, B>> ->
78+
either.fold({ ff(it.left()) }, {
79+
when (it) {
80+
is Either.Right -> ff(it.b.right())
81+
is Either.Left -> tailRecM(coroutineContext, a, f)
82+
}
83+
})
84+
}
85+
}
86+
87+
inline fun instances(coroutineContext: CoroutineContext): JobKWInstances =
88+
object : JobKWInstances {
89+
override fun CC(): CoroutineContext = coroutineContext
90+
}
91+
92+
inline fun functor(coroutineContext: CoroutineContext): Functor<JobKWHK> = instances(coroutineContext)
93+
94+
inline fun applicative(coroutineContext: CoroutineContext): Applicative<JobKWHK> = instances(coroutineContext)
95+
96+
inline fun monad(coroutineContext: CoroutineContext): Monad<JobKWHK> = instances(coroutineContext)
97+
98+
inline fun monadError(coroutineContext: CoroutineContext): MonadError<JobKWHK, Throwable> = instances(coroutineContext)
99+
100+
inline fun asyncContext(coroutineContext: CoroutineContext): AsyncContext<JobKWHK> = instances(coroutineContext)
101+
}
102+
}
103+
104+
fun <A> JobKWKind<A>.runJob(ff: (Either<Throwable, A>) -> Unit): Job =
105+
this.ev().thunk(ff)
106+
107+
inline fun <A> JobKW<A>.handleErrorWith(crossinline function: (Throwable) -> JobKW<A>): JobKW<A> =
108+
JobKW { ff: (Either<Throwable, A>) -> Unit ->
109+
val result = AtomicReference<Either<Throwable, A>>()
110+
thunk(result::set).apply {
111+
invokeOnCompletion { t: Throwable? ->
112+
val state: Either<Throwable, A>? = result.get()
113+
when {
114+
t == null && state != null -> state.fold({ function(it).thunk(ff) }, { ff(it.right()) })
115+
t != null -> function(t).thunk(ff)
116+
else -> throw IllegalStateException("JobKW handleErrorWith completed without success or error")
117+
}
118+
}
119+
}
120+
}

kategory-effects/src/main/kotlin/kategory/effects/data/JobW.kt

-101
This file was deleted.

kategory-effects/src/main/kotlin/kategory/effects/data/internal/Utils.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import java.util.concurrent.locks.AbstractQueuedSynchronizer
99

1010
object Platform {
1111

12-
internal fun <A> onceOnly(f: (A) -> Unit): (A) -> Unit {
12+
inline fun <A> onceOnly(crossinline f: (A) -> Unit): (A) -> Unit {
1313
val wasCalled = AtomicBoolean(false)
1414

1515
return { a ->
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package kategory
2+
3+
import kotlin.coroutines.experimental.CoroutineContext
4+
5+
interface JobKWInstances :
6+
Functor<JobKWHK>,
7+
Applicative<JobKWHK>,
8+
Monad<JobKWHK>,
9+
MonadError<JobKWHK, Throwable>,
10+
AsyncContext<JobKWHK> {
11+
12+
fun CC(): CoroutineContext
13+
14+
override fun <A> pure(a: A): JobKW<A> =
15+
JobKW.pure(CC(), a)
16+
17+
override fun <A, B> map(fa: HK<JobKWHK, A>, f: (A) -> B): JobKW<B> =
18+
fa.ev().map(f)
19+
20+
override fun <A, B> flatMap(fa: HK<JobKWHK, A>, f: (A) -> HK<JobKWHK, B>): JobKW<B> =
21+
fa.ev().flatMap(CC()) { a: A -> f(a).ev() }
22+
23+
override fun <A, B> tailRecM(a: A, f: (A) -> HK<JobKWHK, Either<A, B>>): JobKW<B> =
24+
JobKW.tailRecM(CC(), a) { aa: A -> f(aa).ev() }
25+
26+
override fun <A> raiseError(e: Throwable): JobKW<A> =
27+
JobKW.raiseError(CC(), e)
28+
29+
override fun <A> handleErrorWith(fa: HK<JobKWHK, A>, f: (Throwable) -> HK<JobKWHK, A>): JobKW<A> =
30+
fa.ev().handleErrorWith { err: Throwable -> f(err).ev() }
31+
32+
override fun <A> runAsync(fa: Proc<A>): HK<JobKWHK, A> =
33+
JobKW.async(CC(), fa)
34+
35+
}

kategory-effects/src/main/kotlin/kategory/effects/instances/JobWInstances.kt

-35
This file was deleted.

0 commit comments

Comments
 (0)