Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Jobs #234

Merged
merged 16 commits into from
Aug 29, 2017
10 changes: 5 additions & 5 deletions kategory-effects-test/src/main/kotlin/kategory/laws/AsyncLaws.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import io.kotlintest.properties.Gen
import io.kotlintest.properties.forAll

object AsyncLaws {
inline fun <reified F> laws(AC: AsyncContext<F> = asyncContext(), M: MonadError<F, Throwable> = monadError<F, Throwable>(), EQ: Eq<HK<F, Int>>, EQER: Eq<HK<F, Int>> = EQ): List<Law> =
MonadErrorLaws.laws(M, EQ) + listOf(
inline fun <reified F> laws(AC: AsyncContext<F> = asyncContext(), M: MonadError<F, Throwable> = monadError<F, Throwable>(), EQ: Eq<HK<F, Int>>, EQERR: Eq<HK<F, Int>> = EQ): List<Law> =
MonadErrorLaws.laws(M, EQERR, EQ) + listOf(
Law("Async Laws: success equivalence", { asyncSuccess(AC, M, EQ) }),
Law("Async Laws: error equivalence", { asyncError(AC, M, EQER) }),
Law("Async Laws: error equivalence", { asyncError(AC, M, EQERR) }),
Law("Async bind: binding blocks", { asyncBind(AC, M, EQ) }),
Law("Async bind: binding failure", { asyncBindError(AC, M, EQER) }),
Law("Async bind: binding failure", { asyncBindError(AC, M, EQERR) }),
Law("Async bind: unsafe binding", { asyncBindUnsafe(AC, M, EQ) }),
Law("Async bind: unsafe binding failure", { asyncBindUnsafeError(AC, M, EQER) }),
Law("Async bind: unsafe binding failure", { asyncBindUnsafeError(AC, M, EQERR) }),
Law("Async bind: binding in parallel", { asyncParallelBind(AC, M, EQ) })
)

Expand Down
14 changes: 7 additions & 7 deletions kategory-effects/src/main/kotlin/kategory/effects/data/IO.kt
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,24 @@ import kategory.effects.data.internal.error

internal fun <A> attemptValue(): AndThen<A, IO<Either<Throwable, A>>> = AndThen({ a: A -> Pure(Either.Right(a)) }, { e -> Pure(Either.Left(e)) })

operator fun <A> invoke(f: (Unit) -> A): IO<A> = suspend { Pure(f(Unit)) }
operator fun <A> invoke(f: () -> A): IO<A> = suspend { Pure(f()) }

fun <A> suspend(f: (Unit) -> IO<A>): IO<A> =
fun <A> suspend(f: () -> IO<A>): IO<A> =
Suspend(AndThen { _ ->
try {
f(Unit)
f()
} catch (throwable: Throwable) {
raiseError(throwable)
}
})

fun <A> async(k: ((Either<Throwable, A>) -> Unit) -> Unit): IO<A> =
Async { callBack ->
onceOnly(callBack).let { ff: (Either<Throwable, A>) -> Unit ->
Async { ff: (Either<Throwable, A>) -> Unit ->
onceOnly(ff).let { callback: (Either<Throwable, A>) -> Unit ->
try {
k(ff)
k(callback)
} catch (throwable: Throwable) {
ff(Either.Left(throwable))
callback(Either.Left(throwable))
}
}
}
Expand Down
120 changes: 120 additions & 0 deletions kategory-effects/src/main/kotlin/kategory/effects/data/JobKW.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package kategory

import kategory.effects.data.internal.Platform.onceOnly
import kotlinx.coroutines.experimental.CoroutineScope
import kotlinx.coroutines.experimental.CoroutineStart
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.launch
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.experimental.CoroutineContext

@higherkind data class JobKW<out A>(val thunk: ((Either<Throwable, A>) -> Unit) -> Job) : JobKWKind<A> {

fun <B> map(f: (A) -> B): JobKW<B> =
JobKW { ff: (Either<Throwable, B>) -> Unit ->
thunk { either: Either<Throwable, A> ->
ff(either.map(f))
}
}

fun <B> flatMap(coroutineContext: CoroutineContext, f: (A) -> JobKW<B>): JobKW<B> =
JobKW { ff: (Either<Throwable, B>) -> Unit ->
val result = AtomicReference<Either<Throwable, A>>()
thunk(result::set).apply {
invokeOnCompletion { t: Throwable? ->
val state: Either<Throwable, A>? = result.get()
when {
t == null && state != null -> state.fold({ ff(it.left()) }, { f(it).thunk(ff) })
t != null -> JobKW.raiseError<B>(coroutineContext, t)
else -> throw IllegalStateException("JobKW flatMap completed without success or error")
}
}
}
}

companion object {
inline operator fun <A> invoke(coroutineContext: CoroutineContext, noinline f: suspend CoroutineScope.() -> A): JobKW<A> =
JobKW {
onceOnly(it).let { callback: (Either<Throwable, A>) -> Unit ->
launch(coroutineContext, CoroutineStart.DEFAULT) {
callback(
try {
f().right()
} catch (err: Throwable) {
err.left()
}
)
}
}
}

inline fun <A> unsafe(coroutineContext: CoroutineContext, noinline f: suspend CoroutineScope.() -> Either<Throwable, A>): JobKW<A> =
JobKW {
onceOnly(it).let { callback: (Either<Throwable, A>) -> Unit ->
launch(coroutineContext, CoroutineStart.DEFAULT) {
callback(f())
}
}
}

inline fun <A> async(coroutineContext: CoroutineContext, crossinline fa: Proc<A>): JobKW<A> =
JobKW {
onceOnly(it).let { callback: (Either<Throwable, A>) -> Unit ->
launch(coroutineContext, CoroutineStart.DEFAULT) {
fa(callback)
}
}
}

inline fun <A> pure(coroutineContext: CoroutineContext, a: A): JobKW<A> =
JobKW(coroutineContext) { a }

inline fun <A> raiseError(coroutineContext: CoroutineContext, t: Throwable): JobKW<A> =
JobKW.unsafe(coroutineContext) { t.left() }

fun <A, B> tailRecM(coroutineContext: CoroutineContext, a: A, f: (A) -> JobKW<Either<A, B>>): JobKW<B> =
JobKW.async(coroutineContext) { ff: (Either<Throwable, B>) -> Unit ->
f(a).runJob { either: Either<Throwable, Either<A, B>> ->
either.fold({ ff(it.left()) }, {
when (it) {
is Either.Right -> ff(it.b.right())
is Either.Left -> tailRecM(coroutineContext, a, f)
}
})
}
}

inline fun instances(coroutineContext: CoroutineContext): JobKWInstances =
object : JobKWInstances {
override fun CC(): CoroutineContext = coroutineContext
}

inline fun functor(coroutineContext: CoroutineContext): Functor<JobKWHK> = instances(coroutineContext)

inline fun applicative(coroutineContext: CoroutineContext): Applicative<JobKWHK> = instances(coroutineContext)

inline fun monad(coroutineContext: CoroutineContext): Monad<JobKWHK> = instances(coroutineContext)

inline fun monadError(coroutineContext: CoroutineContext): MonadError<JobKWHK, Throwable> = instances(coroutineContext)

inline fun asyncContext(coroutineContext: CoroutineContext): AsyncContext<JobKWHK> = instances(coroutineContext)
}
}

fun <A> JobKWKind<A>.runJob(ff: (Either<Throwable, A>) -> Unit): Job =
this.ev().thunk(ff)

inline fun <A> JobKW<A>.handleErrorWith(crossinline function: (Throwable) -> JobKW<A>): JobKW<A> =
JobKW { ff: (Either<Throwable, A>) -> Unit ->
val result = AtomicReference<Either<Throwable, A>>()
thunk(result::set).apply {
invokeOnCompletion { t: Throwable? ->
val state: Either<Throwable, A>? = result.get()
when {
t == null && state != null -> state.fold({ function(it).thunk(ff) }, { ff(it.right()) })
t != null -> function(t).thunk(ff)
else -> throw IllegalStateException("JobKW handleErrorWith completed without success or error")
}
}
}
}
101 changes: 0 additions & 101 deletions kategory-effects/src/main/kotlin/kategory/effects/data/JobW.kt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.util.concurrent.locks.AbstractQueuedSynchronizer

object Platform {

internal fun <A> onceOnly(f: (A) -> Unit): (A) -> Unit {
inline fun <A> onceOnly(crossinline f: (A) -> Unit): (A) -> Unit {
val wasCalled = AtomicBoolean(false)

return { a ->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package kategory

import kotlin.coroutines.experimental.CoroutineContext

interface JobKWInstances :
Functor<JobKWHK>,
Applicative<JobKWHK>,
Monad<JobKWHK>,
MonadError<JobKWHK, Throwable>,
AsyncContext<JobKWHK> {

fun CC(): CoroutineContext

override fun <A> pure(a: A): JobKW<A> =
JobKW.pure(CC(), a)

override fun <A, B> map(fa: HK<JobKWHK, A>, f: (A) -> B): JobKW<B> =
fa.ev().map(f)

override fun <A, B> flatMap(fa: HK<JobKWHK, A>, f: (A) -> HK<JobKWHK, B>): JobKW<B> =
fa.ev().flatMap(CC()) { a: A -> f(a).ev() }

override fun <A, B> tailRecM(a: A, f: (A) -> HK<JobKWHK, Either<A, B>>): JobKW<B> =
JobKW.tailRecM(CC(), a) { aa: A -> f(aa).ev() }

override fun <A> raiseError(e: Throwable): JobKW<A> =
JobKW.raiseError(CC(), e)

override fun <A> handleErrorWith(fa: HK<JobKWHK, A>, f: (Throwable) -> HK<JobKWHK, A>): JobKW<A> =
fa.ev().handleErrorWith { err: Throwable -> f(err).ev() }

override fun <A> runAsync(fa: Proc<A>): HK<JobKWHK, A> =
JobKW.async(CC(), fa)

}

This file was deleted.

Loading